diff options
Diffstat (limited to 'test/performance/odp_pktio_ordered.c')
-rw-r--r-- | test/performance/odp_pktio_ordered.c | 1337 |
1 file changed, 1337 insertions, 0 deletions
diff --git a/test/performance/odp_pktio_ordered.c b/test/performance/odp_pktio_ordered.c new file mode 100644 index 000000000..d7e43c4bc --- /dev/null +++ b/test/performance/odp_pktio_ordered.c @@ -0,0 +1,1337 @@ +/* Copyright (c) 2016, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "config.h" + +/** + * @file + * + * @example odp_pktio_ordered.c ODP ordered pktio test application + */ + +/** enable strtok */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include <stdlib.h> +#include <getopt.h> +#include <unistd.h> +#include <errno.h> +#include <inttypes.h> + +#include <test_debug.h> +#include "dummy_crc.h" + +#include <odp_api.h> +#include <odp/helper/threads.h> +#include <odp/helper/eth.h> +#include <odp/helper/ip.h> +#include <odp/helper/udp.h> + +/** Jenkins hash support. + * + * Copyright (C) 2006 Bob Jenkins (bob_jenkins@burtleburtle.net) + * + * http://burtleburtle.net/bob/hash/ + * + * These are the credits from Bob's sources: + * + * lookup3.c, by Bob Jenkins, May 2006, Public Domain. + * + * These are functions for producing 32-bit hashes for hash table lookup. + * hashword(), hashlittle(), hashlittle2(), hashbig(), mix(), and final() + * are externally useful functions. Routines to test the hash are included + * if SELF_TEST is defined. You can use this free for any purpose. It's in + * the public domain. It has no warranty. 
+ * + * $FreeBSD$ + */ +#define rot(x, k) (((x) << (k)) | ((x) >> (32 - (k)))) + +#define mix(a, b, c) \ +{ \ + a -= c; a ^= rot(c, 4); c += b; \ + b -= a; b ^= rot(a, 6); a += c; \ + c -= b; c ^= rot(b, 8); b += a; \ + a -= c; a ^= rot(c, 16); c += b; \ + b -= a; b ^= rot(a, 19); a += c; \ + c -= b; c ^= rot(b, 4); b += a; \ +} + +#define final(a, b, c) \ +{ \ + c ^= b; c -= rot(b, 14); \ + a ^= c; a -= rot(c, 11); \ + b ^= a; b -= rot(a, 25); \ + c ^= b; c -= rot(b, 16); \ + a ^= c; a -= rot(c, 4); \ + b ^= a; b -= rot(a, 14); \ + c ^= b; c -= rot(b, 24); \ +} + +#define JHASH_GOLDEN_RATIO 0x9e3779b9 + +/** Maximum number of worker threads */ +#define MAX_WORKERS 64 + +/** Number of packet buffers in the memory pool */ +#define PKT_POOL_SIZE 8192 + +/** Buffer size of the packet pool buffer in bytes*/ +#define PKT_POOL_BUF_SIZE 1856 + +/** Packet user area size in bytes */ +#define PKT_UAREA_SIZE 32 + +/** Maximum number of packets in a burst */ +#define MAX_PKT_BURST 32 + +/** Maximum number of pktio queues per interface */ +#define MAX_QUEUES 32 + +/** Maximum number of pktio interfaces */ +#define MAX_PKTIOS 8 + +/** Maximum number of packet flows */ +#define MAX_FLOWS 128 + +ODP_STATIC_ASSERT(MAX_PKTIOS < MAX_FLOWS, + "MAX_FLOWS must be greater than MAX_PKTIOS\n"); + +/** Minimum valid packet length */ +#define MIN_PACKET_LEN (ODPH_ETHHDR_LEN + ODPH_IPV4HDR_LEN + ODPH_UDPHDR_LEN) + +/** Default number of input queues */ +#define DEF_NUM_RX_QUEUES 1 + +/** Default number of flows */ +#define DEF_NUM_FLOWS 12 + +/** Default number of extra processing rounds */ +#define DEF_EXTRA_ROUNDS 15 + +/** Default statistics print interval in seconds */ +#define DEF_STATS_INT 1 + +/** Get rid of path in filename - only for unix-type paths using '/' */ +#define NO_PATH(file_name) (strrchr((file_name), '/') ? 
\ + strrchr((file_name), '/') + 1 : (file_name)) + +/** + * Packet input mode + */ +typedef enum pktin_mode_t { + SCHED_ORDERED = 0, + SCHED_ATOMIC, + SCHED_PARALLEL +} pktin_mode_t; + +/** + * Parsed command line application arguments + */ +typedef struct { + int cpu_count; /**< CPU count */ + int if_count; /**< Number of interfaces to be used */ + int addr_count; /**< Number of dst addresses to be used */ + int num_rx_q; /**< Number of input queues per interface */ + int num_flows; /**< Number of packet flows */ + int extra_rounds; /**< Number of extra input processing rounds */ + char **if_names; /**< Array of pointers to interface names */ + odph_ethaddr_t addrs[MAX_PKTIOS]; /**< Array of dst addresses */ + pktin_mode_t in_mode; /**< Packet input mode */ + int time; /**< Time in seconds to run. */ + int accuracy; /**< Statistics print interval */ + char *if_str; /**< Storage for interface names */ +} appl_args_t; + +static int exit_threads; /**< Break workers loop if set to 1 */ + +/** + * Queue context + */ +typedef struct { + odp_bool_t input_queue; /**< Input queue */ + uint64_t idx; /**< Queue index */ + uint64_t seq[MAX_FLOWS]; /**< Per flow sequence numbers */ +} qcontext_t; + +/** + * Flow info stored in the packet user area + */ +typedef struct { + uint64_t seq; /**< Sequence number */ + uint32_t crc; /**< CRC hash */ + uint16_t idx; /**< Flow index */ + uint8_t src_idx; /**< Source port index */ + uint8_t dst_idx; /**< Destination port index */ + +} flow_t; +ODP_STATIC_ASSERT(sizeof(flow_t) <= PKT_UAREA_SIZE, + "Flow data doesn't fit in the packet user area\n"); + +/** + * Statistics + */ +typedef union { + struct { + /** Number of forwarded packets */ + uint64_t packets; + /** Packets dropped due to a receive error */ + uint64_t rx_drops; + /** Packets dropped due to a transmit error */ + uint64_t tx_drops; + /** Packets with invalid sequence number */ + uint64_t invalid_seq; + } s; + + uint8_t padding[ODP_CACHE_LINE_SIZE]; +} stats_t 
ODP_ALIGNED_CACHE; + +/** + * IPv4 5-tuple + */ +typedef struct { + int32_t src_ip; + int32_t dst_ip; + int16_t src_port; + int16_t dst_port; + int8_t proto; + int8_t pad0; + int16_t pad1; +} ipv4_tuple5_t; + +/** + * Packet headers + */ +typedef struct { + odph_ethhdr_t *eth; + odph_ipv4hdr_t *ipv4; + odph_udphdr_t *udp; +} packet_hdr_t; + +/** + * Thread specific arguments + */ +typedef struct thread_args_t { + stats_t *stats; /**< Pointer to per thread statistics */ +} thread_args_t; + +/** + * Grouping of all global data + */ +typedef struct { + /** Per thread packet stats */ + stats_t stats[MAX_WORKERS]; + /** Application (parsed) arguments */ + appl_args_t appl; + /** Thread specific arguments */ + thread_args_t thread[MAX_WORKERS]; + /** Table of port ethernet addresses */ + odph_ethaddr_t port_eth_addr[MAX_PKTIOS]; + /** Table of dst ethernet addresses */ + odph_ethaddr_t dst_eth_addr[MAX_PKTIOS]; + /** Table of dst ports */ + int dst_port[MAX_PKTIOS]; + /** Table of atomic queues for flows */ + odp_queue_t fqueue[MAX_PKTIOS][MAX_FLOWS]; + /** Table of flow queue contexts */ + qcontext_t flow_qcontext[MAX_PKTIOS][MAX_FLOWS]; + /** Table of input queue contexts */ + qcontext_t input_qcontext[MAX_PKTIOS][MAX_QUEUES]; + /** Table of pktio handles */ + struct { + odp_pktio_t pktio; + odp_pktout_queue_t pktout[MAX_FLOWS]; + odp_queue_t pktin[MAX_QUEUES]; + int num_rx_queue; + int num_tx_queue; + } pktios[MAX_PKTIOS]; +} args_t; + +/** Global pointer to args */ +static args_t *gbl_args; + +/** Global barrier to synchronize main and workers */ +static odp_barrier_t barrier; + +/** + * Lookup the destination port for a given packet + * + * @param pkt ODP packet handle + */ +static inline int lookup_dest_port(odp_packet_t pkt) +{ + int i, src_idx; + odp_pktio_t pktio_src; + + pktio_src = odp_packet_input(pkt); + + for (src_idx = -1, i = 0; gbl_args->pktios[i].pktio + != ODP_PKTIO_INVALID; i++) + if (gbl_args->pktios[i].pktio == pktio_src) + src_idx = i; + + if 
(src_idx == -1) + LOG_ABORT("Failed to determine pktio input\n"); + + return gbl_args->dst_port[src_idx]; +} + +/** + * Map required packet headers + * + * @param pkt Packet handle + * @param hdr[out] Packet headers + * + * @retval 0 on success + * @retval -1 on failure + */ +static inline int packet_hdr(odp_packet_t pkt, packet_hdr_t *hdr) +{ + uint8_t *udp; + uint16_t eth_type; + uint8_t ihl; + + if (odp_unlikely(odp_packet_seg_len(pkt) < MIN_PACKET_LEN)) + return -1; + + if (odp_unlikely(!odp_packet_has_eth(pkt))) + return -1; + + hdr->eth = odp_packet_l2_ptr(pkt, NULL); + eth_type = odp_be_to_cpu_16(hdr->eth->type); + if (odp_unlikely(eth_type != ODPH_ETHTYPE_IPV4)) + return -1; + + hdr->ipv4 = (odph_ipv4hdr_t *)(hdr->eth + 1); + if (odp_unlikely(hdr->ipv4->proto != ODPH_IPPROTO_UDP)) + return -1; + + ihl = ODPH_IPV4HDR_IHL(hdr->ipv4->ver_ihl); + if (odp_unlikely(ihl < ODPH_IPV4HDR_IHL_MIN)) + return -1; + + udp = (uint8_t *)hdr->ipv4 + (ihl * 4); + + hdr->udp = (odph_udphdr_t *)udp; + + return 0; +} + +/** + * Compute hash from a 5-tuple + * + * @param key IPv4 5-tuple + * + * @return 32-bit hash value + */ +static inline uint64_t calc_ipv4_5tuple_hash(ipv4_tuple5_t *tuple) +{ + uint32_t a, b, c; + + a = tuple->proto + JHASH_GOLDEN_RATIO; + b = tuple->src_ip + JHASH_GOLDEN_RATIO; + c = tuple->dst_ip + JHASH_GOLDEN_RATIO; + + mix(a, b, c); + + a += (tuple->src_port << 16) + tuple->dst_port + JHASH_GOLDEN_RATIO; + final(a, b, c); + + return c; +} + +/** + * Compute packet flow index + * + * @param hdr Packet headers + * + * @return Flow index + */ +static inline uint64_t calc_flow_idx(packet_hdr_t *hdr) +{ + ipv4_tuple5_t tuple; + uint64_t idx; + + tuple.dst_ip = odp_be_to_cpu_32(hdr->ipv4->dst_addr); + tuple.src_ip = odp_be_to_cpu_32(hdr->ipv4->src_addr); + tuple.proto = hdr->ipv4->proto; + tuple.src_port = odp_be_to_cpu_16(hdr->udp->src_port); + tuple.dst_port = odp_be_to_cpu_16(hdr->udp->dst_port); + tuple.pad0 = 0; + tuple.pad1 = 0; + idx = 
calc_ipv4_5tuple_hash(&tuple); + + return idx % gbl_args->appl.num_flows; +} + +/** + * Fill packet's eth addresses according to the destination port + * + * @param hdr[out] Packet headers + * @param dst_port Destination port + */ +static inline void fill_eth_addrs(packet_hdr_t *hdr, int dst_port) +{ + hdr->eth->src = gbl_args->port_eth_addr[dst_port]; + hdr->eth->dst = gbl_args->dst_eth_addr[dst_port]; +} + +/** + * Process flow queue + * + * @param ev_tbl Array of events + * @param num Number of events in the array + * @param stats Pointer for storing thread statistics + * @param qcontext Source queue context + * @param pktout Arrays of output queues + */ +static inline void process_flow(odp_event_t ev_tbl[], int num, stats_t *stats, + qcontext_t *qcontext, + odp_pktout_queue_t pktout[][MAX_FLOWS]) +{ + odp_packet_t pkt; + flow_t *flow; + uint64_t queue_seq; + int dst_if; + int i; + int sent; + + for (i = 0; i < num; i++) { + pkt = odp_packet_from_event(ev_tbl[i]); + + flow = odp_packet_user_area(pkt); + + queue_seq = qcontext->seq[flow->src_idx]; + + /* Check sequence number */ + if (gbl_args->appl.in_mode != SCHED_PARALLEL && + odp_unlikely(flow->seq != queue_seq)) { + printf("Invalid sequence number: packet_seq=%" PRIu64 "" + " queue_seq=%" PRIu64 ", src_if=%" PRIu8 ", " + "dst_if=%" PRIu8 ", flow=%" PRIu16 "\n", + flow->seq, queue_seq, flow->src_idx, + flow->dst_idx, flow->idx); + qcontext->seq[flow->src_idx] = flow->seq + 1; + stats->s.invalid_seq++; + } else { + qcontext->seq[flow->src_idx]++; + } + + dst_if = flow->dst_idx; + sent = odp_pktout_send(pktout[dst_if][flow->idx], &pkt, 1); + + if (odp_unlikely(sent != 1)) { + stats->s.tx_drops++; + odp_packet_free(pkt); + } + stats->s.packets++; + } +} + +/** + * Process input queue + * + * @param ev_tbl Array of events + * @param num Number of events in the array + * @param stats Pointer for storing thread statistics + * @param qcontext Source queue context + */ +static inline void process_input(odp_event_t 
ev_tbl[], int num, stats_t *stats, + qcontext_t *qcontext) +{ + flow_t *flow; + flow_t *flow_tbl[MAX_PKT_BURST]; + int ret; + int i, j; + int pkts = 0; + + for (i = 0; i < num; i++) { + odp_packet_t pkt; + packet_hdr_t hdr; + int flow_idx; + + pkt = odp_packet_from_event(ev_tbl[i]); + + odp_packet_prefetch(pkt, 0, MIN_PACKET_LEN); + + ret = packet_hdr(pkt, &hdr); + if (odp_unlikely(ret)) { + odp_packet_free(pkt); + stats->s.rx_drops++; + continue; + } + + flow_idx = calc_flow_idx(&hdr); + + fill_eth_addrs(&hdr, flow_idx); + + flow = odp_packet_user_area(pkt); + flow->idx = flow_idx; + flow->src_idx = qcontext->idx; + flow->dst_idx = lookup_dest_port(pkt); + flow_tbl[pkts] = flow; + + /* Simulate "fat pipe" processing by generating extra work */ + for (j = 0; j < gbl_args->appl.extra_rounds; j++) + flow->crc = dummy_hash_crc32c(odp_packet_data(pkt), + odp_packet_len(pkt), 0); + pkts++; + } + + if (odp_unlikely(!pkts)) + return; + + /* Set sequence numbers */ + if (gbl_args->appl.in_mode == SCHED_ORDERED) + odp_schedule_order_lock(0); + + for (i = 0; i < pkts; i++) { + flow = flow_tbl[i]; + flow->seq = qcontext->seq[flow->idx]++; + } + + if (gbl_args->appl.in_mode == SCHED_ORDERED) + odp_schedule_order_unlock(0); + + for (i = 0; i < pkts; i++) { + flow = flow_tbl[i]; + ret = odp_queue_enq(gbl_args->fqueue[flow->dst_idx][flow->idx], + ev_tbl[i]); + + if (odp_unlikely(ret != 0)) { + LOG_ERR("odp_queue_enq() failed\n"); + stats->s.tx_drops++; + odp_event_free(ev_tbl[i]); + } else { + stats->s.packets++; + } + } +} + +/** + * Worker thread + * + * @param arg Thread arguments of type 'thread_args_t *' + */ +static int run_worker(void *arg) +{ + odp_event_t ev_tbl[MAX_PKT_BURST]; + odp_queue_t queue; + odp_pktout_queue_t pktout[MAX_PKTIOS][MAX_FLOWS]; + qcontext_t *qcontext; + thread_args_t *thr_args = arg; + stats_t *stats = thr_args->stats; + int pkts; + int i, j; + + memset(pktout, 0, sizeof(pktout)); + + for (i = 0; i < gbl_args->appl.if_count; i++) { + for (j = 0; j < 
gbl_args->appl.num_flows; j++) { + pktout[i][j] = gbl_args->pktios[i].pktout[j % + gbl_args->pktios[i].num_tx_queue]; + } + } + odp_barrier_wait(&barrier); + + /* Loop packets */ + while (!exit_threads) { + pkts = odp_schedule_multi(&queue, ODP_SCHED_NO_WAIT, ev_tbl, + MAX_PKT_BURST); + if (pkts <= 0) + continue; + + qcontext = odp_queue_context(queue); + + if (qcontext->input_queue) + process_input(ev_tbl, pkts, stats, qcontext); + else + process_flow(ev_tbl, pkts, stats, qcontext, pktout); + } + + /* Free remaining events in queues */ + while (1) { + odp_event_t ev; + + ev = odp_schedule(NULL, + odp_schedule_wait_time(ODP_TIME_SEC_IN_NS)); + + if (ev == ODP_EVENT_INVALID) + break; + + odp_event_free(ev); + } + + return 0; +} + +/** + * Create a pktio handle and associate with input queues + * + * @param dev Name of device to open + * @param index Pktio index + * @param num_rx Number of input queues + * @param num_tx Number of output queues + * @param pool Pool to associate with device for packet RX/TX + * + * @retval 0 on success + * @retval -1 on failure + */ +static int create_pktio(const char *dev, int idx, int num_rx, int num_tx, + odp_pool_t pool) +{ + odp_pktio_t pktio; + odp_pktio_param_t pktio_param; + odp_pktio_capability_t capa; + odp_pktio_config_t config; + odp_pktin_queue_param_t pktin_param; + odp_pktout_queue_param_t pktout_param; + odp_pktio_op_mode_t mode_rx; + odp_pktio_op_mode_t mode_tx; + int i; + + odp_pktio_param_init(&pktio_param); + + pktio_param.in_mode = ODP_PKTIN_MODE_SCHED; + + pktio = odp_pktio_open(dev, pool, &pktio_param); + if (pktio == ODP_PKTIO_INVALID) { + LOG_ERR("Error: failed to open %s\n", dev); + return -1; + } + + printf("Created pktio %" PRIu64 " (%s)\n", + odp_pktio_to_u64(pktio), dev); + + if (odp_pktio_capability(pktio, &capa)) { + LOG_ERR("Error: capability query failed %s\n", dev); + odp_pktio_close(pktio); + return -1; + } + + odp_pktio_config_init(&config); + config.parser.layer = ODP_PKTIO_PARSER_LAYER_L2; + 
odp_pktio_config(pktio, &config); + + odp_pktin_queue_param_init(&pktin_param); + odp_pktout_queue_param_init(&pktout_param); + + mode_tx = ODP_PKTIO_OP_MT; + mode_rx = ODP_PKTIO_OP_MT; + + if (gbl_args->appl.in_mode == SCHED_ATOMIC) { + pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ATOMIC; + } else if (gbl_args->appl.in_mode == SCHED_PARALLEL) { + pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_PARALLEL; + } else { + pktin_param.queue_param.sched.sync = ODP_SCHED_SYNC_ORDERED; + pktin_param.queue_param.sched.lock_count = 1; + } + pktin_param.queue_param.sched.prio = ODP_SCHED_PRIO_DEFAULT; + pktin_param.queue_param.sched.group = ODP_SCHED_GROUP_ALL; + + if (num_rx > (int)capa.max_input_queues) { + printf("Allocating %i shared input queues, %i requested\n", + capa.max_input_queues, num_rx); + num_rx = capa.max_input_queues; + mode_rx = ODP_PKTIO_OP_MT; + } + + if (num_tx > (int)capa.max_output_queues) { + printf("Allocating %i shared output queues, %i requested\n", + capa.max_output_queues, num_tx); + num_tx = capa.max_output_queues; + mode_tx = ODP_PKTIO_OP_MT; + } + + pktin_param.hash_enable = 1; + pktin_param.hash_proto.proto.ipv4_udp = 1; + pktin_param.num_queues = num_rx; + pktin_param.op_mode = mode_rx; + + pktout_param.op_mode = mode_tx; + pktout_param.num_queues = num_tx; + + if (odp_pktin_queue_config(pktio, &pktin_param)) { + LOG_ERR("Error: input queue config failed %s\n", dev); + return -1; + } + + if (odp_pktout_queue_config(pktio, &pktout_param)) { + LOG_ERR("Error: output queue config failed %s\n", dev); + return -1; + } + + if (odp_pktin_event_queue(pktio, gbl_args->pktios[idx].pktin, + num_rx) != num_rx) { + LOG_ERR("Error: pktin event queue query failed %s\n", + dev); + return -1; + } + + /* Set queue contexts */ + for (i = 0; i < num_rx; i++) { + gbl_args->input_qcontext[idx][i].idx = idx; + gbl_args->input_qcontext[idx][i].input_queue = 1; + + if (odp_queue_context_set(gbl_args->pktios[idx].pktin[i], + 
&gbl_args->input_qcontext[idx][i], + sizeof(qcontext_t))) { + LOG_ERR("Error: pktin queue context set failed %s\n", + dev); + return -1; + } + } + + if (odp_pktout_queue(pktio, + gbl_args->pktios[idx].pktout, + num_tx) != num_tx) { + LOG_ERR("Error: pktout queue query failed %s\n", dev); + return -1; + } + + printf("Created %i input and %i output queues on (%s)\n", + num_rx, num_tx, dev); + + gbl_args->pktios[idx].num_rx_queue = num_rx; + gbl_args->pktios[idx].num_tx_queue = num_tx; + gbl_args->pktios[idx].pktio = pktio; + + return 0; +} + +/** + * Print statistics + * + * @param num_workers Number of worker threads + * @param thr_stats Pointer to stats storage + * @param duration Number of seconds to loop in + * @param timeout Number of seconds for stats calculation + * + */ +static int print_speed_stats(int num_workers, stats_t *thr_stats, + int duration, int timeout) +{ + uint64_t pkts = 0; + uint64_t pkts_prev = 0; + uint64_t pps; + uint64_t rx_drops, tx_drops, invalid_seq; + uint64_t maximum_pps = 0; + int i; + int elapsed = 0; + int stats_enabled = 1; + int loop_forever = (duration == 0); + + if (timeout <= 0) { + stats_enabled = 0; + timeout = 1; + } + /* Wait for all threads to be ready*/ + odp_barrier_wait(&barrier); + + do { + pkts = 0; + rx_drops = 0; + tx_drops = 0; + invalid_seq = 0; + + sleep(timeout); + + for (i = 0; i < num_workers; i++) { + pkts += thr_stats[i].s.packets; + rx_drops += thr_stats[i].s.rx_drops; + tx_drops += thr_stats[i].s.tx_drops; + invalid_seq += thr_stats[i].s.invalid_seq; + } + if (stats_enabled) { + pps = (pkts - pkts_prev) / timeout; + if (pps > maximum_pps) + maximum_pps = pps; + printf("%" PRIu64 " pps, %" PRIu64 " max pps, ", pps, + maximum_pps); + + printf("%" PRIu64 " rx drops, %" PRIu64 " tx drops, ", + rx_drops, tx_drops); + + printf("%" PRIu64 " invalid seq\n", invalid_seq); + + pkts_prev = pkts; + } + elapsed += timeout; + } while (loop_forever || (elapsed < duration)); + + if (stats_enabled) + printf("TEST RESULT: 
%" PRIu64 " maximum packets per second.\n", + maximum_pps); + + return (pkts > 100 && !invalid_seq) ? 0 : -1; +} + +/** + * Find the destination port for a given input port + * + * @param port Input port index + */ +static int find_dest_port(int port) +{ + /* Even number of ports */ + if (gbl_args->appl.if_count % 2 == 0) + return (port % 2 == 0) ? port + 1 : port - 1; + + /* Odd number of ports */ + if (port == gbl_args->appl.if_count - 1) + return 0; + else + return port + 1; +} + +/** + * Initialize port forwarding table + */ +static void init_forwarding_tbl(void) +{ + int rx_idx; + + for (rx_idx = 0; rx_idx < gbl_args->appl.if_count; rx_idx++) + gbl_args->dst_port[rx_idx] = find_dest_port(rx_idx); +} + +/** + * Prinf usage information + */ +static void usage(char *progname) +{ + printf("\n" + "OpenDataPlane ordered pktio application.\n" + "\n" + "Usage: %s OPTIONS\n" + " E.g. %s -i eth0,eth1\n" + " In the above example,\n" + " eth0 will send pkts to eth1 and vice versa\n" + "\n" + "Mandatory OPTIONS:\n" + " -i, --interface Eth interfaces (comma-separated, no spaces)\n" + " Interface count min 1, max %i\n" + "\n" + "Optional OPTIONS:\n" + " -m, --mode Packet input mode\n" + " 0: Scheduled ordered queues (default)\n" + " 1: Scheduled atomic queues\n" + " 2: Scheduled parallel queues (packet order not maintained)\n" + " -r, --num_rx_q Number of RX queues per interface\n" + " -f, --num_flows Number of packet flows\n" + " -e, --extra_input <number> Number of extra input processing rounds\n" + " -c, --count <number> CPU count.\n" + " -t, --time <number> Time in seconds to run.\n" + " -a, --accuracy <number> Statistics print interval in seconds\n" + " (default is 1 second).\n" + " -d, --dst_addr Destination addresses (comma-separated, no spaces)\n" + " -h, --help Display help and exit.\n\n" + "\n", NO_PATH(progname), NO_PATH(progname), MAX_PKTIOS + ); +} + +/** + * Parse and store the command line arguments + * + * @param argc argument count + * @param argv[] argument 
vector + * @param appl_args Store application arguments here + */ +static void parse_args(int argc, char *argv[], appl_args_t *appl_args) +{ + int opt; + int long_index; + char *token; + char *addr_str; + size_t len; + int i; + static const struct option longopts[] = { + {"count", required_argument, NULL, 'c'}, + {"time", required_argument, NULL, 't'}, + {"accuracy", required_argument, NULL, 'a'}, + {"interface", required_argument, NULL, 'i'}, + {"mode", required_argument, NULL, 'm'}, + {"dst_addr", required_argument, NULL, 'd'}, + {"num_rx_q", required_argument, NULL, 'r'}, + {"num_flows", required_argument, NULL, 'f'}, + {"extra_input", required_argument, NULL, 'e'}, + {"help", no_argument, NULL, 'h'}, + {NULL, 0, NULL, 0} + }; + + static const char *shortopts = "+c:+t:+a:i:m:d:r:f:e:h"; + + /* let helper collect its own arguments (e.g. --odph_proc) */ + odph_parse_options(argc, argv, shortopts, longopts); + + appl_args->time = 0; /* loop forever if time to run is 0 */ + appl_args->accuracy = DEF_STATS_INT; + appl_args->num_rx_q = DEF_NUM_RX_QUEUES; + appl_args->num_flows = DEF_NUM_FLOWS; + appl_args->extra_rounds = DEF_EXTRA_ROUNDS; + + opterr = 0; /* do not issue errors on helper options */ + + while (1) { + opt = getopt_long(argc, argv, shortopts, longopts, &long_index); + + if (opt == -1) + break; /* No more options */ + + switch (opt) { + case 'c': + appl_args->cpu_count = atoi(optarg); + break; + case 't': + appl_args->time = atoi(optarg); + break; + case 'a': + appl_args->accuracy = atoi(optarg); + break; + /* parse packet-io interface names */ + case 'd': + len = strlen(optarg); + if (len == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + len += 1; /* add room for '\0' */ + + addr_str = malloc(len); + if (addr_str == NULL) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* store the mac addresses names */ + strcpy(addr_str, optarg); + for (token = strtok(addr_str, ","), i = 0; + token != NULL; token = strtok(NULL, ","), i++) { + if (i >= MAX_PKTIOS) 
{ + printf("too many MAC addresses\n"); + usage(argv[0]); + exit(EXIT_FAILURE); + } + if (odph_eth_addr_parse(&appl_args->addrs[i], + token) != 0) { + printf("invalid MAC address\n"); + usage(argv[0]); + exit(EXIT_FAILURE); + } + } + appl_args->addr_count = i; + if (appl_args->addr_count < 1) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + free(addr_str); + break; + case 'i': + len = strlen(optarg); + if (len == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + len += 1; /* add room for '\0' */ + + appl_args->if_str = malloc(len); + if (appl_args->if_str == NULL) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* count the number of tokens separated by ',' */ + strcpy(appl_args->if_str, optarg); + for (token = strtok(appl_args->if_str, ","), i = 0; + token != NULL; + token = strtok(NULL, ","), i++) + ; + + appl_args->if_count = i; + + if (appl_args->if_count < 1 || + appl_args->if_count > MAX_PKTIOS) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* allocate storage for the if names */ + appl_args->if_names = + calloc(appl_args->if_count, sizeof(char *)); + + /* store the if names (reset names string) */ + strcpy(appl_args->if_str, optarg); + for (token = strtok(appl_args->if_str, ","), i = 0; + token != NULL; token = strtok(NULL, ","), i++) { + appl_args->if_names[i] = token; + } + break; + case 'm': + i = atoi(optarg); + if (i == 1) + appl_args->in_mode = SCHED_ATOMIC; + else if (i == 2) + appl_args->in_mode = SCHED_PARALLEL; + else + appl_args->in_mode = SCHED_ORDERED; + break; + case 'r': + appl_args->num_rx_q = atoi(optarg); + break; + case 'f': + appl_args->num_flows = atoi(optarg); + break; + case 'e': + appl_args->extra_rounds = atoi(optarg); + break; + case 'h': + usage(argv[0]); + exit(EXIT_SUCCESS); + break; + default: + break; + } + } + + if (appl_args->cpu_count > MAX_WORKERS) { + printf("Too many workers requested %d, max: %d\n", + appl_args->cpu_count, MAX_WORKERS); + exit(EXIT_FAILURE); + } + + if (appl_args->num_flows > MAX_FLOWS) { + 
printf("Too many flows requested %d, max: %d\n", + appl_args->num_flows, MAX_FLOWS); + exit(EXIT_FAILURE); + } + + if (appl_args->if_count == 0 || appl_args->num_flows == 0 || + appl_args->num_rx_q == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + if (appl_args->addr_count != 0 && + appl_args->addr_count != appl_args->if_count) { + printf("Number of destination addresses differs from number" + " of interfaces\n"); + usage(argv[0]); + exit(EXIT_FAILURE); + } + + optind = 1; /* reset 'extern optind' from the getopt lib */ +} + +/** + * Print system and application info + */ +static void print_info(char *progname, appl_args_t *appl_args) +{ + int i; + + odp_sys_info_print(); + + printf("Running ODP appl: \"%s\"\n" + "-----------------\n" + "IF-count: %i\n" + "Using IFs: ", + progname, appl_args->if_count); + for (i = 0; i < appl_args->if_count; ++i) + printf(" %s", appl_args->if_names[i]); + printf("\n\n"); + fflush(NULL); +} + +static void gbl_args_init(args_t *args) +{ + int pktio, queue; + + memset(args, 0, sizeof(args_t)); + + for (pktio = 0; pktio < MAX_PKTIOS; pktio++) { + args->pktios[pktio].pktio = ODP_PKTIO_INVALID; + + for (queue = 0; queue < MAX_QUEUES; queue++) + args->pktios[pktio].pktin[queue] = ODP_QUEUE_INVALID; + } +} + +/** + * ODP ordered pktio application + */ +int main(int argc, char *argv[]) +{ + odp_cpumask_t cpumask; + odp_instance_t instance; + odp_pool_t pool; + odp_pool_param_t params; + odp_shm_t shm; + odp_queue_capability_t capa; + odph_ethaddr_t new_addr; + odph_odpthread_t thread_tbl[MAX_WORKERS]; + stats_t *stats; + char cpumaskstr[ODP_CPUMASK_STR_SIZE]; + int cpu; + int i, j; + int if_count; + int ret; + int num_workers; + int in_mode; + + /* Init ODP before calling anything else */ + if (odp_init_global(&instance, NULL, NULL)) { + LOG_ERR("Error: ODP global init failed.\n"); + exit(EXIT_FAILURE); + } + + /* Init this thread */ + if (odp_init_local(instance, ODP_THREAD_CONTROL)) { + LOG_ERR("Error: ODP local init failed.\n"); + 
exit(EXIT_FAILURE); + } + + /* Reserve memory for args from shared mem */ + shm = odp_shm_reserve("shm_args", sizeof(args_t), + ODP_CACHE_LINE_SIZE, 0); + + if (shm == ODP_SHM_INVALID) { + LOG_ERR("Error: shared mem reserve failed.\n"); + exit(EXIT_FAILURE); + } + + gbl_args = odp_shm_addr(shm); + + if (gbl_args == NULL) { + LOG_ERR("Error: shared mem alloc failed.\n"); + odp_shm_free(shm); + exit(EXIT_FAILURE); + } + gbl_args_init(gbl_args); + + /* Parse and store the application arguments */ + parse_args(argc, argv, &gbl_args->appl); + + if (gbl_args->appl.in_mode == SCHED_ORDERED) { + /* At least one ordered lock required */ + odp_queue_capability(&capa); + if (capa.max_ordered_locks < 1) { + LOG_ERR("Error: Ordered locks not available.\n"); + exit(EXIT_FAILURE); + } + } + /* Print both system and application information */ + print_info(NO_PATH(argv[0]), &gbl_args->appl); + + /* Default to system CPU count unless user specified */ + num_workers = MAX_WORKERS; + if (gbl_args->appl.cpu_count) + num_workers = gbl_args->appl.cpu_count; + + /* Get default worker cpumask */ + num_workers = odp_cpumask_default_worker(&cpumask, num_workers); + (void)odp_cpumask_to_str(&cpumask, cpumaskstr, sizeof(cpumaskstr)); + + if_count = gbl_args->appl.if_count; + + printf("Num worker threads: %i\n", num_workers); + printf("First CPU: %i\n", odp_cpumask_first(&cpumask)); + printf("CPU mask: %s\n\n", cpumaskstr); + + /* Create packet pool */ + odp_pool_param_init(¶ms); + params.pkt.seg_len = PKT_POOL_BUF_SIZE; + params.pkt.len = PKT_POOL_BUF_SIZE; + params.pkt.num = PKT_POOL_SIZE; + params.pkt.uarea_size = PKT_UAREA_SIZE; + params.type = ODP_POOL_PACKET; + + pool = odp_pool_create("packet pool", ¶ms); + + if (pool == ODP_POOL_INVALID) { + LOG_ERR("Error: packet pool create failed.\n"); + exit(EXIT_FAILURE); + } + odp_pool_print(pool); + + init_forwarding_tbl(); + + for (i = 0; i < if_count; ++i) { + const char *dev = gbl_args->appl.if_names[i]; + int num_rx, num_tx; + + num_rx = 
gbl_args->appl.num_rx_q; + num_tx = gbl_args->appl.num_flows; + + if (create_pktio(dev, i, num_rx, num_tx, pool)) + exit(EXIT_FAILURE); + + /* Save interface ethernet address */ + if (odp_pktio_mac_addr(gbl_args->pktios[i].pktio, + gbl_args->port_eth_addr[i].addr, + ODPH_ETHADDR_LEN) != ODPH_ETHADDR_LEN) { + LOG_ERR("Error: interface ethernet address unknown\n"); + exit(EXIT_FAILURE); + } + + odp_pktio_print(gbl_args->pktios[i].pktio); + + /* Save destination eth address */ + /* 02:00:00:00:00:XX */ + memset(&new_addr, 0, sizeof(odph_ethaddr_t)); + if (gbl_args->appl.addr_count) { + memcpy(&new_addr, &gbl_args->appl.addrs[i], + sizeof(odph_ethaddr_t)); + } else { + new_addr.addr[0] = 0x02; + new_addr.addr[5] = i; + } + gbl_args->dst_eth_addr[i] = new_addr; + } + + gbl_args->pktios[i].pktio = ODP_PKTIO_INVALID; + + /* Allocate the same number of flows to each interface */ + for (i = 0; i < if_count; i++) { + odp_pktio_capability_t capa; + + if (odp_pktio_capability(gbl_args->pktios[i].pktio, &capa)) { + LOG_ERR("Error: pktio capability failed.\n"); + exit(EXIT_FAILURE); + } + + if ((unsigned)gbl_args->appl.num_flows > capa.max_output_queues) + gbl_args->appl.num_flows = capa.max_output_queues; + } + + /* Create atomic queues for packet tagging */ + for (i = 0; i < if_count; i++) { + for (j = 0; j < gbl_args->appl.num_flows; j++) { + odp_queue_t queue; + odp_queue_param_t qparam; + char qname[ODP_QUEUE_NAME_LEN]; + + snprintf(qname, sizeof(qname), "flow_%d_%d", i, j); + + odp_queue_param_init(&qparam); + qparam.type = ODP_QUEUE_TYPE_SCHED; + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; + qparam.sched.group = ODP_SCHED_GROUP_ALL; + + gbl_args->flow_qcontext[i][j].idx = i; + gbl_args->flow_qcontext[i][j].input_queue = 0; + qparam.context = &gbl_args->flow_qcontext[i][j]; + qparam.context_len = sizeof(qcontext_t); + + queue = odp_queue_create(qname, &qparam); + if (queue == ODP_QUEUE_INVALID) { + LOG_ERR("Error: flow queue 
create failed.\n"); + exit(EXIT_FAILURE); + } + + gbl_args->fqueue[i][j] = queue; + } + } + + in_mode = gbl_args->appl.in_mode; + printf("\nApplication parameters\n" + "----------------------\n" + "Input queues: %d\n" + "Mode: %s\n" + "Flows: %d\n" + "Extra rounds: %d\n\n", gbl_args->appl.num_rx_q, + (in_mode == SCHED_ATOMIC) ? "PKTIN_SCHED_ATOMIC" : + (in_mode == SCHED_PARALLEL ? "PKTIN_SCHED_PARALLEL" : + "PKTIN_SCHED_ORDERED"), gbl_args->appl.num_flows, + gbl_args->appl.extra_rounds); + + memset(thread_tbl, 0, sizeof(thread_tbl)); + + stats = gbl_args->stats; + + odp_barrier_init(&barrier, num_workers + 1); + + /* Create worker threads */ + cpu = odp_cpumask_first(&cpumask); + for (i = 0; i < num_workers; ++i) { + odp_cpumask_t thd_mask; + odph_odpthread_params_t thr_params; + + memset(&thr_params, 0, sizeof(thr_params)); + thr_params.start = run_worker; + thr_params.arg = &gbl_args->thread[i]; + thr_params.thr_type = ODP_THREAD_WORKER; + thr_params.instance = instance; + + gbl_args->thread[i].stats = &stats[i]; + + odp_cpumask_zero(&thd_mask); + odp_cpumask_set(&thd_mask, cpu); + odph_odpthreads_create(&thread_tbl[i], &thd_mask, + &thr_params); + cpu = odp_cpumask_next(&cpumask, cpu); + } + + /* Start packet receive and transmit */ + for (i = 0; i < if_count; ++i) { + odp_pktio_t pktio; + + pktio = gbl_args->pktios[i].pktio; + ret = odp_pktio_start(pktio); + if (ret) { + LOG_ERR("Error: unable to start %s\n", + gbl_args->appl.if_names[i]); + exit(EXIT_FAILURE); + } + } + + ret = print_speed_stats(num_workers, stats, gbl_args->appl.time, + gbl_args->appl.accuracy); + + /* Stop receiving new packet */ + for (i = 0; i < if_count; i++) + odp_pktio_stop(gbl_args->pktios[i].pktio); + + exit_threads = 1; + + /* Master thread waits for other threads to exit */ + for (i = 0; i < num_workers; ++i) + odph_odpthreads_join(&thread_tbl[i]); + + for (i = 0; i < if_count; i++) { + odp_pktio_close(gbl_args->pktios[i].pktio); + + for (j = 0; j < gbl_args->appl.num_flows; j++) + 
odp_queue_destroy(gbl_args->fqueue[i][j]); + } + + free(gbl_args->appl.if_names); + free(gbl_args->appl.if_str); + + if (odp_pool_destroy(pool)) { + LOG_ERR("Error: pool destroy\n"); + exit(EXIT_FAILURE); + } + + if (odp_shm_free(shm)) { + LOG_ERR("Error: shm free\n"); + exit(EXIT_FAILURE); + } + + if (odp_term_local()) { + LOG_ERR("Error: term local\n"); + exit(EXIT_FAILURE); + } + + if (odp_term_global(instance)) { + LOG_ERR("Error: term global\n"); + exit(EXIT_FAILURE); + } + + return ret; +} |