代码拉取完成,页面将自动刷新
同步操作将从 src-openEuler/gazelle 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
From 409b6155a1ec9324bd68aae97a07e33560c19028 Mon Sep 17 00:00:00 2001
From: jiangheng <jiangheng14@huawei.com>
Date: Wed, 21 Feb 2024 08:05:03 +0800
Subject: [PATCH] split the flow fules related functions into separate file
---
src/lstack/core/lstack_init.c | 1 +
src/lstack/core/lstack_protocol_stack.c | 25 +-
src/lstack/include/lstack_ethdev.h | 27 -
src/lstack/include/lstack_flow.h | 51 ++
src/lstack/include/lstack_vdev.h | 9 -
src/lstack/netif/dir.mk | 2 +-
src/lstack/netif/lstack_ethdev.c | 687 +-----------------------
src/lstack/netif/lstack_flow.c | 680 +++++++++++++++++++++++
src/lstack/netif/lstack_vdev.c | 1 +
9 files changed, 747 insertions(+), 736 deletions(-)
create mode 100644 src/lstack/include/lstack_flow.h
create mode 100644 src/lstack/netif/lstack_flow.c
diff --git a/src/lstack/core/lstack_init.c b/src/lstack/core/lstack_init.c
index 31fd91d..d22a295 100644
--- a/src/lstack/core/lstack_init.c
+++ b/src/lstack/core/lstack_init.c
@@ -48,6 +48,7 @@
#include "lstack_protocol_stack.h"
#include "lstack_preload.h"
#include "lstack_wrap.h"
+#include "lstack_flow.h"
static void check_process_start(void)
{
diff --git a/src/lstack/core/lstack_protocol_stack.c b/src/lstack/core/lstack_protocol_stack.c
index 18e5df7..a545b73 100644
--- a/src/lstack/core/lstack_protocol_stack.c
+++ b/src/lstack/core/lstack_protocol_stack.c
@@ -454,14 +454,12 @@ int stack_polling(uint32_t wakeup_tick)
{
int force_quit;
struct cfg_params *cfg = get_global_cfg_params();
- uint8_t use_ltran_flag = cfg->use_ltran;
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
bool kni_switch = cfg->kni_switch;
#endif
bool use_sockmap = cfg->use_sockmap;
bool stack_mode_rtc = cfg->stack_mode_rtc;
uint32_t rpc_number = cfg->rpc_number;
- uint32_t nic_read_number = cfg->nic_read_number;
uint32_t read_connect_number = cfg->read_connect_number;
struct protocol_stack *stack = get_protocol_stack();
@@ -469,7 +467,7 @@ int stack_polling(uint32_t wakeup_tick)
rpc_poll_msg(&stack->dfx_rpc_queue, 2);
force_quit = rpc_poll_msg(&stack->rpc_queue, rpc_number);
- gazelle_eth_dev_poll(stack, use_ltran_flag, nic_read_number);
+ eth_dev_poll();
sys_timer_run();
if (cfg->low_power_mod != 0) {
low_power_idling(stack);
@@ -525,10 +523,6 @@ static void* gazelle_stack_thread(void *arg)
}
sem_post(&g_stack_group.sem_stack_setup);
- if (!use_ltran() && queue_id == 0) {
- init_listen_and_user_ports();
- }
-
LSTACK_LOG(INFO, LSTACK, "stack_%02hu init success\n", queue_id);
if (get_global_cfg_params()->stack_mode_rtc) {
return NULL;
@@ -545,12 +539,6 @@ static void* gazelle_stack_thread(void *arg)
return NULL;
}
-static void gazelle_listen_thread(void *arg)
-{
- struct cfg_params *cfg_param = get_global_cfg_params();
- recv_pkts_from_other_process(cfg_param->process_idx, arg);
-}
-
int32_t stack_group_init_mempool(void)
{
struct cfg_params *global_cfg_parmas = get_global_cfg_params();
@@ -611,17 +599,6 @@ int32_t stack_group_init(void)
}
}
- /* run to completion mode does not currently support multiple process */
- if (!use_ltran() && !get_global_cfg_params()->stack_mode_rtc) {
- char name[PATH_MAX];
- sem_init(&stack_group->sem_listen_thread, 0, 0);
- sprintf_s(name, sizeof(name), "%s", "listen_thread");
- struct sys_thread *thread = sys_thread_new(name, gazelle_listen_thread,
- (void*)(&stack_group->sem_listen_thread), 0, 0);
- free(thread);
- sem_wait(&stack_group->sem_listen_thread);
- }
-
return 0;
}
diff --git a/src/lstack/include/lstack_ethdev.h b/src/lstack/include/lstack_ethdev.h
index 3252906..0c3d906 100644
--- a/src/lstack/include/lstack_ethdev.h
+++ b/src/lstack/include/lstack_ethdev.h
@@ -16,25 +16,6 @@
#include <rte_eal.h>
#include <rte_version.h>
-#define INVAILD_PROCESS_IDX 255
-
-enum port_type {
- PORT_LISTEN,
- PORT_CONNECT,
-};
-
-enum PACKET_TRANSFER_TYPE {
- TRANSFER_KERNEL = -1,
- TRANSFER_OTHER_THREAD,
- TRANSFER_CURRENT_THREAD,
-};
-
-enum TRANSFER_MESSAGE_RESULT {
- CONNECT_ERROR = -2,
- REPLY_ERROR = -1,
- TRANSFER_SUCESS = 0,
-};
-
struct protocol_stack;
struct rte_mbuf;
struct lstack_dev_ops {
@@ -44,21 +25,13 @@ struct lstack_dev_ops {
int32_t ethdev_init(struct protocol_stack *stack);
int32_t eth_dev_poll(void);
-int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number);
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack);
-int recv_pkts_from_other_process(int process_index, void* arg);
-int32_t check_params_from_primary(void);
-
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
void kni_handle_rx(uint16_t port_id);
void kni_handle_tx(struct rte_mbuf *mbuf);
#endif
-void delete_user_process_port(uint16_t dst_port, enum port_type type);
-void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
-void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
-void config_flow_director(uint16_t queue_id, uint32_t src_ip, uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
void netif_poll(struct netif *netif);
#endif /* __GAZELLE_ETHDEV_H__ */
diff --git a/src/lstack/include/lstack_flow.h b/src/lstack/include/lstack_flow.h
new file mode 100644
index 0000000..ad35cdf
--- /dev/null
+++ b/src/lstack/include/lstack_flow.h
@@ -0,0 +1,51 @@
+/*
+* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+
+#ifndef __LSTACK_FLOW_H__
+#define __LSTACK_FLOW_H__
+
+#include <rte_mbuf.h>
+
+enum port_type {
+ PORT_LISTEN,
+ PORT_CONNECT,
+};
+
+enum PACKET_TRANSFER_TYPE {
+ TRANSFER_KERNEL = -1,
+ TRANSFER_OTHER_THREAD,
+ TRANSFER_CURRENT_THREAD,
+};
+
+enum TRANSFER_MESSAGE_RESULT {
+ CONNECT_ERROR = -2,
+ REPLY_ERROR = -1,
+ TRANSFER_SUCESS = 0,
+};
+
+int distribute_pakages(struct rte_mbuf *mbuf);
+void flow_init(void);
+int32_t check_params_from_primary(void);
+
+int recv_pkts_from_other_process(int process_index, void* arg);
+void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip,
+ uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
+void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add);
+void transfer_arp_to_other_process(struct rte_mbuf *mbuf);
+
+void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type);
+void delete_user_process_port(uint16_t dst_port, enum port_type type);
+
+void gazelle_listen_thread(void *arg);
+
+#endif
diff --git a/src/lstack/include/lstack_vdev.h b/src/lstack/include/lstack_vdev.h
index 007eec7..4e5d191 100644
--- a/src/lstack/include/lstack_vdev.h
+++ b/src/lstack/include/lstack_vdev.h
@@ -13,19 +13,10 @@
#ifndef _GAZELLE_VDEV_H_
#define _GAZELLE_VDEV_H_
-#include <stdbool.h>
-
struct lstack_dev_ops;
struct gazelle_quintuple;
enum reg_ring_type;
void vdev_dev_ops_init(struct lstack_dev_ops *dev_ops);
int vdev_reg_xmit(enum reg_ring_type type, struct gazelle_quintuple *qtuple);
-int recv_pkts_from_other_process(int process_index, void* arg);
-void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
-void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip,
- uint32_t dst_ip, uint16_t src_port, uint16_t dst_port);
-void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add);
-void init_listen_and_user_ports();
-
#endif /* _GAZELLE_VDEV_H_ */
diff --git a/src/lstack/netif/dir.mk b/src/lstack/netif/dir.mk
index ec7c4ad..20fb5d6 100644
--- a/src/lstack/netif/dir.mk
+++ b/src/lstack/netif/dir.mk
@@ -8,5 +8,5 @@
# PURPOSE.
# See the Mulan PSL v2 for more details.
-SRC = lstack_ethdev.c lstack_vdev.c
+SRC = lstack_ethdev.c lstack_vdev.c lstack_flow.c
$(eval $(call register_dir, netif, $(SRC)))
diff --git a/src/lstack/netif/lstack_ethdev.c b/src/lstack/netif/lstack_ethdev.c
index 25c94eb..2e938b0 100644
--- a/src/lstack/netif/lstack_ethdev.c
+++ b/src/lstack/netif/lstack_ethdev.c
@@ -10,74 +10,36 @@
* See the Mulan PSL v2 for more details.
*/
-#include <sys/socket.h>
-#include <sys/un.h>
-
#include <rte_eal.h>
#include <rte_version.h>
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
#include <rte_kni.h>
#endif
#include <rte_ethdev.h>
-#include <rte_malloc.h>
-#include <rte_ether.h>
-#include <lwip/debug.h>
#include <lwip/etharp.h>
#include <lwip/ethip6.h>
#include <lwip/posix_api.h>
#include <netif/ethernet.h>
-#include <lwip/tcp.h>
-#include <lwip/prot/tcp.h>
#include <securec.h>
-#include <rte_jhash.h>
-#include <uthash.h>
+#include "dpdk_common.h"
#include "lstack_cfg.h"
#include "lstack_vdev.h"
#include "lstack_stack_stat.h"
#include "lstack_log.h"
#include "lstack_dpdk.h"
#include "lstack_lwip.h"
-#include "dpdk_common.h"
#include "lstack_protocol_stack.h"
#include "lstack_thread_rpc.h"
+#include "lstack_flow.h"
#include "lstack_ethdev.h"
/* FRAME_MTU + 14byte header */
#define MBUF_MAX_LEN 1514
-#define MAX_PATTERN_NUM 4
-#define MAX_ACTION_NUM 2
-#define FULL_MASK 0xffffffff /* full mask */
-#define EMPTY_MASK 0x0 /* empty mask */
-#define LSTACK_MBUF_LEN 64
-#define TRANSFER_TCP_MUBF_LEN (LSTACK_MBUF_LEN + 3)
-#define DELETE_FLOWS_PARAMS_NUM 3
-#define DELETE_FLOWS_PARAMS_LENGTH 30
-#define CREATE_FLOWS_PARAMS_NUM 6
-#define CREATE_FLOWS_PARAMS_LENGTH 60
-#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25
-#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3
-#define REPLY_LEN 10
-#define SUCCESS_REPLY "success"
-#define ERROR_REPLY "error"
#define PACKET_READ_SIZE 32
-#define GET_LSTACK_NUM 14
-#define GET_LSTACK_NUM_STRING "get_lstack_num"
-
-#define SERVER_PATH "/var/run/gazelle/server.socket"
-#define SPLIT_DELIM ","
-
-#define UNIX_TCP_PORT_MAX 65535
-
-#define IPV4_VERSION_OFFSET 4
-#define IPV4_VERSION 4
-
-static uint8_t g_user_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, };
-static uint8_t g_listen_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, };
-
void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
{
int32_t ret;
@@ -126,636 +88,6 @@ void eth_dev_recv(struct rte_mbuf *mbuf, struct protocol_stack *stack)
}
}
-int32_t eth_dev_poll(void)
-{
- uint32_t nr_pkts;
- struct cfg_params *cfg = get_global_cfg_params();
- struct protocol_stack *stack = get_protocol_stack();
-
- nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, cfg->nic_read_number);
- if (nr_pkts == 0) {
- return 0;
- }
-
- if (!cfg->use_ltran && get_protocol_stack_group()->latency_start) {
- uint64_t time_stamp = get_current_time();
- time_stamp_into_mbuf(nr_pkts, stack->pkts, time_stamp);
- }
-
- for (uint32_t i = 0; i < nr_pkts; i++) {
- /* copy arp into other stack */
- if (!cfg->use_ltran) {
- struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *);
- if (unlikely(RTE_BE16(RTE_ETHER_TYPE_ARP) == ethh->ether_type)) {
- stack_broadcast_arp(stack->pkts[i], stack);
- }
- }
-
- eth_dev_recv(stack->pkts[i], stack);
- }
-
- stack->stats.rx += nr_pkts;
-
- return nr_pkts;
-}
-
-/* flow rule map */
-#define RULE_KEY_LEN 23
-struct flow_rule {
- char rule_key[RULE_KEY_LEN];
- struct rte_flow *flow;
- UT_hash_handle hh;
-};
-
-static uint16_t g_flow_num = 0;
-struct flow_rule *g_flow_rules = NULL;
-struct flow_rule *find_rule(char *rule_key)
-{
- struct flow_rule *fl;
- HASH_FIND_STR(g_flow_rules, rule_key, fl);
- return fl;
-}
-
-void add_rule(char* rule_key, struct rte_flow *flow)
-{
- struct flow_rule *rule;
- HASH_FIND_STR(g_flow_rules, rule_key, rule);
- if (rule == NULL) {
- rule = (struct flow_rule*)malloc(sizeof(struct flow_rule));
- strcpy_s(rule->rule_key, RULE_KEY_LEN, rule_key);
- HASH_ADD_STR(g_flow_rules, rule_key, rule);
- }
- rule->flow = flow;
-}
-
-void delete_rule(char* rule_key)
-{
- struct flow_rule *rule = NULL;
- HASH_FIND_STR(g_flow_rules, rule_key, rule);
- if (rule != NULL) {
- HASH_DEL(g_flow_rules, rule);
- free(rule);
- }
-}
-
-void init_listen_and_user_ports(void)
-{
- memset_s(g_user_ports, sizeof(g_user_ports), INVAILD_PROCESS_IDX, sizeof(g_user_ports));
- memset_s(g_listen_ports, sizeof(g_listen_ports), INVAILD_PROCESS_IDX, sizeof(g_listen_ports));
-}
-
-int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply)
-{
- /* other process queue_id */
- struct sockaddr_un serun;
- int sockfd;
- int ret = 0;
-
- sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0);
- memset_s(&serun, sizeof(serun), 0, sizeof(serun));
- serun.sun_family = AF_UNIX;
- sprintf_s(serun.sun_path, PATH_MAX, "%s%d", SERVER_PATH, process_index);
- int32_t len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
- if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0) {
- return CONNECT_ERROR;
- }
- posix_api->write_fn(sockfd, buf, write_len);
- if (need_reply) {
- char reply_message[REPLY_LEN];
- int32_t read_result = posix_api->read_fn(sockfd, reply_message, REPLY_LEN);
- if (read_result > 0) {
- if (strcmp(reply_message, SUCCESS_REPLY) == 0) {
- ret = TRANSFER_SUCESS;
- } else if (strcmp(reply_message, ERROR_REPLY) == 0) {
- ret = REPLY_ERROR;
- } else {
- ret = atoi(reply_message);
- }
- } else {
- ret = REPLY_ERROR;
- }
- }
- posix_api->close_fn(sockfd);
-
- return ret;
-}
-
-int32_t check_params_from_primary(void)
-{
- struct cfg_params *cfg = get_global_cfg_params();
- if (cfg->is_primary) {
- return 0;
- }
- // check lstack num
- char get_lstack_num[GET_LSTACK_NUM];
- sprintf_s(get_lstack_num, GET_LSTACK_NUM, "%s", GET_LSTACK_NUM_STRING);
- int32_t ret = transfer_pkt_to_other_process(get_lstack_num, 0, GET_LSTACK_NUM, true);
- if (ret != cfg->num_cpu) {
- return -1;
- }
- return 0;
-}
-
-struct rte_flow *create_flow_director(uint16_t port_id, uint16_t queue_id,
- uint32_t src_ip, uint32_t dst_ip,
- uint16_t src_port, uint16_t dst_port,
- struct rte_flow_error *error)
-{
- struct rte_flow_attr attr;
- struct rte_flow_item pattern[MAX_PATTERN_NUM];
- struct rte_flow_action action[MAX_ACTION_NUM];
- struct rte_flow *flow = NULL;
- struct rte_flow_action_queue queue = { .index = queue_id };
- struct rte_flow_item_ipv4 ip_spec;
- struct rte_flow_item_ipv4 ip_mask;
-
- struct rte_flow_item_tcp tcp_spec;
- struct rte_flow_item_tcp tcp_mask;
- int res;
-
- memset_s(pattern, sizeof(pattern), 0, sizeof(pattern));
- memset_s(action, sizeof(action), 0, sizeof(action));
-
- /*
- * set the rule attribute.
- * in this case only ingress packets will be checked.
- */
- memset_s(&attr, sizeof(struct rte_flow_attr), 0, sizeof(struct rte_flow_attr));
- attr.ingress = 1;
-
- /*
- * create the action sequence.
- * one action only, move packet to queue
- */
- action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE;
- action[0].conf = &queue;
- action[1].type = RTE_FLOW_ACTION_TYPE_END;
-
- // not limit eth header
- pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH;
-
- // ip header
- memset_s(&ip_spec, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4));
- memset_s(&ip_mask, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4));
- ip_spec.hdr.dst_addr = dst_ip;
- ip_mask.hdr.dst_addr = FULL_MASK;
- ip_spec.hdr.src_addr = src_ip;
- ip_mask.hdr.src_addr = FULL_MASK;
- pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
- pattern[1].spec = &ip_spec;
- pattern[1].mask = &ip_mask;
-
- // tcp header, full mask 0xffff
- memset_s(&tcp_spec, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp));
- memset_s(&tcp_mask, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp));
- pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP; // 2: pattern 2 is tcp header
- tcp_spec.hdr.src_port = src_port;
- tcp_spec.hdr.dst_port = dst_port;
- tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port;
- tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port;
- pattern[2].spec = &tcp_spec;
- pattern[2].mask = &tcp_mask;
-
- /* the final level must be always type end */
- pattern[3].type = RTE_FLOW_ITEM_TYPE_END;
- res = rte_flow_validate(port_id, &attr, pattern, action, error);
- if (!res) {
- flow = rte_flow_create(port_id, &attr, pattern, action, error);
- } else {
- LSTACK_LOG(ERR, LSTACK, "rte_flow_create.rte_flow_validate error, res %d \n", res);
- }
-
- return flow;
-}
-
-void config_flow_director(uint16_t queue_id, uint32_t src_ip,
- uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
-{
- uint16_t port_id = get_protocol_stack_group()->port_id;
- char rule_key[RULE_KEY_LEN] = {0};
- sprintf_s(rule_key, sizeof(rule_key), "%u_%u_%u", src_ip, src_port, dst_port);
- struct flow_rule *fl_exist = find_rule(rule_key);
- if (fl_exist != NULL) {
- return;
- }
-
- LSTACK_LOG(INFO, LSTACK,
- "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs:%u\n",
- queue_id, src_ip, ntohs(src_port), ntohs(dst_port));
-
- struct rte_flow_error error;
- struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error);
- if (!flow) {
- LSTACK_LOG(ERR, LSTACK,"flow can not be created. queue_id %u, src_ip %u, src_port %u,"
- "dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n",
- queue_id, src_ip, src_port, dst_port, ntohs(dst_port),
- error.type, error.message ? error.message : "(no stated reason)");
- return;
- }
- __sync_fetch_and_add(&g_flow_num, 1);
- add_rule(rule_key, flow);
-}
-
-void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
-{
- uint16_t port_id = get_protocol_stack_group()->port_id;
- char rule_key[RULE_KEY_LEN] = {0};
- sprintf_s(rule_key, RULE_KEY_LEN, "%u_%u_%u",dst_ip, dst_port, src_port);
- struct flow_rule *fl = find_rule(rule_key);
-
- if(fl != NULL){
- struct rte_flow_error error;
- int ret = rte_flow_destroy(port_id, fl->flow, &error);
- if(ret != 0){
- LSTACK_LOG(ERR, LSTACK, "Flow can't be delete %d message: %s\n",
- error.type, error.message ? error.message : "(no stated reason)");
- }
- delete_rule(rule_key);
- __sync_fetch_and_sub(&g_flow_num, 1);
- }
-}
-
-/* if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0. */
-void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
-{
- if (get_global_cfg_params()->is_primary) {
- delete_flow_director(dst_ip, src_port, dst_port);
- } else {
- char process_server_path[DELETE_FLOWS_PARAMS_LENGTH];
- sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u",
- dst_ip, SPLIT_DELIM, src_port, SPLIT_DELIM, dst_port);
- int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false);
- if(ret != TRANSFER_SUCESS){
- LSTACK_LOG(ERR, LSTACK, "error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n",
- rte_gettid(), dst_ip, src_port, dst_port);
- }
- }
-}
-
-// if process 0, add directly, else transfer 'src_ip,dst_ip,src_port,dst_port,queue_id' to process 0.
-void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip,
- uint32_t dst_ip, uint16_t src_port,
- uint16_t dst_port)
-{
- char process_server_path[CREATE_FLOWS_PARAMS_LENGTH];
- /* exchage src_ip and dst_ip, src_port and dst_port */
- uint8_t process_idx = get_global_cfg_params()->process_idx;
- sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u",
- dst_ip, SPLIT_DELIM, src_ip, SPLIT_DELIM,
- dst_port, SPLIT_DELIM, src_port, SPLIT_DELIM,
- queue_id, SPLIT_DELIM, process_idx);
- int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true);
- if (ret != TRANSFER_SUCESS) {
- LSTACK_LOG(ERR, LSTACK, "error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u,"
- "queue_id %u, process_idx %u\n",
- rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx);
- }
-}
-
-void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add)
-{
- char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH];
- sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH,
- "%u%s%u%s%u", listen_port, SPLIT_DELIM, process_idx, SPLIT_DELIM, is_add);
- int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true);
- if(ret != TRANSFER_SUCESS) {
- LSTACK_LOG(ERR, LSTACK, "error. tid %d. listen_port %u, process_idx %u\n",
- rte_gettid(), listen_port, process_idx);
- }
-}
-
-static int str_to_array(char *args, uint32_t *array, int size)
-{
- int val;
- uint16_t cnt = 0;
- char *elem = NULL;
- char *next_token = NULL;
-
- memset_s(array, sizeof(*array) * size, 0, sizeof(*array) * size);
- elem = strtok_s((char *)args, SPLIT_DELIM, &next_token);
- while (elem != NULL) {
- if (cnt >= size) {
- return -1;
- }
- val = atoi(elem);
- if (val < 0) {
- return -1;
- }
- array[cnt] = (uint32_t)val;
- cnt++;
-
- elem = strtok_s(NULL, SPLIT_DELIM, &next_token);
- }
-
- return cnt;
-}
-
-void parse_and_delete_rule(char* buf)
-{
- uint32_t array[DELETE_FLOWS_PARAMS_NUM];
- str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM);
- uint32_t dst_ip = array[0];
- uint16_t src_port = array[1];
- uint16_t dst_port = array[2];
- delete_flow_director(dst_ip, src_port, dst_port);
-}
-
-void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type)
-{
- if (type == PORT_LISTEN) {
- g_listen_ports[dst_port] = process_idx;
- } else {
- g_user_ports[dst_port] = process_idx;
- }
-}
-
-void delete_user_process_port(uint16_t dst_port, enum port_type type)
-{
- if (type == PORT_LISTEN) {
- g_listen_ports[dst_port] = INVAILD_PROCESS_IDX;
- } else {
- g_user_ports[dst_port] = INVAILD_PROCESS_IDX;
- }
-}
-
-void parse_and_create_rule(char* buf)
-{
- uint32_t array[CREATE_FLOWS_PARAMS_NUM];
- str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM);
- uint32_t src_ip = array[0];
- uint32_t dst_ip = array[1];
- uint16_t src_port = array[2];
- uint16_t dst_port = array[3];
- uint16_t queue_id = array[4];
- uint8_t process_idx = array[5];
- config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port);
- add_user_process_port(dst_port, process_idx, PORT_CONNECT);
-}
-
-void parse_and_add_or_delete_listen_port(char* buf)
-{
- uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM];
- str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM);
- uint16_t listen_port = array[0];
- uint8_t process_idx = array[1];
- uint8_t is_add = array[2];
- if (is_add == 1) {
- add_user_process_port(listen_port, process_idx, PORT_LISTEN);
- } else {
- delete_user_process_port(listen_port, PORT_LISTEN);
- }
-
-}
-
-void transfer_arp_to_other_process(struct rte_mbuf *mbuf)
-{
- struct cfg_params *cfgs = get_global_cfg_params();
-
- for(int i = 1; i < cfgs->num_process; i++){
- char arp_mbuf[LSTACK_MBUF_LEN] = {0};
- sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf);
- int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false);
- if (result == CONNECT_ERROR) {
- LSTACK_LOG(INFO, LSTACK,"connect process %d failed, ensure the process is started.\n", i);
- } else if (result == REPLY_ERROR) {
- LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. %m\n", i);
- }
- }
-}
-
-void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx)
-{
- /* current process queue_id */
- struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx];
- int ret = -1;
- while(ret != 0) {
- ret = rpc_call_arp(&stack->rpc_queue, mbuf);
- printf("transfer_tcp_to_thread, ret : %d \n", ret);
- }
-}
-
-void parse_arp_and_transefer(char* buf)
-{
- struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf);
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- struct rte_mbuf *mbuf_copy = NULL;
- struct protocol_stack *stack = NULL;
- int32_t ret;
- for (int32_t i = 0; i < stack_group->stack_num; i++) {
- stack = stack_group->stacks[i];
- ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- while (ret != 0) {
- ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- stack->stats.rx_allocmbuf_fail++;
- }
- copy_mbuf(mbuf_copy, mbuf);
-
- ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
-
- while (ret != 0) {
- rpc_call_arp(&stack->rpc_queue, mbuf_copy);
- }
- }
-}
-
-void parse_tcp_and_transefer(char* buf)
-{
- char *next_token = NULL;
- char *elem = strtok_s(buf, SPLIT_DELIM, &next_token);
- struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem);
- elem = strtok_s(NULL, SPLIT_DELIM, &next_token);
- uint16_t queue_id = atoll(elem);
-
- struct protocol_stack_group *stack_group = get_protocol_stack_group();
- uint16_t num_queue = get_global_cfg_params()->num_queue;
- uint16_t stk_index = queue_id % num_queue;
- struct rte_mbuf *mbuf_copy = NULL;
- struct protocol_stack *stack = stack_group->stacks[stk_index];
-
- int32_t ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- while (ret != 0) {
- ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, true);
- stack->stats.rx_allocmbuf_fail++;
- }
-
- copy_mbuf(mbuf_copy,mbuf);
-
- transfer_tcp_to_thread(mbuf_copy, stk_index);
-}
-
-int recv_pkts_from_other_process(int process_index, void* arg)
-{
- struct sockaddr_un serun, cliun;
- socklen_t cliun_len;
- int listenfd, connfd, size;
- char buf[132];
- /* socket */
- if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
- perror("socket error");
- return -1;
- }
- /* bind */
- memset_s(&serun, sizeof(serun), 0, sizeof(serun));
- serun.sun_family = AF_UNIX;
- char process_server_path[PATH_MAX];
- sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", SERVER_PATH, process_index);
- strcpy_s(serun.sun_path, sizeof(serun.sun_path), process_server_path);
- size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
- unlink(process_server_path);
- if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) {
- perror("bind error");
- return -1;
- }
- if (posix_api->listen_fn(listenfd, 20) < 0) { /* 20: max backlog */
- perror("listen error");
- return -1;
- }
- sem_post((sem_t *)arg);
- /* block */
- while(1) {
- cliun_len = sizeof(cliun);
- if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0) {
- perror("accept error");
- continue;
- }
- while(1) {
- int n = posix_api->read_fn(connfd, buf, sizeof(buf));
- if (n < 0) {
- perror("read error");
- break;
- } else if (n == 0) {
- break;
- }
-
- if(n == LSTACK_MBUF_LEN) {
- /* arp */
- parse_arp_and_transefer(buf);
- } else if (n == TRANSFER_TCP_MUBF_LEN) {
- /* tcp. lstack_mbuf_queue_id */
- parse_tcp_and_transefer(buf);
- } else if (n == DELETE_FLOWS_PARAMS_LENGTH) {
- /* delete rule */
- parse_and_delete_rule(buf);
- } else if(n == CREATE_FLOWS_PARAMS_LENGTH) {
- /* add rule */
- parse_and_create_rule(buf);
- char reply_buf[REPLY_LEN];
- sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
- posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
- } else if (n == GET_LSTACK_NUM) {
- char reply_buf[REPLY_LEN];
- sprintf_s(reply_buf, sizeof(reply_buf), "%d", get_global_cfg_params()->num_cpu);
- posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
- } else {
- /* add port */
- parse_and_add_or_delete_listen_port(buf);
- char reply_buf[REPLY_LEN];
- sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
- posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
- }
-
- }
- posix_api->close_fn(connfd);
- }
- posix_api->close_fn(listenfd);
- return 0;
-}
-
-void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id,
- char* mbuf_and_queue_id, int write_len)
-{
- sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", mbuf, SPLIT_DELIM, queue_id);
-}
-
-static int mbuf_to_idx(struct rte_mbuf *mbuf, uint16_t *dst_port)
-{
- struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
- u16_t type = rte_be_to_cpu_16(ethh->ether_type);
- uint32_t index = 0;
- if (type == RTE_ETHER_TYPE_IPV4) {
- struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
- uint8_t ip_version = (iph->version_ihl & 0xf0) >> IPV4_VERSION_OFFSET;
- if (likely(ip_version == IPV4_VERSION)) {
- if (likely(iph->next_proto_id == IPPROTO_TCP)) {
- struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *,
- sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
- *dst_port = tcp_hdr->dst_port;
-
- if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
- uint32_t src_ip = iph->src_addr;
- uint16_t src_port = tcp_hdr->src_port;
- index = rte_jhash_3words(src_ip, src_port | ((*dst_port) << 16), 0, 0);
- } else {
- return -1;
- }
- }
- }
- } else if (type == RTE_ETHER_TYPE_IPV6) {
- struct rte_ipv6_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv6_hdr *, sizeof(struct rte_ether_hdr));
- if (likely(iph->proto == IPPROTO_TCP)) {
- struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *,
- sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv6_hdr));
- *dst_port = tcp_hdr->dst_port;
-
- if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
- uint32_t *src_ip = (uint32_t *) &iph->src_addr;
- uint16_t src_port = tcp_hdr->src_port;
- uint32_t v = rte_jhash_3words(src_ip[0], src_ip[1], src_ip[2], 0);
- index = rte_jhash_3words(src_ip[3], src_port | ((*dst_port) << 16), v, 0);
- } else {
- return -1;
- }
- }
- } else {
- return -1;
- }
- return index;
-}
-
-int distribute_pakages(struct rte_mbuf *mbuf)
-{
- uint16_t dst_port = 0;
- uint32_t index = mbuf_to_idx(mbuf, &dst_port);
- if (index == -1) {
- return TRANSFER_CURRENT_THREAD;
- }
-
- uint16_t queue_id = 0;
- uint32_t user_process_idx = 0;
- int each_process_queue_num = get_global_cfg_params()->num_queue;
- index = index % each_process_queue_num;
- if (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) {
- user_process_idx = g_listen_ports[dst_port];
- } else {
- user_process_idx = g_user_ports[dst_port];
- }
-
- if (user_process_idx == INVAILD_PROCESS_IDX) {
- return TRANSFER_KERNEL;
- }
-
- if (get_global_cfg_params()->seperate_send_recv) {
- queue_id = user_process_idx * each_process_queue_num + (index / 2) * 2;
- } else {
- queue_id = user_process_idx * each_process_queue_num + index;
- }
- if (queue_id != 0) {
- if (user_process_idx == 0) {
- transfer_tcp_to_thread(mbuf, queue_id);
- } else {
- char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN];
- concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN);
- transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx,
- TRANSFER_TCP_MUBF_LEN, false);
- }
- return TRANSFER_OTHER_THREAD;
- } else {
- return TRANSFER_CURRENT_THREAD;
- }
-
- return TRANSFER_KERNEL;
-}
-
#if RTE_VERSION < RTE_VERSION_NUM(23, 11, 0, 0)
void kni_handle_rx(uint16_t port_id)
{
@@ -797,17 +129,18 @@ void kni_handle_tx(struct rte_mbuf *mbuf)
}
#endif
-/* optimized eth_dev_poll() in lstack */
-int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_flag, uint32_t nic_read_number)
+int32_t eth_dev_poll(void)
{
uint32_t nr_pkts;
+ struct cfg_params *cfg = get_global_cfg_params();
+ struct protocol_stack *stack = get_protocol_stack();
- nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, nic_read_number);
+ nr_pkts = stack->dev_ops.rx_poll(stack, stack->pkts, cfg->nic_read_number);
if (nr_pkts == 0) {
return 0;
}
- if (!use_ltran_flag && get_protocol_stack_group()->latency_start) {
+ if (!use_ltran() && get_protocol_stack_group()->latency_start) {
uint64_t time_stamp = get_current_time();
time_stamp_into_mbuf(nr_pkts, stack->pkts, time_stamp);
}
@@ -816,7 +149,7 @@ int32_t gazelle_eth_dev_poll(struct protocol_stack *stack, uint8_t use_ltran_fla
/* 1 current thread recv; 0 other thread recv; -1 kni recv; */
int transfer_type = TRANSFER_CURRENT_THREAD;
/* copy arp into other stack */
- if (!use_ltran_flag) {
+ if (!use_ltran()) {
struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(stack->pkts[i], struct rte_ether_hdr *);
u16_t type;
type = ethh->ether_type;
@@ -946,6 +279,10 @@ int32_t ethdev_init(struct protocol_stack *stack)
LSTACK_LOG(ERR, LSTACK, "fill mbuf to rx_ring failed ret=%d\n", ret);
return ret;
}
+ } else {
+ if (cfg->tuple_filter && stack->queue_id == 0) {
+ flow_init();
+ }
}
netif_set_default(&stack->netif);
diff --git a/src/lstack/netif/lstack_flow.c b/src/lstack/netif/lstack_flow.c
new file mode 100644
index 0000000..4e04209
--- /dev/null
+++ b/src/lstack/netif/lstack_flow.c
@@ -0,0 +1,680 @@
+/*
+* Copyright (c) Huawei Technologies Co., Ltd. 2020-2021. All rights reserved.
+* gazelle is licensed under the Mulan PSL v2.
+* You can use this software according to the terms and conditions of the Mulan PSL v2.
+* You may obtain a copy of Mulan PSL v2 at:
+* http://license.coscl.org.cn/MulanPSL2
+* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
+* IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
+* PURPOSE.
+* See the Mulan PSL v2 for more details.
+*/
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <securec.h>
+
+#include <rte_mbuf.h>
+#include <rte_flow.h>
+#include <rte_jhash.h>
+#include <uthash.h>
+
+#include <lwip/posix_api.h>
+#include <lwip/tcp.h>
+#include <lwip/prot/tcp.h>
+
+#include "dpdk_common.h"
+#include "lstack_log.h"
+#include "lstack_dpdk.h"
+#include "lstack_cfg.h"
+#include "lstack_protocol_stack.h"
+#include "lstack_flow.h"
+
+#define MAX_PATTERN_NUM 4
+#define MAX_ACTION_NUM 2
+#define FULL_MASK 0xffffffff /* full mask */
+#define EMPTY_MASK 0x0 /* empty mask */
+#define LSTACK_MBUF_LEN 64
+#define TRANSFER_TCP_MUBF_LEN (LSTACK_MBUF_LEN + 3)
+#define DELETE_FLOWS_PARAMS_NUM 3
+#define DELETE_FLOWS_PARAMS_LENGTH 30
+#define CREATE_FLOWS_PARAMS_NUM 6
+#define CREATE_FLOWS_PARAMS_LENGTH 60
+#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH 25
+#define ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM 3
+#define REPLY_LEN 10
+#define SUCCESS_REPLY "success"
+#define ERROR_REPLY "error"
+
+#define GET_LSTACK_NUM 14
+#define GET_LSTACK_NUM_STRING "get_lstack_num"
+
+#define SERVER_PATH "/var/run/gazelle/server.socket"
+#define SPLIT_DELIM ","
+
+#define UNIX_TCP_PORT_MAX 65535
+
+#define INVAILD_PROCESS_IDX 255
+
+#define IPV4_VERSION_OFFSET 4
+#define IPV4_VERSION 4
+
+static uint8_t g_user_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, };
+static uint8_t g_listen_ports[UNIX_TCP_PORT_MAX] = {INVAILD_PROCESS_IDX, };
+
+/* flow rule map */
+#define RULE_KEY_LEN 23
+struct flow_rule {
+ char rule_key[RULE_KEY_LEN];
+ struct rte_flow *flow;
+ UT_hash_handle hh;
+};
+
+static uint16_t g_flow_num = 0;
+static struct flow_rule *g_flow_rules = NULL;
+static struct flow_rule *find_rule(char *rule_key)
+{
+ struct flow_rule *fl;
+ HASH_FIND_STR(g_flow_rules, rule_key, fl);
+ return fl;
+}
+
+static void add_rule(char* rule_key, struct rte_flow *flow)
+{
+ struct flow_rule *rule;
+ HASH_FIND_STR(g_flow_rules, rule_key, rule);
+ if (rule == NULL) {
+ rule = (struct flow_rule*)malloc(sizeof(struct flow_rule));
+ strcpy_s(rule->rule_key, RULE_KEY_LEN, rule_key);
+ HASH_ADD_STR(g_flow_rules, rule_key, rule);
+ }
+ rule->flow = flow;
+}
+
+static void delete_rule(char* rule_key)
+{
+ struct flow_rule *rule = NULL;
+ HASH_FIND_STR(g_flow_rules, rule_key, rule);
+ if (rule != NULL) {
+ HASH_DEL(g_flow_rules, rule);
+ free(rule);
+ }
+}
+
+static void init_listen_and_user_ports(void)
+{
+ memset_s(g_user_ports, sizeof(g_user_ports), INVAILD_PROCESS_IDX, sizeof(g_user_ports));
+ memset_s(g_listen_ports, sizeof(g_listen_ports), INVAILD_PROCESS_IDX, sizeof(g_listen_ports));
+}
+
+static int transfer_pkt_to_other_process(char *buf, int process_index, int write_len, bool need_reply)
+{
+ /* other process queue_id */
+ struct sockaddr_un serun;
+ int sockfd;
+ int ret = 0;
+
+ sockfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0);
+ memset_s(&serun, sizeof(serun), 0, sizeof(serun));
+ serun.sun_family = AF_UNIX;
+ sprintf_s(serun.sun_path, PATH_MAX, "%s%d", SERVER_PATH, process_index);
+ int32_t len = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
+ if (posix_api->connect_fn(sockfd, (struct sockaddr *)&serun, len) < 0) {
+ return CONNECT_ERROR;
+ }
+ posix_api->write_fn(sockfd, buf, write_len);
+ if (need_reply) {
+ char reply_message[REPLY_LEN];
+ int32_t read_result = posix_api->read_fn(sockfd, reply_message, REPLY_LEN);
+ if (read_result > 0) {
+ if (strcmp(reply_message, SUCCESS_REPLY) == 0) {
+ ret = TRANSFER_SUCESS;
+ } else if (strcmp(reply_message, ERROR_REPLY) == 0) {
+ ret = REPLY_ERROR;
+ } else {
+ ret = atoi(reply_message);
+ }
+ } else {
+ ret = REPLY_ERROR;
+ }
+ }
+ posix_api->close_fn(sockfd);
+
+ return ret;
+}
+
+int32_t check_params_from_primary(void)
+{
+ struct cfg_params *cfg = get_global_cfg_params();
+ if (cfg->is_primary) {
+ return 0;
+ }
+ // check lstack num
+ char get_lstack_num[GET_LSTACK_NUM];
+ sprintf_s(get_lstack_num, GET_LSTACK_NUM, "%s", GET_LSTACK_NUM_STRING);
+ int32_t ret = transfer_pkt_to_other_process(get_lstack_num, 0, GET_LSTACK_NUM, true);
+ if (ret != cfg->num_cpu) {
+ return -1;
+ }
+ return 0;
+}
+
+static struct rte_flow *create_flow_director(uint16_t port_id, uint16_t queue_id,
+ uint32_t src_ip, uint32_t dst_ip,
+ uint16_t src_port, uint16_t dst_port,
+ struct rte_flow_error *error)
+{
+ struct rte_flow_attr attr;
+ struct rte_flow_item pattern[MAX_PATTERN_NUM];
+ struct rte_flow_action action[MAX_ACTION_NUM];
+ struct rte_flow *flow = NULL;
+ struct rte_flow_action_queue queue = { .index = queue_id };
+ struct rte_flow_item_ipv4 ip_spec;
+ struct rte_flow_item_ipv4 ip_mask;
+
+ struct rte_flow_item_tcp tcp_spec;
+ struct rte_flow_item_tcp tcp_mask;
+ int res;
+
+ memset_s(pattern, sizeof(pattern), 0, sizeof(pattern));
+ memset_s(action, sizeof(action), 0, sizeof(action));
+
+ /*
+ * set the rule attribute.
+ * in this case only ingress packets will be checked.
+ */
+ memset_s(&attr, sizeof(struct rte_flow_attr), 0, sizeof(struct rte_flow_attr));
+ attr.ingress = 1;
+
+ /*
+ * create the action sequence.
+ * one action only, move packet to queue
+ */
+ action[0].type = RTE_FLOW_ACTION_TYPE_QUEUE;
+ action[0].conf = &queue;
+ action[1].type = RTE_FLOW_ACTION_TYPE_END;
+
+ // not limit eth header
+ pattern[0].type = RTE_FLOW_ITEM_TYPE_ETH;
+
+ // ip header
+ memset_s(&ip_spec, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4));
+ memset_s(&ip_mask, sizeof(struct rte_flow_item_ipv4), 0, sizeof(struct rte_flow_item_ipv4));
+ ip_spec.hdr.dst_addr = dst_ip;
+ ip_mask.hdr.dst_addr = FULL_MASK;
+ ip_spec.hdr.src_addr = src_ip;
+ ip_mask.hdr.src_addr = FULL_MASK;
+ pattern[1].type = RTE_FLOW_ITEM_TYPE_IPV4;
+ pattern[1].spec = &ip_spec;
+ pattern[1].mask = &ip_mask;
+
+ // tcp header, full mask 0xffff
+ memset_s(&tcp_spec, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp));
+ memset_s(&tcp_mask, sizeof(struct rte_flow_item_tcp), 0, sizeof(struct rte_flow_item_tcp));
+ pattern[2].type = RTE_FLOW_ITEM_TYPE_TCP; // 2: pattern 2 is tcp header
+ tcp_spec.hdr.src_port = src_port;
+ tcp_spec.hdr.dst_port = dst_port;
+ tcp_mask.hdr.src_port = rte_flow_item_tcp_mask.hdr.src_port;
+ tcp_mask.hdr.dst_port = rte_flow_item_tcp_mask.hdr.dst_port;
+ pattern[2].spec = &tcp_spec;
+ pattern[2].mask = &tcp_mask;
+
+ /* the final level must be always type end */
+ pattern[3].type = RTE_FLOW_ITEM_TYPE_END;
+ res = rte_flow_validate(port_id, &attr, pattern, action, error);
+ if (!res) {
+ flow = rte_flow_create(port_id, &attr, pattern, action, error);
+ } else {
+ LSTACK_LOG(ERR, LSTACK, "rte_flow_create.rte_flow_validate error, res %d \n", res);
+ }
+
+ return flow;
+}
+
+static void config_flow_director(uint16_t queue_id, uint32_t src_ip,
+ uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ uint16_t port_id = get_protocol_stack_group()->port_id;
+ char rule_key[RULE_KEY_LEN] = {0};
+ sprintf_s(rule_key, sizeof(rule_key), "%u_%u_%u", src_ip, src_port, dst_port);
+ struct flow_rule *fl_exist = find_rule(rule_key);
+ if (fl_exist != NULL) {
+ return;
+ }
+
+ LSTACK_LOG(INFO, LSTACK,
+ "config_flow_director, flow queue_id %u, src_ip %u,src_port_ntohs:%u, dst_port_ntohs:%u\n",
+ queue_id, src_ip, ntohs(src_port), ntohs(dst_port));
+
+ struct rte_flow_error error;
+ struct rte_flow *flow = create_flow_director(port_id, queue_id, src_ip, dst_ip, src_port, dst_port, &error);
+ if (!flow) {
+ LSTACK_LOG(ERR, LSTACK,"flow can not be created. queue_id %u, src_ip %u, src_port %u,"
+ "dst_port %u, dst_port_ntohs :%u, type %d. message: %s\n",
+ queue_id, src_ip, src_port, dst_port, ntohs(dst_port),
+ error.type, error.message ? error.message : "(no stated reason)");
+ return;
+ }
+ __sync_fetch_and_add(&g_flow_num, 1);
+ add_rule(rule_key, flow);
+}
+
+static void delete_flow_director(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ uint16_t port_id = get_protocol_stack_group()->port_id;
+ char rule_key[RULE_KEY_LEN] = {0};
+ sprintf_s(rule_key, RULE_KEY_LEN, "%u_%u_%u",dst_ip, dst_port, src_port);
+ struct flow_rule *fl = find_rule(rule_key);
+
+ if(fl != NULL) {
+ struct rte_flow_error error;
+ int ret = rte_flow_destroy(port_id, fl->flow, &error);
+ if (ret != 0) {
+ LSTACK_LOG(ERR, LSTACK, "Flow can't be delete %d message: %s\n",
+ error.type, error.message ? error.message : "(no stated reason)");
+ }
+ delete_rule(rule_key);
+ __sync_fetch_and_sub(&g_flow_num, 1);
+ }
+}
+
+/* if process 0, delete directly, else transfer 'dst_ip,src_port,dst_port' to process 0. */
+void transfer_delete_rule_info_to_process0(uint32_t dst_ip, uint16_t src_port, uint16_t dst_port)
+{
+ if (get_global_cfg_params()->is_primary) {
+ delete_flow_director(dst_ip, src_port, dst_port);
+ } else {
+ char process_server_path[DELETE_FLOWS_PARAMS_LENGTH];
+ sprintf_s(process_server_path, DELETE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u",
+ dst_ip, SPLIT_DELIM, src_port, SPLIT_DELIM, dst_port);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, DELETE_FLOWS_PARAMS_LENGTH, false);
+ if (ret != TRANSFER_SUCESS) {
+ LSTACK_LOG(ERR, LSTACK, "error. tid %d. dst_ip %u, src_port: %u, dst_port %u\n",
+ rte_gettid(), dst_ip, src_port, dst_port);
+ }
+ }
+}
+
+// if process 0, add directly, else transfer 'src_ip,dst_ip,src_port,dst_port,queue_id' to process 0.
+void transfer_create_rule_info_to_process0(uint16_t queue_id, uint32_t src_ip,
+ uint32_t dst_ip, uint16_t src_port,
+ uint16_t dst_port)
+{
+ char process_server_path[CREATE_FLOWS_PARAMS_LENGTH];
+ /* exchage src_ip and dst_ip, src_port and dst_port */
+ uint8_t process_idx = get_global_cfg_params()->process_idx;
+ sprintf_s(process_server_path, CREATE_FLOWS_PARAMS_LENGTH, "%u%s%u%s%u%s%u%s%u%s%u",
+ dst_ip, SPLIT_DELIM, src_ip, SPLIT_DELIM,
+ dst_port, SPLIT_DELIM, src_port, SPLIT_DELIM,
+ queue_id, SPLIT_DELIM, process_idx);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, CREATE_FLOWS_PARAMS_LENGTH, true);
+ if (ret != TRANSFER_SUCESS) {
+ LSTACK_LOG(ERR, LSTACK, "error. tid %d. src_ip %u, dst_ip %u, src_port: %u, dst_port %u,"
+ "queue_id %u, process_idx %u\n",
+ rte_gettid(), src_ip, dst_ip, src_port, dst_port, queue_id, process_idx);
+ }
+}
+
+void transfer_add_or_delete_listen_port_to_process0(uint16_t listen_port, uint8_t process_idx, uint8_t is_add)
+{
+ char process_server_path[ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH];
+ sprintf_s(process_server_path, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH,
+ "%u%s%u%s%u", listen_port, SPLIT_DELIM, process_idx, SPLIT_DELIM, is_add);
+ int ret = transfer_pkt_to_other_process(process_server_path, 0, ADD_OR_DELETE_LISTEN_PORT_PARAMS_LENGTH, true);
+ if (ret != TRANSFER_SUCESS) {
+ LSTACK_LOG(ERR, LSTACK, "error. tid %d. listen_port %u, process_idx %u\n",
+ rte_gettid(), listen_port, process_idx);
+ }
+}
+
+static int str_to_array(char *args, uint32_t *array, int size)
+{
+ int val;
+ uint16_t cnt = 0;
+ char *elem = NULL;
+ char *next_token = NULL;
+
+ memset_s(array, sizeof(*array) * size, 0, sizeof(*array) * size);
+ elem = strtok_s((char *)args, SPLIT_DELIM, &next_token);
+ while (elem != NULL) {
+ if (cnt >= size) {
+ return -1;
+ }
+ val = atoi(elem);
+ if (val < 0) {
+ return -1;
+ }
+ array[cnt] = (uint32_t)val;
+ cnt++;
+
+ elem = strtok_s(NULL, SPLIT_DELIM, &next_token);
+ }
+
+ return cnt;
+}
+
+static void parse_and_delete_rule(char* buf)
+{
+ uint32_t array[DELETE_FLOWS_PARAMS_NUM];
+ str_to_array(buf, array, DELETE_FLOWS_PARAMS_NUM);
+ uint32_t dst_ip = array[0];
+ uint16_t src_port = array[1];
+ uint16_t dst_port = array[2];
+ delete_flow_director(dst_ip, src_port, dst_port);
+}
+
+void add_user_process_port(uint16_t dst_port, uint8_t process_idx, enum port_type type)
+{
+ if (type == PORT_LISTEN) {
+ g_listen_ports[dst_port] = process_idx;
+ } else {
+ g_user_ports[dst_port] = process_idx;
+ }
+}
+
+void delete_user_process_port(uint16_t dst_port, enum port_type type)
+{
+ if (type == PORT_LISTEN) {
+ g_listen_ports[dst_port] = INVAILD_PROCESS_IDX;
+ } else {
+ g_user_ports[dst_port] = INVAILD_PROCESS_IDX;
+ }
+}
+
+static void parse_and_create_rule(char* buf)
+{
+ uint32_t array[CREATE_FLOWS_PARAMS_NUM];
+ str_to_array(buf, array, CREATE_FLOWS_PARAMS_NUM);
+ uint32_t src_ip = array[0];
+ uint32_t dst_ip = array[1];
+ uint16_t src_port = array[2];
+ uint16_t dst_port = array[3];
+ uint16_t queue_id = array[4];
+ uint8_t process_idx = array[5];
+ config_flow_director(queue_id, src_ip, dst_ip, src_port, dst_port);
+ add_user_process_port(dst_port, process_idx, PORT_CONNECT);
+}
+
+static void parse_and_add_or_delete_listen_port(char* buf)
+{
+ uint32_t array[ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM];
+ str_to_array(buf, array, ADD_OR_DELETE_LISTEN_PORT_PARAMS_NUM);
+ uint16_t listen_port = array[0];
+ uint8_t process_idx = array[1];
+ uint8_t is_add = array[2];
+ if (is_add == 1) {
+ add_user_process_port(listen_port, process_idx, PORT_LISTEN);
+ } else {
+ delete_user_process_port(listen_port, PORT_LISTEN);
+ }
+}
+
+void transfer_arp_to_other_process(struct rte_mbuf *mbuf)
+{
+ struct cfg_params *cfgs = get_global_cfg_params();
+
+ for (int i = 1; i < cfgs->num_process; i++) {
+ char arp_mbuf[LSTACK_MBUF_LEN] = {0};
+ sprintf_s(arp_mbuf, sizeof(arp_mbuf), "%lu", mbuf);
+ int result = transfer_pkt_to_other_process(arp_mbuf, i, LSTACK_MBUF_LEN, false);
+ if (result == CONNECT_ERROR) {
+ LSTACK_LOG(INFO, LSTACK,"connect process %d failed, ensure the process is started.\n", i);
+ } else if (result == REPLY_ERROR) {
+ LSTACK_LOG(ERR, LSTACK,"transfer arp pakages to process %d error. %m\n", i);
+ }
+ }
+}
+
+static void transfer_tcp_to_thread(struct rte_mbuf *mbuf, uint16_t stk_idx)
+{
+ /* current process queue_id */
+ struct protocol_stack *stack = get_protocol_stack_group()->stacks[stk_idx];
+ int ret = -1;
+ while (ret != 0) {
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf);
+ printf("transfer_tcp_to_thread, ret : %d \n", ret);
+ }
+}
+
+static void parse_arp_and_transefer(char* buf)
+{
+ struct rte_mbuf *mbuf = (struct rte_mbuf *)atoll(buf);
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ struct rte_mbuf *mbuf_copy = NULL;
+ struct protocol_stack *stack = NULL;
+ int32_t ret;
+ for (int32_t i = 0; i < stack_group->stack_num; i++) {
+ stack = stack_group->stacks[i];
+ ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false);
+ while (ret != 0) {
+ ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false);
+ stack->stats.rx_allocmbuf_fail++;
+ }
+ copy_mbuf(mbuf_copy, mbuf);
+
+ ret = rpc_call_arp(&stack->rpc_queue, mbuf_copy);
+
+ while (ret != 0) {
+ rpc_call_arp(&stack->rpc_queue, mbuf_copy);
+ }
+ }
+}
+
+static void parse_tcp_and_transefer(char* buf)
+{
+ char *next_token = NULL;
+ char *elem = strtok_s(buf, SPLIT_DELIM, &next_token);
+ struct rte_mbuf *mbuf = (struct rte_mbuf *) atoll(elem);
+ elem = strtok_s(NULL, SPLIT_DELIM, &next_token);
+ uint16_t queue_id = atoll(elem);
+
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ uint16_t num_queue = get_global_cfg_params()->num_queue;
+ uint16_t stk_index = queue_id % num_queue;
+ struct rte_mbuf *mbuf_copy = NULL;
+ struct protocol_stack *stack = stack_group->stacks[stk_index];
+
+ int32_t ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false);
+ while (ret != 0) {
+ ret = dpdk_alloc_pktmbuf(stack->rxtx_mbuf_pool, &mbuf_copy, 1, false);
+ stack->stats.rx_allocmbuf_fail++;
+ }
+
+ copy_mbuf(mbuf_copy,mbuf);
+ transfer_tcp_to_thread(mbuf_copy, stk_index);
+}
+
+int recv_pkts_from_other_process(int process_index, void* arg)
+{
+ struct sockaddr_un serun, cliun;
+ socklen_t cliun_len;
+ int listenfd, connfd, size;
+ char buf[132];
+ /* socket */
+ if ((listenfd = posix_api->socket_fn(AF_UNIX, SOCK_STREAM, 0)) < 0) {
+ perror("socket error");
+ return -1;
+ }
+ /* bind */
+ memset_s(&serun, sizeof(serun), 0, sizeof(serun));
+ serun.sun_family = AF_UNIX;
+ char process_server_path[PATH_MAX];
+ sprintf_s(process_server_path, sizeof(process_server_path), "%s%d", SERVER_PATH, process_index);
+ strcpy_s(serun.sun_path, sizeof(serun.sun_path), process_server_path);
+ size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
+ unlink(process_server_path);
+ if (posix_api->bind_fn(listenfd, (struct sockaddr *)&serun, size) < 0) {
+ perror("bind error");
+ return -1;
+ }
+ if (posix_api->listen_fn(listenfd, 20) < 0) { /* 20: max backlog */
+ perror("listen error");
+ return -1;
+ }
+ sem_post((sem_t *)arg);
+ /* block */
+ while (1) {
+ cliun_len = sizeof(cliun);
+ if ((connfd = posix_api->accept_fn(listenfd, (struct sockaddr *)&cliun, &cliun_len)) < 0) {
+ perror("accept error");
+ continue;
+ }
+ while (1) {
+ int n = posix_api->read_fn(connfd, buf, sizeof(buf));
+ if (n < 0) {
+ perror("read error");
+ break;
+ } else if (n == 0) {
+ break;
+ }
+
+ if (n == LSTACK_MBUF_LEN) {
+ /* arp */
+ parse_arp_and_transefer(buf);
+ } else if (n == TRANSFER_TCP_MUBF_LEN) {
+ /* tcp. lstack_mbuf_queue_id */
+ parse_tcp_and_transefer(buf);
+ } else if (n == DELETE_FLOWS_PARAMS_LENGTH) {
+ /* delete rule */
+ parse_and_delete_rule(buf);
+ } else if (n == CREATE_FLOWS_PARAMS_LENGTH) {
+ /* add rule */
+ parse_and_create_rule(buf);
+ char reply_buf[REPLY_LEN];
+ sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
+ } else if (n == GET_LSTACK_NUM) {
+ char reply_buf[REPLY_LEN];
+ sprintf_s(reply_buf, sizeof(reply_buf), "%d", get_global_cfg_params()->num_cpu);
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
+ } else {
+ /* add port */
+ parse_and_add_or_delete_listen_port(buf);
+ char reply_buf[REPLY_LEN];
+ sprintf_s(reply_buf, sizeof(reply_buf), "%s", SUCCESS_REPLY);
+ posix_api->write_fn(connfd, reply_buf, REPLY_LEN);
+ }
+
+ }
+ posix_api->close_fn(connfd);
+ }
+ posix_api->close_fn(listenfd);
+ return 0;
+}
+
+void concat_mbuf_and_queue_id(struct rte_mbuf *mbuf, uint16_t queue_id,
+ char* mbuf_and_queue_id, int write_len)
+{
+ sprintf_s(mbuf_and_queue_id, write_len, "%lu%s%u", mbuf, SPLIT_DELIM, queue_id);
+}
+
+static int mbuf_to_idx(struct rte_mbuf *mbuf, uint16_t *dst_port)
+{
+ struct rte_ether_hdr *ethh = rte_pktmbuf_mtod(mbuf, struct rte_ether_hdr *);
+ u16_t type = rte_be_to_cpu_16(ethh->ether_type);
+ uint32_t index = 0;
+ if (type == RTE_ETHER_TYPE_IPV4) {
+ struct rte_ipv4_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv4_hdr *, sizeof(struct rte_ether_hdr));
+ uint8_t ip_version = (iph->version_ihl & 0xf0) >> IPV4_VERSION_OFFSET;
+ if (likely(ip_version == IPV4_VERSION)) {
+ if (likely(iph->next_proto_id == IPPROTO_TCP)) {
+ struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *,
+ sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr));
+ *dst_port = tcp_hdr->dst_port;
+
+ if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
+ uint32_t src_ip = iph->src_addr;
+ uint16_t src_port = tcp_hdr->src_port;
+ index = rte_jhash_3words(src_ip, src_port | ((*dst_port) << 16), 0, 0);
+ } else {
+ return -1;
+ }
+ }
+ }
+ } else if (type == RTE_ETHER_TYPE_IPV6) {
+ struct rte_ipv6_hdr *iph = rte_pktmbuf_mtod_offset(mbuf, struct rte_ipv6_hdr *, sizeof(struct rte_ether_hdr));
+ if (likely(iph->proto == IPPROTO_TCP)) {
+ struct rte_tcp_hdr *tcp_hdr = rte_pktmbuf_mtod_offset(mbuf, struct rte_tcp_hdr *,
+ sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv6_hdr));
+ *dst_port = tcp_hdr->dst_port;
+
+ if (unlikely(tcp_hdr->tcp_flags == TCP_SYN)) {
+ uint32_t *src_ip = (uint32_t *) &iph->src_addr;
+ uint16_t src_port = tcp_hdr->src_port;
+ uint32_t v = rte_jhash_3words(src_ip[0], src_ip[1], src_ip[2], 0);
+ index = rte_jhash_3words(src_ip[3], src_port | ((*dst_port) << 16), v, 0);
+ } else {
+ return -1;
+ }
+ }
+ } else {
+ return -1;
+ }
+ return index;
+}
+
+int distribute_pakages(struct rte_mbuf *mbuf)
+{
+ uint16_t dst_port = 0;
+ uint32_t index = mbuf_to_idx(mbuf, &dst_port);
+ if (index == -1) {
+ return TRANSFER_CURRENT_THREAD;
+ }
+
+ uint16_t queue_id = 0;
+ uint32_t user_process_idx = 0;
+ int each_process_queue_num = get_global_cfg_params()->num_queue;
+ index = index % each_process_queue_num;
+ if (g_listen_ports[dst_port] != INVAILD_PROCESS_IDX) {
+ user_process_idx = g_listen_ports[dst_port];
+ } else {
+ user_process_idx = g_user_ports[dst_port];
+ }
+
+ if (user_process_idx == INVAILD_PROCESS_IDX) {
+ return TRANSFER_KERNEL;
+ }
+
+ if (get_global_cfg_params()->seperate_send_recv) {
+ queue_id = user_process_idx * each_process_queue_num + (index / 2) * 2;
+ } else {
+ queue_id = user_process_idx * each_process_queue_num + index;
+ }
+ if (queue_id != 0) {
+ if (user_process_idx == 0) {
+ transfer_tcp_to_thread(mbuf, queue_id);
+ } else {
+ char mbuf_and_queue_id[TRANSFER_TCP_MUBF_LEN];
+ concat_mbuf_and_queue_id(mbuf, queue_id, mbuf_and_queue_id, TRANSFER_TCP_MUBF_LEN);
+ transfer_pkt_to_other_process(mbuf_and_queue_id, user_process_idx,
+ TRANSFER_TCP_MUBF_LEN, false);
+ }
+ return TRANSFER_OTHER_THREAD;
+ } else {
+ return TRANSFER_CURRENT_THREAD;
+ }
+
+ return TRANSFER_KERNEL;
+}
+
+void gazelle_listen_thread(void *arg)
+{
+ struct cfg_params *cfg_param = get_global_cfg_params();
+ recv_pkts_from_other_process(cfg_param->process_idx, arg);
+}
+
+void flow_init(void)
+{
+ struct protocol_stack_group *stack_group = get_protocol_stack_group();
+ init_listen_and_user_ports();
+
+ /* run to completion mode does not currently support multiple process */
+ if (!use_ltran() && !get_global_cfg_params()->stack_mode_rtc) {
+ char name[PATH_MAX];
+ sem_init(&stack_group->sem_listen_thread, 0, 0);
+ sprintf_s(name, sizeof(name), "%s", "listen_thread");
+ struct sys_thread *thread = sys_thread_new(name, gazelle_listen_thread,
+ (void*)(&stack_group->sem_listen_thread), 0, 0);
+ free(thread);
+ sem_wait(&stack_group->sem_listen_thread);
+ }
+}
diff --git a/src/lstack/netif/lstack_vdev.c b/src/lstack/netif/lstack_vdev.c
index f78e48a..3703092 100644
--- a/src/lstack/netif/lstack_vdev.c
+++ b/src/lstack/netif/lstack_vdev.c
@@ -33,6 +33,7 @@
#include "lstack_protocol_stack.h"
#include "gazelle_reg_msg.h"
#include "lstack_lwip.h"
+#include "lstack_flow.h"
#include "lstack_vdev.h"
/* INUSE_TX_PKTS_WATERMARK < VDEV_RX_QUEUE_SZ;
--
2.27.0
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。