aboutsummaryrefslogtreecommitdiff
path: root/example/simple_pipeline
diff options
context:
space:
mode:
Diffstat (limited to 'example/simple_pipeline')
-rw-r--r--example/simple_pipeline/.gitignore4
-rw-r--r--example/simple_pipeline/Makefile.am34
-rw-r--r--example/simple_pipeline/odp_simple_pipeline.c962
-rwxr-xr-xexample/simple_pipeline/simple_pipeline_run.sh37
-rw-r--r--example/simple_pipeline/udp64.pcapbin0 -> 7624 bytes
5 files changed, 1037 insertions, 0 deletions
diff --git a/example/simple_pipeline/.gitignore b/example/simple_pipeline/.gitignore
new file mode 100644
index 000000000..28cb24d41
--- /dev/null
+++ b/example/simple_pipeline/.gitignore
@@ -0,0 +1,4 @@
+odp_simple_pipeline
+pktio_env
+*.log
+*.trs
diff --git a/example/simple_pipeline/Makefile.am b/example/simple_pipeline/Makefile.am
new file mode 100644
index 000000000..f258434aa
--- /dev/null
+++ b/example/simple_pipeline/Makefile.am
@@ -0,0 +1,34 @@
+include $(top_srcdir)/example/Makefile.inc
+
+bin_PROGRAMS = odp_simple_pipeline
+
+odp_simple_pipeline_SOURCES = odp_simple_pipeline.c
+
+if test_example
+if ODP_PKTIO_PCAP
+TESTS = simple_pipeline_run.sh
+endif
+endif
+EXTRA_DIST = simple_pipeline_run.sh udp64.pcap
+
+# If building out-of-tree, make check will not copy the scripts and data to the
+# $(builddir) assuming that all commands are run locally. However this prevents
+# running tests on a remote target using LOG_COMPILER.
+# So copy all script and data files explicitly here.
+all-local:
+ if [ "x$(srcdir)" != "x$(builddir)" ]; then \
+ for f in $(EXTRA_DIST); do \
+ if [ -e $(srcdir)/$$f ]; then \
+ mkdir -p $(builddir)/$$(dirname $$f); \
+ cp -f $(srcdir)/$$f $(builddir)/$$f; \
+ fi \
+ done \
+ fi
+ ln -f -s $(top_srcdir)/platform/$(with_platform)/test/example/simple_pipeline/pktio_env \
+ pktio_env
+clean-local:
+ if [ "x$(srcdir)" != "x$(builddir)" ]; then \
+ for f in $(EXTRA_DIST); do \
+ rm -f $(builddir)/$$f; \
+ done \
+ fi
diff --git a/example/simple_pipeline/odp_simple_pipeline.c b/example/simple_pipeline/odp_simple_pipeline.c
new file mode 100644
index 000000000..ad80bc394
--- /dev/null
+++ b/example/simple_pipeline/odp_simple_pipeline.c
@@ -0,0 +1,962 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright (c) 2019 Nokia
+ */
+
+ /**
+ * @example odp_simple_pipeline.c
+ *
+ * Simple pipeline example application which receives packets from one
+ * interface and passes them through 0-N worker stages before outputting them
+ * from a second network interface. The RX, worker, and TX stages are connected
+ * using plain queues and each stage is run on a separate CPU thread.
+ *
+ * @cond _ODP_HIDE_FROM_DOXYGEN_
+ */
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <getopt.h>
+#include <signal.h>
+#include <unistd.h>
+#include <inttypes.h>
+
+#include <odp_api.h>
+#include <odp/helper/odph_api.h>
+
+#define POOL_PKT_NUM 8192
+#define POOL_PKT_LEN 1536
+#define MAX_PKT_BURST 32
+/* Three threads required for RX, TX and statistics */
+#define MAX_WORKERS (ODP_THREAD_COUNT_MAX - 3)
+#define QUEUE_SIZE 1024
+#define MAX_PKTIOS 2
+#define DUMMY_HASH 1234567890
+
+/* Get rid of path in filename - only for unix-type paths using '/' */
+#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
+ strrchr((file_name), '/') + 1 : (file_name))
+
+/* Statistics */
+typedef union ODP_ALIGNED_CACHE {
+ struct {
+ uint64_t pps; /* Packet per second */
+ uint64_t rx_cnt; /* RX packets */
+ uint64_t tx_cnt; /* TX packets */
+ uint64_t rx_drops; /* Dropped packets on RX */
+ uint64_t tx_drops; /* Dropped packets on TX */
+ } s;
+ uint8_t padding[ODP_CACHE_LINE_SIZE];
+} stats_t;
+
+/* Thread specific data */
+typedef struct thread_args_t {
+ odp_queue_t rx_queue;
+ odp_queue_t tx_queue;
+ stats_t stats;
+} thread_args_t;
+
+/* Parsed command line application arguments */
+typedef struct {
+ char **if_names; /* Array of pointers to interface names */
+ odph_ethaddr_t dst_addr; /* Destination MAC address */
+ int accuracy; /* Statistics print interval in seconds */
+ int extra_work; /* Add extra processing to worker stage */
+ int dst_change; /* Change destination eth address */
+ int src_change; /* Change source eth address */
+ int dst_set; /* Custom destination eth address given */
+ int time; /* Time in seconds to run. */
+ int num_workers; /* Number of pipeline worker stages */
+ char *if_str; /* Storage for interface names */
+} appl_args_t;
+
+/* Global application data */
+typedef struct {
+ odp_queue_t queue[ODP_THREAD_COUNT_MAX];
+ /* Thread specific arguments */
+ thread_args_t thread[ODP_THREAD_COUNT_MAX];
+ /* Barriers to synchronize main and workers */
+ odp_barrier_t init_barrier;
+ odp_barrier_t term_barrier;
+ /* Pktio interfaces */
+ odp_pktio_t if0, if1;
+ odp_pktin_queue_t if0in, if1in;
+ odp_pktout_queue_t if0out, if1out;
+ odph_ethaddr_t src_addr; /* Source MAC address */
+ odph_ethaddr_t dst_addr; /* Destination MAC address */
+ odp_atomic_u32_t exit_threads;
+ /* Application (parsed) arguments */
+ appl_args_t appl;
+} global_data_t;
+
+static global_data_t *global;
+
+static void sig_handler(int signo ODP_UNUSED)
+{
+ odp_atomic_store_u32(&global->exit_threads, 1);
+}
+
+static odp_pktio_t create_pktio(const char *name, odp_pool_t pool,
+ odp_pktin_queue_t *pktin,
+ odp_pktout_queue_t *pktout)
+{
+ odp_pktio_param_t pktio_param;
+ odp_pktin_queue_param_t in_param;
+ odp_pktout_queue_param_t out_param;
+ odp_pktio_t pktio;
+ odp_pktio_config_t config;
+
+ odp_pktio_param_init(&pktio_param);
+
+ pktio = odp_pktio_open(name, pool, &pktio_param);
+ if (pktio == ODP_PKTIO_INVALID) {
+ printf("Error: failed to open %s\n", name);
+ exit(1);
+ }
+
+ odp_pktio_config_init(&config);
+ config.parser.layer = ODP_PROTO_LAYER_L2;
+ odp_pktio_config(pktio, &config);
+
+ odp_pktin_queue_param_init(&in_param);
+ odp_pktout_queue_param_init(&out_param);
+
+ in_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE;
+
+ if (odp_pktin_queue_config(pktio, &in_param)) {
+ printf("Error: failed to config input queue for %s\n", name);
+ exit(1);
+ }
+
+ out_param.op_mode = ODP_PKTIO_OP_MT_UNSAFE;
+
+ if (odp_pktout_queue_config(pktio, &out_param)) {
+ printf("Error: failed to config output queue for %s\n", name);
+ exit(1);
+ }
+
+ if (odp_pktin_queue(pktio, pktin, 1) != 1) {
+ printf("Error: pktin queue query failed for %s\n", name);
+ exit(1);
+ }
+ if (odp_pktout_queue(pktio, pktout, 1) != 1) {
+ printf("Error: pktout queue query failed for %s\n", name);
+ exit(1);
+ }
+ return pktio;
+}
+
+/*
+ * Fill packets' eth addresses and convert packets to events
+ *
+ * pkt_tbl Array of packets
+ * event_tbl[out] Array of events
+ * num Number of packets in the array
+ */
+static inline unsigned int prep_events(odp_packet_t pkt_tbl[],
+ odp_event_t event_tbl[],
+ unsigned int num)
+{
+ unsigned int i;
+ unsigned int events = 0;
+
+ if (!global->appl.dst_change && !global->appl.src_change) {
+ odp_packet_to_event_multi(pkt_tbl, event_tbl, num);
+ return num;
+ }
+
+ for (i = 0; i < num; ++i) {
+ odp_packet_t pkt = pkt_tbl[i];
+ odph_ethhdr_t *eth;
+
+ odp_packet_prefetch(pkt, 0, ODPH_ETHHDR_LEN);
+
+ if (odp_unlikely(!odp_packet_has_eth(pkt))) {
+ odp_packet_free(pkt);
+ continue;
+ }
+
+ eth = odp_packet_data(pkt);
+
+ if (global->appl.src_change)
+ eth->src = global->src_addr;
+
+ if (global->appl.dst_change)
+ eth->dst = global->dst_addr;
+
+ event_tbl[events++] = odp_packet_to_event(pkt);
+ }
+ return events;
+}
+
+static inline int rx_thread(void *arg)
+{
+ thread_args_t *thr_args = arg;
+ odp_event_t event_tbl[MAX_PKT_BURST];
+ odp_packet_t pkt_tbl[MAX_PKT_BURST];
+ odp_pktin_queue_t pktin_queue = global->if0in;
+ odp_queue_t out_queue = thr_args->tx_queue;
+ stats_t *stats = &thr_args->stats;
+ int pkts, events, sent, drops;
+
+ odp_barrier_wait(&global->init_barrier);
+
+ while (!odp_atomic_load_u32(&global->exit_threads)) {
+ pkts = odp_pktin_recv(pktin_queue, pkt_tbl, MAX_PKT_BURST);
+ if (odp_unlikely(pkts <= 0))
+ continue;
+
+ stats->s.rx_cnt += pkts;
+
+ events = prep_events(pkt_tbl, event_tbl, pkts);
+ drops = events - pkts;
+ if (odp_unlikely(drops))
+ stats->s.rx_drops += pkts - events;
+
+ sent = odp_queue_enq_multi(out_queue, event_tbl, events);
+ if (odp_unlikely(sent < 0))
+ sent = 0;
+
+ stats->s.tx_cnt += sent;
+
+ drops = events - sent;
+ if (odp_unlikely(drops)) {
+ stats->s.tx_drops += drops;
+ odp_packet_free_multi(&pkt_tbl[sent], drops);
+ }
+ }
+
+ /* Wait until pktio devices are stopped */
+ odp_barrier_wait(&global->term_barrier);
+
+ return 0;
+}
+
+static inline int tx_thread(void *arg)
+{
+ thread_args_t *thr_args = arg;
+ odp_event_t event_tbl[MAX_PKT_BURST];
+ odp_packet_t pkt_tbl[MAX_PKT_BURST];
+ odp_queue_t rx_queue = thr_args->rx_queue;
+ odp_pktout_queue_t pktout_queue = global->if1out;
+ stats_t *stats = &thr_args->stats;
+ int events, sent, tx_drops;
+
+ odp_barrier_wait(&global->init_barrier);
+
+ while (!odp_atomic_load_u32(&global->exit_threads)) {
+ events = odp_queue_deq_multi(rx_queue, event_tbl,
+ MAX_PKT_BURST);
+ if (odp_unlikely(events <= 0))
+ continue;
+
+ stats->s.rx_cnt += events;
+
+ odp_packet_from_event_multi(pkt_tbl, event_tbl, events);
+
+ sent = odp_pktout_send(pktout_queue, pkt_tbl, events);
+ if (odp_unlikely(sent < 0))
+ sent = 0;
+
+ stats->s.tx_cnt += sent;
+
+ tx_drops = events - sent;
+ if (odp_unlikely(tx_drops)) {
+ stats->s.tx_drops += tx_drops;
+ odp_packet_free_multi(&pkt_tbl[sent], tx_drops);
+ }
+ }
+
+ /* Wait until pktio devices are stopped */
+ odp_barrier_wait(&global->term_barrier);
+
+ /* Empty queue before exiting */
+ events = 1;
+ while (events > 0) {
+ events = odp_queue_deq_multi(rx_queue, event_tbl,
+ MAX_PKT_BURST);
+
+ if (events > 0)
+ odp_event_free_multi(event_tbl, events);
+ }
+
+ return 0;
+}
+
+/*
+ * Work on packets
+ */
+static inline void work_on_events(odp_event_t event_tbl[], unsigned int num)
+{
+ unsigned int i;
+
+ for (i = 0; i < num; i++) {
+ odp_packet_t pkt = odp_packet_from_event(event_tbl[i]);
+
+ if (odp_hash_crc32c(odp_packet_data(pkt),
+ odp_packet_seg_len(pkt), 123) == DUMMY_HASH)
+ printf("Dummy hash match\n");
+ }
+}
+
+static inline int worker_thread(void *arg ODP_UNUSED)
+{
+ thread_args_t *thr_args = arg;
+ odp_event_t event_tbl[MAX_PKT_BURST];
+ stats_t *stats = &thr_args->stats;
+ odp_queue_t rx_queue = thr_args->rx_queue;
+ odp_queue_t tx_queue = thr_args->tx_queue;
+ int events, sent, tx_drops;
+ int extra_work = global->appl.extra_work;
+
+ odp_barrier_wait(&global->init_barrier);
+
+ while (!odp_atomic_load_u32(&global->exit_threads)) {
+ events = odp_queue_deq_multi(rx_queue, event_tbl,
+ MAX_PKT_BURST);
+
+ if (odp_unlikely(events <= 0))
+ continue;
+
+ stats->s.rx_cnt += events;
+
+ if (extra_work)
+ work_on_events(event_tbl, events);
+
+ sent = odp_queue_enq_multi(tx_queue, event_tbl, events);
+ if (odp_unlikely(sent < 0))
+ sent = 0;
+
+ stats->s.tx_cnt += sent;
+
+ tx_drops = events - sent;
+ if (odp_unlikely(tx_drops)) {
+ stats->s.tx_drops += tx_drops;
+ odp_event_free_multi(&event_tbl[sent], tx_drops);
+ }
+ }
+
+ /* Wait until pktio devices are stopped */
+ odp_barrier_wait(&global->term_barrier);
+
+ /* Empty queue before exiting */
+ events = 1;
+ while (events > 0) {
+ events = odp_queue_deq_multi(rx_queue, event_tbl,
+ MAX_PKT_BURST);
+
+ if (events > 0)
+ odp_event_free_multi(event_tbl, events);
+ }
+
+ return 0;
+}
+
+static int setup_thread_masks(odp_cpumask_t *thr_mask_rx,
+ odp_cpumask_t *thr_mask_tx,
+ odp_cpumask_t *thr_mask_workers,
+ int num_workers)
+{
+ odp_cpumask_t cpumask;
+ int num_threads = 0;
+ int i, cpu;
+
+ if (num_workers > MAX_WORKERS) {
+ printf("Worker count limited to MAX_WORKERS define (=%d)\n",
+ MAX_WORKERS);
+ num_workers = MAX_WORKERS;
+ }
+
+ /* Two threads required for RX and TX*/
+ num_threads = num_workers + 2;
+
+ num_workers = odp_cpumask_default_worker(&cpumask, num_threads);
+ if (num_workers != num_threads) {
+ printf("Error: Not enough available CPU cores: %d/%d\n",
+ num_workers, num_threads);
+ exit(1);
+ }
+
+ odp_cpumask_zero(thr_mask_rx);
+ odp_cpumask_zero(thr_mask_tx);
+ odp_cpumask_zero(thr_mask_workers);
+
+ cpu = odp_cpumask_first(&cpumask);
+ for (i = 0; i < num_threads; i++) {
+ if (i == 0)
+ odp_cpumask_set(thr_mask_rx, cpu);
+ else if (i == 1)
+ odp_cpumask_set(thr_mask_tx, cpu);
+ else
+ odp_cpumask_set(thr_mask_workers, cpu);
+ cpu = odp_cpumask_next(&cpumask, cpu);
+ }
+
+ return num_threads;
+}
+
+/*
+ * Print statistics
+ *
+ * num_workers Number of worker threads
+ * thr_stats Pointers to stats storage
+ * duration Number of seconds to loop in
+ * timeout Number of seconds for stats calculation
+ */
+static int print_speed_stats(int num_workers, stats_t **thr_stats,
+ int duration, int timeout)
+{
+ uint64_t total_pkts = 0;
+ uint64_t pkts_prev = 0;
+ uint64_t maximum_pps = 0;
+ stats_t thr_stats_prev[num_workers];
+ int i;
+ int elapsed = 0;
+ int stats_enabled = 1;
+ int loop_forever = (duration == 0);
+
+ memset(thr_stats_prev, 0, sizeof(thr_stats_prev));
+
+ if (timeout <= 0) {
+ stats_enabled = 0;
+ timeout = 1;
+ }
+
+ /* Wait for all threads to be ready*/
+ odp_barrier_wait(&global->init_barrier);
+
+ do {
+ uint64_t total_rx_drops = 0;
+ uint64_t total_tx_drops = 0;
+ uint64_t pps;
+
+ sleep(timeout);
+
+ for (i = 0; i < num_workers; i++) {
+ uint64_t rx_cnt = thr_stats[i]->s.rx_cnt;
+ uint64_t tx_cnt = thr_stats[i]->s.tx_cnt;
+ uint64_t rx_drops = thr_stats[i]->s.rx_drops;
+ uint64_t tx_drops = thr_stats[i]->s.tx_drops;
+
+ /* Count only transmitted packets */
+ if (i == (num_workers - 1))
+ total_pkts = tx_cnt;
+
+ total_rx_drops += rx_drops;
+ total_tx_drops += tx_drops;
+
+ pps = (tx_cnt - thr_stats_prev[i].s.tx_cnt) / timeout;
+ thr_stats_prev[i].s.pps = pps;
+ thr_stats_prev[i].s.rx_cnt = rx_cnt;
+ thr_stats_prev[i].s.tx_cnt = tx_cnt;
+ thr_stats_prev[i].s.rx_drops = rx_drops;
+ thr_stats_prev[i].s.tx_drops = tx_drops;
+ }
+ if (stats_enabled) {
+ printf("----------------------------------------\n");
+ for (i = 0; i < num_workers; i++) {
+ if (i == 0)
+ printf("RX thread: ");
+ else if (i == (num_workers - 1))
+ printf("TX thread: ");
+ else
+ printf("Worker %d: ", i - 1);
+
+ printf("%" PRIu64 " pps, "
+ "%" PRIu64 " rx drops, "
+ "%" PRIu64 " tx drops\n",
+ thr_stats_prev[i].s.pps,
+ thr_stats_prev[i].s.rx_drops,
+ thr_stats_prev[i].s.tx_drops);
+ }
+ pps = (total_pkts - pkts_prev) / timeout;
+ if (pps > maximum_pps)
+ maximum_pps = pps;
+ printf("TOTAL: %" PRIu64 " pps, "
+ "%" PRIu64 " rx drops, "
+ "%" PRIu64 " tx drops, "
+ "%" PRIu64 " max pps\n",
+ pps, total_rx_drops, total_tx_drops,
+ maximum_pps);
+
+ pkts_prev = total_pkts;
+ }
+ elapsed += timeout;
+ } while (!odp_atomic_load_u32(&global->exit_threads) && (loop_forever ||
+ (elapsed < duration)));
+
+ if (stats_enabled)
+ printf("TEST RESULT: %" PRIu64 " maximum packets per second.\n",
+ maximum_pps);
+
+ return total_pkts > 0 ? 0 : -1;
+}
+
+/*
+ * Print system and application info
+ */
+static void print_info(char *progname, appl_args_t *appl_args)
+{
+ odp_sys_info_print();
+
+ printf("Running ODP appl: \"%s\"\n"
+ "-----------------\n"
+ "Using IFs: %s %s\n"
+ "Worker stages: %d\n"
+ "Extra work: %d\n\n",
+ progname, appl_args->if_names[0], appl_args->if_names[1],
+ appl_args->num_workers, appl_args->extra_work);
+
+ fflush(NULL);
+}
+
+/*
+ * Print usage information
+ */
+static void usage(char *progname)
+{
+ printf("\n"
+ "OpenDataPlane simple pipeline example application.\n"
+ "\n"
+ "Usage: %s [options]\n"
+ "\n"
+ " E.g. %s -i eth0,eth1 -e -w 3\n\n"
+ " ---- ---- ---- ---- ----\n"
+ " | RX | -> | W1 | -> | W2 | -> | W3 | -> | TX |\n"
+ " ---- ---- ---- ---- ----\n\n"
+ " In the above example,\n"
+ " each application stage is executed by a separate CPU thread and the stages\n"
+ " are connected using plain queues. The RX stage receives packets from eth0 and\n"
+ " enqueues them to the first worker stage (W1). The workers stages calculate\n"
+ " CRC-32C over packet data. After the final worker stage (W3) has processed\n"
+ " packets they are enqueued to the TX stage, which transmits the packets out\n"
+ " from interface eth1.\n"
+ "\n"
+ "Mandatory OPTIONS:\n"
+ " -i, --interface <name> Two eth interfaces (comma-separated, no spaces)\n"
+ "\n"
+ "Optional OPTIONS:\n"
+ " -a, --accuracy <sec> Time in seconds get print statistics\n"
+ " (default is 10 seconds).\n"
+ " -d, --dst_change <arg> 0: Don't change packets' dst eth addresses\n"
+ " 1: Change packets' dst eth addresses (default)\n"
+ " -s, --src_change <arg> 0: Don't change packets' src eth addresses\n"
+ " 1: Change packets' src eth addresses (default)\n"
+ " -r, --dst_addr <addr> Destination address\n"
+ " Requires also the -d flag to be set\n"
+ " -t, --time <sec> Time in seconds to run\n"
+ " -w, --workers <num> Number of worker stages (default 0)\n"
+ " -e, --extra-work Calculate CRC-32C over packet data in worker stage\n"
+ " -h, --help Display help and exit\n\n"
+ "\n", NO_PATH(progname), NO_PATH(progname)
+ );
+}
+
+/*
+ * Parse and store the command line arguments
+ *
+ * argc Argument count
+ * argv Argument vector
+ * appl_args[out] Storage for application arguments
+ */
+static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
+{
+ char *token;
+ size_t len;
+ int opt;
+ int long_index;
+ int i;
+ int if_count = 0;
+ static const struct option longopts[] = {
+ {"accuracy", required_argument, NULL, 'a'},
+ {"extra-work", no_argument, NULL, 'e'},
+ {"dst_addr", required_argument, NULL, 'r'},
+ {"dst_change", required_argument, NULL, 'd'},
+ {"src_change", required_argument, NULL, 's'},
+ {"interface", required_argument, NULL, 'i'},
+ {"time", required_argument, NULL, 't'},
+ {"workers", required_argument, NULL, 'w'},
+ {"help", no_argument, NULL, 'h'},
+ {NULL, 0, NULL, 0}
+ };
+
+ static const char *shortopts = "+a:d:er:s:t:i:w:h";
+
+ appl_args->accuracy = 10; /* get and print pps stats second */
+ appl_args->dst_change = 1; /* change eth dst address by default */
+ appl_args->src_change = 1; /* change eth src address by default */
+ appl_args->time = 0; /* loop forever if time to run is 0 */
+ appl_args->extra_work = 0;
+
+ while (1) {
+ opt = getopt_long(argc, argv, shortopts, longopts, &long_index);
+
+ if (opt == -1)
+ break; /* No more options */
+
+ switch (opt) {
+ case 'a':
+ appl_args->accuracy = atoi(optarg);
+ break;
+ case 'd':
+ appl_args->dst_change = atoi(optarg);
+ break;
+ case 'e':
+ appl_args->extra_work = 1;
+ break;
+ case 'r':
+ len = strlen(optarg);
+ if (len == 0) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+ len += 1; /* add room for '\0' */
+
+ if (odph_eth_addr_parse(&appl_args->dst_addr,
+ optarg) != 0) {
+ printf("invalid MAC address\n");
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ appl_args->dst_set = 1;
+
+ break;
+ case 's':
+ appl_args->src_change = atoi(optarg);
+ break;
+ case 't':
+ appl_args->time = atoi(optarg);
+ break;
+ case 'i':
+ len = strlen(optarg);
+ if (len == 0) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+ len += 1; /* add room for '\0' */
+
+ appl_args->if_str = malloc(len);
+ if (appl_args->if_str == NULL) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ /* count the number of tokens separated by ',' */
+ strcpy(appl_args->if_str, optarg);
+ for (token = strtok(appl_args->if_str, ","), i = 0;
+ token != NULL;
+ token = strtok(NULL, ","), i++)
+ ;
+
+ if_count = i;
+
+ if (if_count != 2) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ /* allocate storage for the if names */
+ appl_args->if_names = calloc(if_count, sizeof(char *));
+
+ /* store the if names (reset names string) */
+ strcpy(appl_args->if_str, optarg);
+ for (token = strtok(appl_args->if_str, ","), i = 0;
+ token != NULL; token = strtok(NULL, ","), i++) {
+ appl_args->if_names[i] = token;
+ }
+ break;
+ case 'w':
+ appl_args->num_workers = atoi(optarg);
+ break;
+ case 'h':
+ usage(argv[0]);
+ exit(EXIT_SUCCESS);
+ break;
+ default:
+ break;
+ }
+ }
+
+ if (if_count != 2) {
+ usage(argv[0]);
+ exit(EXIT_FAILURE);
+ }
+
+ optind = 1; /* reset 'extern optind' from the getopt lib */
+}
+
+int main(int argc, char **argv)
+{
+ odp_cpumask_t thr_mask_rx;
+ odp_cpumask_t thr_mask_tx;
+ odp_cpumask_t thr_mask_worker;
+ odp_init_t init_param;
+ odp_instance_t instance;
+ odp_pool_t pool;
+ odp_pool_capability_t pool_capa;
+ odp_pool_param_t pool_param;
+ odp_queue_capability_t queue_capa;
+ odp_queue_param_t queue_param;
+ odp_shm_t shm;
+ odph_helper_options_t helper_options;
+ odph_thread_t thr_tbl[ODP_THREAD_COUNT_MAX];
+ odph_thread_param_t thr_param[ODP_THREAD_COUNT_MAX];
+ odph_thread_common_param_t thr_common;
+ odph_ethaddr_t new_addr;
+ stats_t *stats[ODP_THREAD_COUNT_MAX];
+ thread_args_t *thr_args;
+ uint32_t pkt_len, seg_len, pkt_num;
+ int num_threads, num_workers;
+ int i;
+ int ret;
+
+ /* Let helper collect its own arguments (e.g. --odph_proc) */
+ argc = odph_parse_options(argc, argv);
+ if (odph_options(&helper_options)) {
+ printf("Error: reading ODP helper options failed.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ odp_init_param_init(&init_param);
+ init_param.mem_model = helper_options.mem_model;
+
+ if (odp_init_global(&instance, &init_param, NULL)) {
+ printf("Error: ODP global init failed.\n");
+ exit(1);
+ }
+
+ if (odp_init_local(instance, ODP_THREAD_CONTROL)) {
+ printf("Error: ODP local init failed.\n");
+ exit(1);
+ }
+
+ /* Reserve memory for global data */
+ shm = odp_shm_reserve("simple_pipeline", sizeof(global_data_t),
+ ODP_CACHE_LINE_SIZE, 0);
+ if (shm == ODP_SHM_INVALID) {
+ printf("Error: shared mem reserve failed.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ global = odp_shm_addr(shm);
+ if (global == NULL) {
+ printf("Error: shared mem alloc failed.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ memset(global, 0, sizeof(global_data_t));
+ odp_atomic_init_u32(&global->exit_threads, 0);
+
+ signal(SIGINT, sig_handler);
+
+ /* Parse and store the application arguments */
+ parse_args(argc, argv, &global->appl);
+
+ num_threads = setup_thread_masks(&thr_mask_rx, &thr_mask_tx,
+ &thr_mask_worker,
+ global->appl.num_workers);
+ num_workers = num_threads - 2;
+
+ /* Print both system and application information */
+ print_info(NO_PATH(argv[0]), &global->appl);
+
+ /* Create queues for pipeline */
+ if (odp_queue_capability(&queue_capa)) {
+ printf("Error: reading queue capability failed.\n");
+ exit(EXIT_FAILURE);
+ }
+ if (queue_capa.plain.max_num < (unsigned int)num_threads) {
+ printf("Error: insufficient number of queues supported.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ odp_queue_param_init(&queue_param);
+ queue_param.type = ODP_QUEUE_TYPE_PLAIN;
+ queue_param.enq_mode = ODP_QUEUE_OP_MT_UNSAFE;
+ queue_param.deq_mode = ODP_QUEUE_OP_MT_UNSAFE;
+ queue_param.size = QUEUE_SIZE;
+ if (queue_capa.plain.max_size &&
+ queue_param.size > queue_capa.plain.max_size)
+ queue_param.size = queue_capa.plain.max_size;
+ for (i = 0; i < num_threads; i++) {
+ odp_queue_t queue = odp_queue_create("plain_queue",
+ &queue_param);
+
+ if (queue == ODP_QUEUE_INVALID) {
+ printf("Error: queue create failed.\n");
+ exit(EXIT_FAILURE);
+ }
+ global->queue[i] = queue;
+ }
+
+ /* Create packet pool */
+ if (odp_pool_capability(&pool_capa)) {
+ printf("Error: reading pool capability failed.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ pkt_len = POOL_PKT_LEN;
+ seg_len = POOL_PKT_LEN;
+ pkt_num = POOL_PKT_NUM;
+
+ if (pool_capa.pkt.max_len && pkt_len > pool_capa.pkt.max_len)
+ pkt_len = pool_capa.pkt.max_len;
+
+ if (pool_capa.pkt.max_seg_len && seg_len > pool_capa.pkt.max_seg_len)
+ seg_len = pool_capa.pkt.max_seg_len;
+
+ if (pool_capa.pkt.max_num && pkt_num > pool_capa.pkt.max_num)
+ pkt_num = pool_capa.pkt.max_num;
+
+ odp_pool_param_init(&pool_param);
+ pool_param.pkt.seg_len = seg_len;
+ pool_param.pkt.len = pkt_len;
+ pool_param.pkt.num = pkt_num;
+ pool_param.type = ODP_POOL_PACKET;
+
+ pool = odp_pool_create("packet pool", &pool_param);
+ if (pool == ODP_POOL_INVALID) {
+ printf("Error: packet pool create failed.\n");
+ exit(1);
+ }
+
+ global->if0 = create_pktio(global->appl.if_names[0], pool,
+ &global->if0in, &global->if0out);
+ global->if1 = create_pktio(global->appl.if_names[1], pool,
+ &global->if1in, &global->if1out);
+
+ /* Save TX interface Ethernet address */
+ if (odp_pktio_mac_addr(global->if1, global->src_addr.addr,
+ ODPH_ETHADDR_LEN) != ODPH_ETHADDR_LEN) {
+ printf("Error: TX interface Ethernet address unknown\n");
+ exit(EXIT_FAILURE);
+ }
+
+ /* Save destination Ethernet address */
+ if (global->appl.dst_change) {
+ /* 02:00:00:00:00:XX */
+ memset(&new_addr, 0, sizeof(odph_ethaddr_t));
+ if (global->appl.dst_set) {
+ memcpy(&new_addr, &global->appl.dst_addr,
+ sizeof(odph_ethaddr_t));
+ } else {
+ new_addr.addr[0] = 0x02;
+ new_addr.addr[5] = 1;
+ }
+ global->dst_addr = new_addr;
+ }
+
+ if (odp_pktio_start(global->if0)) {
+ printf("Error: unable to start input interface\n");
+ exit(1);
+ }
+ if (odp_pktio_start(global->if1)) {
+ printf("Error: unable to start output interface\n");
+ exit(1);
+ }
+
+ odp_barrier_init(&global->init_barrier, num_threads + 1);
+ odp_barrier_init(&global->term_barrier, num_threads + 1);
+
+ for (i = 0; i < num_threads; i++)
+ stats[i] = &global->thread[i].stats;
+
+ memset(thr_tbl, 0, sizeof(thr_tbl));
+ odph_thread_common_param_init(&thr_common);
+
+ thr_common.instance = instance;
+
+ /* RX thread */
+ thr_args = &global->thread[0];
+ thr_args->tx_queue = global->queue[0];
+ odph_thread_param_init(&thr_param[0]);
+ thr_param[0].start = rx_thread;
+ thr_param[0].arg = thr_args;
+ thr_param[0].thr_type = ODP_THREAD_WORKER;
+ thr_common.cpumask = &thr_mask_rx;
+ odph_thread_create(thr_tbl, &thr_common, thr_param, 1);
+
+ /* Worker threads */
+ for (i = 0; i < num_workers; i++) {
+ thr_args = &global->thread[i + 1];
+ thr_args->rx_queue = global->queue[i];
+ thr_args->tx_queue = global->queue[i + 1];
+
+ odph_thread_param_init(&thr_param[i]);
+ thr_param[i].start = worker_thread;
+ thr_param[i].arg = thr_args;
+ thr_param[i].thr_type = ODP_THREAD_WORKER;
+ }
+
+ if (num_workers) {
+ thr_common.cpumask = &thr_mask_worker;
+ odph_thread_create(&thr_tbl[1], &thr_common, thr_param,
+ num_workers);
+ }
+
+ /* TX thread */
+ thr_args = &global->thread[num_threads - 1];
+ thr_args->rx_queue = global->queue[num_workers];
+ odph_thread_param_init(&thr_param[0]);
+ thr_param[0].start = tx_thread;
+ thr_param[0].arg = thr_args;
+ thr_param[0].thr_type = ODP_THREAD_WORKER;
+ thr_common.cpumask = &thr_mask_tx;
+ odph_thread_create(&thr_tbl[num_threads - 1], &thr_common, thr_param,
+ 1);
+
+ ret = print_speed_stats(num_threads, stats, global->appl.time,
+ global->appl.accuracy);
+
+ if (odp_pktio_stop(global->if0)) {
+ printf("Error: failed to stop interface %s\n", argv[1]);
+ exit(EXIT_FAILURE);
+ }
+ if (odp_pktio_stop(global->if1)) {
+ printf("Error: failed to stop interface %s\n", argv[2]);
+ exit(EXIT_FAILURE);
+ }
+
+ odp_atomic_store_u32(&global->exit_threads, 1);
+ odp_barrier_wait(&global->term_barrier);
+
+ odph_thread_join(thr_tbl, num_threads);
+
+ if (odp_pktio_close(global->if0)) {
+ printf("Error: failed to close interface %s\n", argv[1]);
+ exit(EXIT_FAILURE);
+ }
+ if (odp_pktio_close(global->if1)) {
+ printf("Error: failed to close interface %s\n", argv[2]);
+ exit(EXIT_FAILURE);
+ }
+
+ for (i = 0; i < num_threads; i++) {
+ if (odp_queue_destroy(global->queue[i])) {
+ printf("Error: failed to destroy queue %d\n", i);
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ if (odp_pool_destroy(pool)) {
+ printf("Error: pool destroy\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (odp_shm_free(shm)) {
+ printf("Error: shm free global data\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (odp_term_local()) {
+ printf("Error: term local\n");
+ exit(EXIT_FAILURE);
+ }
+
+ if (odp_term_global(instance)) {
+ printf("Error: term global\n");
+ exit(EXIT_FAILURE);
+ }
+
+ return ret;
+}
diff --git a/example/simple_pipeline/simple_pipeline_run.sh b/example/simple_pipeline/simple_pipeline_run.sh
new file mode 100755
index 000000000..ba66e506e
--- /dev/null
+++ b/example/simple_pipeline/simple_pipeline_run.sh
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# SPDX-License-Identifier: BSD-3-Clause
+# Copyright (c) 2019 Nokia
+#
+
+# Exit code expected by automake for skipped tests
+TEST_SKIPPED=77
+
+if [ -f ./pktio_env ]; then
+ . ./pktio_env
+else
+ echo "BUG: unable to find pktio_env!"
+ echo "pktio_env has to be in current directory"
+ exit 1
+fi
+
+if [ $(nproc --all) -lt 3 ]; then
+ echo "Not enough CPU cores. Skipping test."
+ exit $TEST_SKIPPED
+fi
+
+setup_interfaces
+
+./odp_simple_pipeline${EXEEXT} -i $IF0,$IF1 -e -t 1 -a 1
+STATUS=$?
+
+if [ "$STATUS" -ne 0 ]; then
+ echo "Error: status was: $STATUS, expected 0"
+ exit 1
+fi
+
+validate_result
+
+cleanup_interfaces
+
+exit 0
diff --git a/example/simple_pipeline/udp64.pcap b/example/simple_pipeline/udp64.pcap
new file mode 100644
index 000000000..45f9d6e63
--- /dev/null
+++ b/example/simple_pipeline/udp64.pcap
Binary files differ