diff options
Diffstat (limited to 'example/simple_pipeline')
-rw-r--r-- | example/simple_pipeline/.gitignore | 4 | ||||
-rw-r--r-- | example/simple_pipeline/Makefile.am | 34 | ||||
-rw-r--r-- | example/simple_pipeline/odp_simple_pipeline.c | 962 | ||||
-rwxr-xr-x | example/simple_pipeline/simple_pipeline_run.sh | 37 | ||||
-rw-r--r-- | example/simple_pipeline/udp64.pcap | bin | 0 -> 7624 bytes |
5 files changed, 1037 insertions, 0 deletions
diff --git a/example/simple_pipeline/.gitignore b/example/simple_pipeline/.gitignore new file mode 100644 index 000000000..28cb24d41 --- /dev/null +++ b/example/simple_pipeline/.gitignore @@ -0,0 +1,4 @@ +odp_simple_pipeline +pktio_env +*.log +*.trs diff --git a/example/simple_pipeline/Makefile.am b/example/simple_pipeline/Makefile.am new file mode 100644 index 000000000..f258434aa --- /dev/null +++ b/example/simple_pipeline/Makefile.am @@ -0,0 +1,34 @@ +include $(top_srcdir)/example/Makefile.inc + +bin_PROGRAMS = odp_simple_pipeline + +odp_simple_pipeline_SOURCES = odp_simple_pipeline.c + +if test_example +if ODP_PKTIO_PCAP +TESTS = simple_pipeline_run.sh +endif +endif +EXTRA_DIST = simple_pipeline_run.sh udp64.pcap + +# If building out-of-tree, make check will not copy the scripts and data to the +# $(builddir) assuming that all commands are run locally. However this prevents +# running tests on a remote target using LOG_COMPILER. +# So copy all script and data files explicitly here. +all-local: + if [ "x$(srcdir)" != "x$(builddir)" ]; then \ + for f in $(EXTRA_DIST); do \ + if [ -e $(srcdir)/$$f ]; then \ + mkdir -p $(builddir)/$$(dirname $$f); \ + cp -f $(srcdir)/$$f $(builddir)/$$f; \ + fi \ + done \ + fi + ln -f -s $(top_srcdir)/platform/$(with_platform)/test/example/simple_pipeline/pktio_env \ + pktio_env +clean-local: + if [ "x$(srcdir)" != "x$(builddir)" ]; then \ + for f in $(EXTRA_DIST); do \ + rm -f $(builddir)/$$f; \ + done \ + fi diff --git a/example/simple_pipeline/odp_simple_pipeline.c b/example/simple_pipeline/odp_simple_pipeline.c new file mode 100644 index 000000000..ad80bc394 --- /dev/null +++ b/example/simple_pipeline/odp_simple_pipeline.c @@ -0,0 +1,962 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * Copyright (c) 2019 Nokia + */ + + /** + * @example odp_simple_pipeline.c + * + * Simple pipeline example application which receives packets from one + * interface and passes them through 0-N worker stages before outputting them + * from a second network interface. The RX, worker, and TX stages are connected + * using plain queues and each stage is run on a separate CPU thread. + * + * @cond _ODP_HIDE_FROM_DOXYGEN_ + */ + +#include <stdlib.h> +#include <stdio.h> +#include <getopt.h> +#include <signal.h> +#include <unistd.h> +#include <inttypes.h> + +#include <odp_api.h> +#include <odp/helper/odph_api.h> + +#define POOL_PKT_NUM 8192 +#define POOL_PKT_LEN 1536 +#define MAX_PKT_BURST 32 +/* Three threads required for RX, TX and statistics */ +#define MAX_WORKERS (ODP_THREAD_COUNT_MAX - 3) +#define QUEUE_SIZE 1024 +#define MAX_PKTIOS 2 +#define DUMMY_HASH 1234567890 + +/* Get rid of path in filename - only for unix-type paths using '/' */ +#define NO_PATH(file_name) (strrchr((file_name), '/') ? \ + strrchr((file_name), '/') + 1 : (file_name)) + +/* Statistics */ +typedef union ODP_ALIGNED_CACHE { + struct { + uint64_t pps; /* Packet per second */ + uint64_t rx_cnt; /* RX packets */ + uint64_t tx_cnt; /* TX packets */ + uint64_t rx_drops; /* Dropped packets on RX */ + uint64_t tx_drops; /* Dropped packets on TX */ + } s; + uint8_t padding[ODP_CACHE_LINE_SIZE]; +} stats_t; + +/* Thread specific data */ +typedef struct thread_args_t { + odp_queue_t rx_queue; + odp_queue_t tx_queue; + stats_t stats; +} thread_args_t; + +/* Parsed command line application arguments */ +typedef struct { + char **if_names; /* Array of pointers to interface names */ + odph_ethaddr_t dst_addr; /* Destination MAC address */ + int accuracy; /* Statistics print interval in seconds */ + int extra_work; /* Add extra processing to worker stage */ + int dst_change; /* Change destination eth address */ + int src_change; /* Change source eth address */ + int dst_set; /* Custom destination eth address given */ + int time; /* Time in seconds to run. */ + int num_workers; /* Number of pipeline worker stages */ + char *if_str; /* Storage for interface names */ +} appl_args_t; + +/* Global application data */ +typedef struct { + odp_queue_t queue[ODP_THREAD_COUNT_MAX]; + /* Thread specific arguments */ + thread_args_t thread[ODP_THREAD_COUNT_MAX]; + /* Barriers to synchronize main and workers */ + odp_barrier_t init_barrier; + odp_barrier_t term_barrier; + /* Pktio interfaces */ + odp_pktio_t if0, if1; + odp_pktin_queue_t if0in, if1in; + odp_pktout_queue_t if0out, if1out; + odph_ethaddr_t src_addr; /* Source MAC address */ + odph_ethaddr_t dst_addr; /* Destination MAC address */ + odp_atomic_u32_t exit_threads; + /* Application (parsed) arguments */ + appl_args_t appl; +} global_data_t; + +static global_data_t *global; + +static void sig_handler(int signo ODP_UNUSED) +{ + odp_atomic_store_u32(&global->exit_threads, 1); +} + +static odp_pktio_t create_pktio(const char *name, odp_pool_t pool, + odp_pktin_queue_t *pktin, + odp_pktout_queue_t *pktout) +{ + odp_pktio_param_t pktio_param; + odp_pktin_queue_param_t in_param; + odp_pktout_queue_param_t out_param; + odp_pktio_t pktio; + odp_pktio_config_t config; + + odp_pktio_param_init(&pktio_param); + + pktio = odp_pktio_open(name, pool, &pktio_param); + if (pktio == ODP_PKTIO_INVALID) { + printf("Error: failed to open %s\n", name); + exit(1); + } + + odp_pktio_config_init(&config); + config.parser.layer = ODP_PROTO_LAYER_L2; + odp_pktio_config(pktio, &config); + + odp_pktin_queue_param_init(&in_param); + odp_pktout_queue_param_init(&out_param); + + in_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE; + + if (odp_pktin_queue_config(pktio, &in_param)) { + printf("Error: failed to config input queue for %s\n", name); + exit(1); + } + + out_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE; + + if (odp_pktout_queue_config(pktio, &out_param)) { + printf("Error: failed to config output queue for %s\n", name); + exit(1); + } + + if (odp_pktin_queue(pktio, pktin, 1) != 1) { + printf("Error: pktin queue query failed for %s\n", name); + exit(1); + } + if (odp_pktout_queue(pktio, pktout, 1) != 1) { + printf("Error: pktout queue query failed for %s\n", name); + exit(1); + } + return pktio; +} + +/* + * Fill packets' eth addresses and convert packets to events + * + * pkt_tbl Array of packets + * event_tbl[out] Array of events + * num Number of packets in the array + */ +static inline unsigned int prep_events(odp_packet_t pkt_tbl[], + odp_event_t event_tbl[], + unsigned int num) +{ + unsigned int i; + unsigned int events = 0; + + if (!global->appl.dst_change && !global->appl.src_change) { + odp_packet_to_event_multi(pkt_tbl, event_tbl, num); + return num; + } + + for (i = 0; i < num; ++i) { + odp_packet_t pkt = pkt_tbl[i]; + odph_ethhdr_t *eth; + + odp_packet_prefetch(pkt, 0, ODPH_ETHHDR_LEN); + + if (odp_unlikely(!odp_packet_has_eth(pkt))) { + odp_packet_free(pkt); + continue; + } + + eth = odp_packet_data(pkt); + + if (global->appl.src_change) + eth->src = global->src_addr; + + if (global->appl.dst_change) + eth->dst = global->dst_addr; + + event_tbl[events++] = odp_packet_to_event(pkt); + } + return events; +} + +static inline int rx_thread(void *arg) +{ + thread_args_t *thr_args = arg; + odp_event_t event_tbl[MAX_PKT_BURST]; + odp_packet_t pkt_tbl[MAX_PKT_BURST]; + odp_pktin_queue_t pktin_queue = global->if0in; + odp_queue_t out_queue = thr_args->tx_queue; + stats_t *stats = &thr_args->stats; + int pkts, events, sent, drops; + + odp_barrier_wait(&global->init_barrier); + + while (!odp_atomic_load_u32(&global->exit_threads)) { + pkts = odp_pktin_recv(pktin_queue, pkt_tbl, MAX_PKT_BURST); + if (odp_unlikely(pkts <= 0)) + continue; + + stats->s.rx_cnt += pkts; + + events = prep_events(pkt_tbl, event_tbl, pkts); + drops = events - pkts; + if (odp_unlikely(drops)) + stats->s.rx_drops += pkts - events; + + sent = odp_queue_enq_multi(out_queue, event_tbl, events); + if (odp_unlikely(sent < 0)) + sent = 0; + + stats->s.tx_cnt += sent; + + drops = events - sent; + if (odp_unlikely(drops)) { + stats->s.tx_drops += drops; + odp_packet_free_multi(&pkt_tbl[sent], drops); + } + } + + /* Wait until pktio devices are stopped */ + odp_barrier_wait(&global->term_barrier); + + return 0; +} + +static inline int tx_thread(void *arg) +{ + thread_args_t *thr_args = arg; + odp_event_t event_tbl[MAX_PKT_BURST]; + odp_packet_t pkt_tbl[MAX_PKT_BURST]; + odp_queue_t rx_queue = thr_args->rx_queue; + odp_pktout_queue_t pktout_queue = global->if1out; + stats_t *stats = &thr_args->stats; + int events, sent, tx_drops; + + odp_barrier_wait(&global->init_barrier); + + while (!odp_atomic_load_u32(&global->exit_threads)) { + events = odp_queue_deq_multi(rx_queue, event_tbl, + MAX_PKT_BURST); + if (odp_unlikely(events <= 0)) + continue; + + stats->s.rx_cnt += events; + + odp_packet_from_event_multi(pkt_tbl, event_tbl, events); + + sent = odp_pktout_send(pktout_queue, pkt_tbl, events); + if (odp_unlikely(sent < 0)) + sent = 0; + + stats->s.tx_cnt += sent; + + tx_drops = events - sent; + if (odp_unlikely(tx_drops)) { + stats->s.tx_drops += tx_drops; + odp_packet_free_multi(&pkt_tbl[sent], tx_drops); + } + } + + /* Wait until pktio devices are stopped */ + odp_barrier_wait(&global->term_barrier); + + /* Empty queue before exiting */ + events = 1; + while (events > 0) { + events = odp_queue_deq_multi(rx_queue, event_tbl, + MAX_PKT_BURST); + + if (events > 0) + odp_event_free_multi(event_tbl, events); + } + + return 0; +} + +/* + * Work on packets + */ +static inline void work_on_events(odp_event_t event_tbl[], unsigned int num) +{ + unsigned int i; + + for (i = 0; i < num; i++) { + odp_packet_t pkt = odp_packet_from_event(event_tbl[i]); + + if (odp_hash_crc32c(odp_packet_data(pkt), + odp_packet_seg_len(pkt), 123) == DUMMY_HASH) + printf("Dummy hash match\n"); + } +} + +static inline int worker_thread(void *arg ODP_UNUSED) +{ + thread_args_t *thr_args = arg; + odp_event_t event_tbl[MAX_PKT_BURST]; + stats_t *stats = &thr_args->stats; + odp_queue_t rx_queue = thr_args->rx_queue; + odp_queue_t tx_queue = thr_args->tx_queue; + int events, sent, tx_drops; + int extra_work = global->appl.extra_work; + + odp_barrier_wait(&global->init_barrier); + + while (!odp_atomic_load_u32(&global->exit_threads)) { + events = odp_queue_deq_multi(rx_queue, event_tbl, + MAX_PKT_BURST); + + if (odp_unlikely(events <= 0)) + continue; + + stats->s.rx_cnt += events; + + if (extra_work) + work_on_events(event_tbl, events); + + sent = odp_queue_enq_multi(tx_queue, event_tbl, events); + if (odp_unlikely(sent < 0)) + sent = 0; + + stats->s.tx_cnt += sent; + + tx_drops = events - sent; + if (odp_unlikely(tx_drops)) { + stats->s.tx_drops += tx_drops; + odp_event_free_multi(&event_tbl[sent], tx_drops); + } + } + + /* Wait until pktio devices are stopped */ + odp_barrier_wait(&global->term_barrier); + + /* Empty queue before exiting */ + events = 1; + while (events > 0) { + events = odp_queue_deq_multi(rx_queue, event_tbl, + MAX_PKT_BURST); + + if (events > 0) + odp_event_free_multi(event_tbl, events); + } + + return 0; +} + +static int setup_thread_masks(odp_cpumask_t *thr_mask_rx, + odp_cpumask_t *thr_mask_tx, + odp_cpumask_t *thr_mask_workers, + int num_workers) +{ + odp_cpumask_t cpumask; + int num_threads = 0; + int i, cpu; + + if (num_workers > MAX_WORKERS) { + printf("Worker count limited to MAX_WORKERS define (=%d)\n", + MAX_WORKERS); + num_workers = MAX_WORKERS; + } + + /* Two threads required for RX and TX*/ + num_threads = num_workers + 2; + + num_workers = odp_cpumask_default_worker(&cpumask, num_threads); + if (num_workers != num_threads) { + printf("Error: Not enough available CPU cores: %d/%d\n", + num_workers, num_threads); + exit(1); + } + + odp_cpumask_zero(thr_mask_rx); + odp_cpumask_zero(thr_mask_tx); + odp_cpumask_zero(thr_mask_workers); + + cpu = odp_cpumask_first(&cpumask); + for (i = 0; i < num_threads; i++) { + if (i == 0) + odp_cpumask_set(thr_mask_rx, cpu); + else if (i == 1) + odp_cpumask_set(thr_mask_tx, cpu); + else + odp_cpumask_set(thr_mask_workers, cpu); + cpu = odp_cpumask_next(&cpumask, cpu); + } + + return num_threads; +} + +/* + * Print statistics + * + * num_workers Number of worker threads + * thr_stats Pointers to stats storage + * duration Number of seconds to loop in + * timeout Number of seconds for stats calculation + */ +static int print_speed_stats(int num_workers, stats_t **thr_stats, + int duration, int timeout) +{ + uint64_t total_pkts = 0; + uint64_t pkts_prev = 0; + uint64_t maximum_pps = 0; + stats_t thr_stats_prev[num_workers]; + int i; + int elapsed = 0; + int stats_enabled = 1; + int loop_forever = (duration == 0); + + memset(thr_stats_prev, 0, sizeof(thr_stats_prev)); + + if (timeout <= 0) { + stats_enabled = 0; + timeout = 1; + } + + /* Wait for all threads to be ready*/ + odp_barrier_wait(&global->init_barrier); + + do { + uint64_t total_rx_drops = 0; + uint64_t total_tx_drops = 0; + uint64_t pps; + + sleep(timeout); + + for (i = 0; i < num_workers; i++) { + uint64_t rx_cnt = thr_stats[i]->s.rx_cnt; + uint64_t tx_cnt = thr_stats[i]->s.tx_cnt; + uint64_t rx_drops = thr_stats[i]->s.rx_drops; + uint64_t tx_drops = thr_stats[i]->s.tx_drops; + + /* Count only transmitted packets */ + if (i == (num_workers - 1)) + total_pkts = tx_cnt; + + total_rx_drops += rx_drops; + total_tx_drops += tx_drops; + + pps = (tx_cnt - thr_stats_prev[i].s.tx_cnt) / timeout; + thr_stats_prev[i].s.pps = pps; + thr_stats_prev[i].s.rx_cnt = rx_cnt; + thr_stats_prev[i].s.tx_cnt = tx_cnt; + thr_stats_prev[i].s.rx_drops = rx_drops; + thr_stats_prev[i].s.tx_drops = tx_drops; + } + if (stats_enabled) { + printf("----------------------------------------\n"); + for (i = 0; i < num_workers; i++) { + if (i == 0) + printf("RX thread: "); + else if (i == (num_workers - 1)) + printf("TX thread: "); + else + printf("Worker %d: ", i - 1); + + printf("%" PRIu64 " pps, " + "%" PRIu64 " rx drops, " + "%" PRIu64 " tx drops\n", + thr_stats_prev[i].s.pps, + thr_stats_prev[i].s.rx_drops, + thr_stats_prev[i].s.tx_drops); + } + pps = (total_pkts - pkts_prev) / timeout; + if (pps > maximum_pps) + maximum_pps = pps; + printf("TOTAL: %" PRIu64 " pps, " + "%" PRIu64 " rx drops, " + "%" PRIu64 " tx drops, " + "%" PRIu64 " max pps\n", + pps, total_rx_drops, total_tx_drops, + maximum_pps); + + pkts_prev = total_pkts; + } + elapsed += timeout; + } while (!odp_atomic_load_u32(&global->exit_threads) && (loop_forever || + (elapsed < duration))); + + if (stats_enabled) + printf("TEST RESULT: %" PRIu64 " maximum packets per second.\n", + maximum_pps); + + return total_pkts > 0 ? 0 : -1; +} + +/* + * Print system and application info + */ +static void print_info(char *progname, appl_args_t *appl_args) +{ + odp_sys_info_print(); + + printf("Running ODP appl: \"%s\"\n" + "-----------------\n" + "Using IFs: %s %s\n" + "Worker stages: %d\n" + "Extra work: %d\n\n", + progname, appl_args->if_names[0], appl_args->if_names[1], + appl_args->num_workers, appl_args->extra_work); + + fflush(NULL); +} + +/* + * Print usage information + */ +static void usage(char *progname) +{ + printf("\n" + "OpenDataPlane simple pipeline example application.\n" + "\n" + "Usage: %s [options]\n" + "\n" + " E.g. %s -i eth0,eth1 -e -w 3\n\n" + " ---- ---- ---- ---- ----\n" + " | RX | -> | W1 | -> | W2 | -> | W3 | -> | TX |\n" + " ---- ---- ---- ---- ----\n\n" + " In the above example,\n" + " each application stage is executed by a separate CPU thread and the stages\n" + " are connected using plain queues. The RX stage receives packets from eth0 and\n" + " enqueues them to the first worker stage (W1). The workers stages calculate\n" + " CRC-32C over packet data. After the final worker stage (W3) has processed\n" + " packets they are enqueued to the TX stage, which transmits the packets out\n" + " from interface eth1.\n" + "\n" + "Mandatory OPTIONS:\n" + " -i, --interface <name> Two eth interfaces (comma-separated, no spaces)\n" + "\n" + "Optional OPTIONS:\n" + " -a, --accuracy <sec> Time in seconds get print statistics\n" + " (default is 10 seconds).\n" + " -d, --dst_change <arg> 0: Don't change packets' dst eth addresses\n" + " 1: Change packets' dst eth addresses (default)\n" + " -s, --src_change <arg> 0: Don't change packets' src eth addresses\n" + " 1: Change packets' src eth addresses (default)\n" + " -r, --dst_addr <addr> Destination address\n" + " Requires also the -d flag to be set\n" + " -t, --time <sec> Time in seconds to run\n" + " -w, --workers <num> Number of worker stages (default 0)\n" + " -e, --extra-work Calculate CRC-32C over packet data in worker stage\n" + " -h, --help Display help and exit\n\n" + "\n", NO_PATH(progname), NO_PATH(progname) + ); +} + +/* + * Parse and store the command line arguments + * + * argc Argument count + * argv Argument vector + * appl_args[out] Storage for application arguments + */ +static void parse_args(int argc, char *argv[], appl_args_t *appl_args) +{ + char *token; + size_t len; + int opt; + int long_index; + int i; + int if_count = 0; + static const struct option longopts[] = { + {"accuracy", required_argument, NULL, 'a'}, + {"extra-work", no_argument, NULL, 'e'}, + {"dst_addr", required_argument, NULL, 'r'}, + {"dst_change", required_argument, NULL, 'd'}, + {"src_change", required_argument, NULL, 's'}, + {"interface", required_argument, NULL, 'i'}, + {"time", required_argument, NULL, 't'}, + {"workers", required_argument, NULL, 'w'}, + {"help", no_argument, NULL, 'h'}, + {NULL, 0, NULL, 0} + }; + + static const char *shortopts = "+a:d:er:s:t:i:w:h"; + + appl_args->accuracy = 10; /* get and print pps stats second */ + appl_args->dst_change = 1; /* change eth dst address by default */ + appl_args->src_change = 1; /* change eth src address by default */ + appl_args->time = 0; /* loop forever if time to run is 0 */ + appl_args->extra_work = 0; + + while (1) { + opt = getopt_long(argc, argv, shortopts, longopts, &long_index); + + if (opt == -1) + break; /* No more options */ + + switch (opt) { + case 'a': + appl_args->accuracy = atoi(optarg); + break; + case 'd': + appl_args->dst_change = atoi(optarg); + break; + case 'e': + appl_args->extra_work = 1; + break; + case 'r': + len = strlen(optarg); + if (len == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + len += 1; /* add room for '\0' */ + + if (odph_eth_addr_parse(&appl_args->dst_addr, + optarg) != 0) { + printf("invalid MAC address\n"); + usage(argv[0]); + exit(EXIT_FAILURE); + } + + appl_args->dst_set = 1; + + break; + case 's': + appl_args->src_change = atoi(optarg); + break; + case 't': + appl_args->time = atoi(optarg); + break; + case 'i': + len = strlen(optarg); + if (len == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + len += 1; /* add room for '\0' */ + + appl_args->if_str = malloc(len); + if (appl_args->if_str == NULL) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* count the number of tokens separated by ',' */ + strcpy(appl_args->if_str, optarg); + for (token = strtok(appl_args->if_str, ","), i = 0; + token != NULL; + token = strtok(NULL, ","), i++) + ; + + if_count = i; + + if (if_count != 2) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* allocate storage for the if names */ + appl_args->if_names = calloc(if_count, sizeof(char *)); + + /* store the if names (reset names string) */ + strcpy(appl_args->if_str, optarg); + for (token = strtok(appl_args->if_str, ","), i = 0; + token != NULL; token = strtok(NULL, ","), i++) { + appl_args->if_names[i] = token; + } + break; + case 'w': + appl_args->num_workers = atoi(optarg); + break; + case 'h': + usage(argv[0]); + exit(EXIT_SUCCESS); + break; + default: + break; + } + } + + if (if_count != 2) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + optind = 1; /* reset 'extern optind' from the getopt lib */ +} + +int main(int argc, char **argv) +{ + odp_cpumask_t thr_mask_rx; + odp_cpumask_t thr_mask_tx; + odp_cpumask_t thr_mask_worker; + odp_init_t init_param; + odp_instance_t instance; + odp_pool_t pool; + odp_pool_capability_t pool_capa; + odp_pool_param_t pool_param; + odp_queue_capability_t queue_capa; + odp_queue_param_t queue_param; + odp_shm_t shm; + odph_helper_options_t helper_options; + odph_thread_t thr_tbl[ODP_THREAD_COUNT_MAX]; + odph_thread_param_t thr_param[ODP_THREAD_COUNT_MAX]; + odph_thread_common_param_t thr_common; + odph_ethaddr_t new_addr; + stats_t *stats[ODP_THREAD_COUNT_MAX]; + thread_args_t *thr_args; + uint32_t pkt_len, seg_len, pkt_num; + int num_threads, num_workers; + int i; + int ret; + + /* Let helper collect its own arguments (e.g. --odph_proc) */ + argc = odph_parse_options(argc, argv); + if (odph_options(&helper_options)) { + printf("Error: reading ODP helper options failed.\n"); + exit(EXIT_FAILURE); + } + + odp_init_param_init(&init_param); + init_param.mem_model = helper_options.mem_model; + + if (odp_init_global(&instance, &init_param, NULL)) { + printf("Error: ODP global init failed.\n"); + exit(1); + } + + if (odp_init_local(instance, ODP_THREAD_CONTROL)) { + printf("Error: ODP local init failed.\n"); + exit(1); + } + + /* Reserve memory for global data */ + shm = odp_shm_reserve("simple_pipeline", sizeof(global_data_t), + ODP_CACHE_LINE_SIZE, 0); + if (shm == ODP_SHM_INVALID) { + printf("Error: shared mem reserve failed.\n"); + exit(EXIT_FAILURE); + } + + global = odp_shm_addr(shm); + if (global == NULL) { + printf("Error: shared mem alloc failed.\n"); + exit(EXIT_FAILURE); + } + + memset(global, 0, sizeof(global_data_t)); + odp_atomic_init_u32(&global->exit_threads, 0); + + signal(SIGINT, sig_handler); + + /* Parse and store the application arguments */ + parse_args(argc, argv, &global->appl); + + num_threads = setup_thread_masks(&thr_mask_rx, &thr_mask_tx, + &thr_mask_worker, + global->appl.num_workers); + num_workers = num_threads - 2; + + /* Print both system and application information */ + print_info(NO_PATH(argv[0]), &global->appl); + + /* Create queues for pipeline */ + if (odp_queue_capability(&queue_capa)) { + printf("Error: reading queue capability failed.\n"); + exit(EXIT_FAILURE); + } + if (queue_capa.plain.max_num < (unsigned int)num_threads) { + printf("Error: insufficient number of queues supported.\n"); + exit(EXIT_FAILURE); + } + + odp_queue_param_init(&queue_param); + queue_param.type = ODP_QUEUE_TYPE_PLAIN; + queue_param.enq_mode = ODP_QUEUE_OP_MT_UNSAFE; + queue_param.deq_mode = ODP_QUEUE_OP_MT_UNSAFE; + queue_param.size = QUEUE_SIZE; + if (queue_capa.plain.max_size && + queue_param.size > queue_capa.plain.max_size) + queue_param.size = queue_capa.plain.max_size; + for (i = 0; i < num_threads; i++) { + odp_queue_t queue = odp_queue_create("plain_queue", + &queue_param); + + if (queue == ODP_QUEUE_INVALID) { + printf("Error: queue create failed.\n"); + exit(EXIT_FAILURE); + } + global->queue[i] = queue; + } + + /* Create packet pool */ + if (odp_pool_capability(&pool_capa)) { + printf("Error: reading pool capability failed.\n"); + exit(EXIT_FAILURE); + } + + pkt_len = POOL_PKT_LEN; + seg_len = POOL_PKT_LEN; + pkt_num = POOL_PKT_NUM; + + if (pool_capa.pkt.max_len && pkt_len > pool_capa.pkt.max_len) + pkt_len = pool_capa.pkt.max_len; + + if (pool_capa.pkt.max_seg_len && seg_len > pool_capa.pkt.max_seg_len) + seg_len = pool_capa.pkt.max_seg_len; + + if (pool_capa.pkt.max_num && pkt_num > pool_capa.pkt.max_num) + pkt_num = pool_capa.pkt.max_num; + + odp_pool_param_init(&pool_param); + pool_param.pkt.seg_len = seg_len; + pool_param.pkt.len = pkt_len; + pool_param.pkt.num = pkt_num; + pool_param.type = ODP_POOL_PACKET; + + pool = odp_pool_create("packet pool", &pool_param); + if (pool == ODP_POOL_INVALID) { + printf("Error: packet pool create failed.\n"); + exit(1); + } + + global->if0 = create_pktio(global->appl.if_names[0], pool, + &global->if0in, &global->if0out); + global->if1 = create_pktio(global->appl.if_names[1], pool, + &global->if1in, &global->if1out); + + /* Save TX interface Ethernet address */ + if (odp_pktio_mac_addr(global->if1, global->src_addr.addr, + ODPH_ETHADDR_LEN) != ODPH_ETHADDR_LEN) { + printf("Error: TX interface Ethernet address unknown\n"); + exit(EXIT_FAILURE); + } + + /* Save destination Ethernet address */ + if (global->appl.dst_change) { + /* 02:00:00:00:00:XX */ + memset(&new_addr, 0, sizeof(odph_ethaddr_t)); + if (global->appl.dst_set) { + memcpy(&new_addr, &global->appl.dst_addr, + sizeof(odph_ethaddr_t)); + } else { + new_addr.addr[0] = 0x02; + new_addr.addr[5] = 1; + } + global->dst_addr = new_addr; + } + + if (odp_pktio_start(global->if0)) { + printf("Error: unable to start input interface\n"); + exit(1); + } + if (odp_pktio_start(global->if1)) { + printf("Error: unable to start output interface\n"); + exit(1); + } + + odp_barrier_init(&global->init_barrier, num_threads + 1); + odp_barrier_init(&global->term_barrier, num_threads + 1); + + for (i = 0; i < num_threads; i++) + stats[i] = &global->thread[i].stats; + + memset(thr_tbl, 0, sizeof(thr_tbl)); + odph_thread_common_param_init(&thr_common); + + thr_common.instance = instance; + + /* RX thread */ + thr_args = &global->thread[0]; + thr_args->tx_queue = global->queue[0]; + odph_thread_param_init(&thr_param[0]); + thr_param[0].start = rx_thread; + thr_param[0].arg = thr_args; + thr_param[0].thr_type = ODP_THREAD_WORKER; + thr_common.cpumask = &thr_mask_rx; + odph_thread_create(thr_tbl, &thr_common, thr_param, 1); + + /* Worker threads */ + for (i = 0; i < num_workers; i++) { + thr_args = &global->thread[i + 1]; + thr_args->rx_queue = global->queue[i]; + thr_args->tx_queue = global->queue[i + 1]; + + odph_thread_param_init(&thr_param[i]); + thr_param[i].start = worker_thread; + thr_param[i].arg = thr_args; + thr_param[i].thr_type = ODP_THREAD_WORKER; + } + + if (num_workers) { + thr_common.cpumask = &thr_mask_worker; + odph_thread_create(&thr_tbl[1], &thr_common, thr_param, + num_workers); + } + + /* TX thread */ + thr_args = &global->thread[num_threads - 1]; + thr_args->rx_queue = global->queue[num_workers]; + odph_thread_param_init(&thr_param[0]); + thr_param[0].start = tx_thread; + thr_param[0].arg = thr_args; + thr_param[0].thr_type = ODP_THREAD_WORKER; + thr_common.cpumask = &thr_mask_tx; + odph_thread_create(&thr_tbl[num_threads - 1], &thr_common, thr_param, + 1); + + ret = print_speed_stats(num_threads, stats, global->appl.time, + global->appl.accuracy); + + if (odp_pktio_stop(global->if0)) { + printf("Error: failed to stop interface %s\n", argv[1]); + exit(EXIT_FAILURE); + } + if (odp_pktio_stop(global->if1)) { + printf("Error: failed to stop interface %s\n", argv[2]); + exit(EXIT_FAILURE); + } + + odp_atomic_store_u32(&global->exit_threads, 1); + odp_barrier_wait(&global->term_barrier); + + odph_thread_join(thr_tbl, num_threads); + + if (odp_pktio_close(global->if0)) { + printf("Error: failed to close interface %s\n", argv[1]); + exit(EXIT_FAILURE); + } + if (odp_pktio_close(global->if1)) { + printf("Error: failed to close interface %s\n", argv[2]); + exit(EXIT_FAILURE); + } + + for (i = 0; i < num_threads; i++) { + if (odp_queue_destroy(global->queue[i])) { + printf("Error: failed to destroy queue %d\n", i); + exit(EXIT_FAILURE); + } + } + + if (odp_pool_destroy(pool)) { + printf("Error: pool destroy\n"); + exit(EXIT_FAILURE); + } + + if (odp_shm_free(shm)) { + printf("Error: shm free global data\n"); + exit(EXIT_FAILURE); + } + + if (odp_term_local()) { + printf("Error: term local\n"); + exit(EXIT_FAILURE); + } + + if (odp_term_global(instance)) { + printf("Error: term global\n"); + exit(EXIT_FAILURE); + } + + return ret; +} diff --git a/example/simple_pipeline/simple_pipeline_run.sh b/example/simple_pipeline/simple_pipeline_run.sh new file mode 100755 index 000000000..ba66e506e --- /dev/null +++ b/example/simple_pipeline/simple_pipeline_run.sh @@ -0,0 +1,37 @@ +#!/bin/bash +# +# SPDX-License-Identifier: BSD-3-Clause +# Copyright (c) 2019 Nokia +# + +# Exit code expected by automake for skipped tests +TEST_SKIPPED=77 + +if [ -f ./pktio_env ]; then + . ./pktio_env +else + echo "BUG: unable to find pktio_env!" + echo "pktio_env has to be in current directory" + exit 1 +fi + +if [ $(nproc --all) -lt 3 ]; then + echo "Not enough CPU cores. Skipping test." + exit $TEST_SKIPPED +fi + +setup_interfaces + +./odp_simple_pipeline${EXEEXT} -i $IF0,$IF1 -e -t 1 -a 1 +STATUS=$? + +if [ "$STATUS" -ne 0 ]; then + echo "Error: status was: $STATUS, expected 0" + exit 1 +fi + +validate_result + +cleanup_interfaces + +exit 0 diff --git a/example/simple_pipeline/udp64.pcap b/example/simple_pipeline/udp64.pcap Binary files differnew file mode 100644 index 000000000..45f9d6e63 --- /dev/null +++ b/example/simple_pipeline/udp64.pcap |