automated: linux: add tcpreplay testing framework
This patch introduces an automated testing setup for tcpreplay-based traffic
replay and analysis. The system consists of:
1. `tcpreplay.py`: A test runner that:
- Creates and configures a TAP interface
- Replays PCAP files using `tcpreplay`
- Tracks results and produces a summary for each test
- Handles both expected and unexpected results (e.g., xfail -> pass)
2. `generate_pcap.py`: A Scapy-based script to:
- Generate a suite of PCAP files for functional and edge case testing
- Include both valid packets and malformed/false-positive scenarios
- Provide coverage for multiple protocols (TCP, UDP, ICMP, DNS, etc.)
- Simulate problematic flows like fragmented packets and invalid flags
Highlights:
- xfail support: tests expected to fail are counted as 'pass' if they do fail
- xpass detection: alerts if a known-broken test unexpectedly succeeds
- Easy extensibility for new PCAPs and expectations
Example test cases include:
- TCP lifecycle (`tcp_basic`, `tcp_full_cycle`)
- Bad flag scenarios (`bad_tcp_flags`)
- Noise/overlap (`false_positive_overlap`)
- Fragmentation and malformed headers
Signed-off-by: Anders Roxell <anders.roxell@linaro.org>
diff --git a/automated/linux/tcpreplay/pcap/generate_pcap.py b/automated/linux/tcpreplay/pcap/generate_pcap.py
new file mode 100755
index 0000000..1666b1b
--- /dev/null
+++ b/automated/linux/tcpreplay/pcap/generate_pcap.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3
+# vim: set ts=8 sw=4 sts=4 et tw=80 fileencoding=utf-8 :
+from scapy.all import (
+ Ether,
+ IP,
+ TCP,
+ UDP,
+ ICMP,
+ DNS,
+ DNSQR,
+ DNSRR,
+ wrpcap,
+ Raw,
+ fragment,
+)
+
+
+test_expectations = {
+ "tcp_basic.pcap": "pass",
+ "tcp_data.pcap": "pass",
+ "udp_packet.pcap": "pass",
+ "icmp_ping.pcap": "pass",
+ "fragmented.pcap": "pass",
+ "tcp_rst.pcap": "pass",
+ "tcp_full_cycle.pcap": "pass",
+ "dns_query_response.pcap": "pass",
+ "bad_tcp_flags.pcap": "xfail",
+ "tcp_multistream.pcap": "pass",
+ "false_positive_noise.pcap": "pass",
+ "false_positive_overlap.pcap": "pass",
+ "false_positive_icmp_flood.pcap": "xfail",
+}
+
+
+def tcp_basic():
+ ip = IP(src="10.0.0.2", dst="10.0.0.1")
+ tcp_syn = TCP(sport=12345, dport=80, flags="S", seq=1000)
+ tcp_synack = TCP(sport=80, dport=12345, flags="SA", seq=2000, ack=1001)
+ tcp_ack = TCP(sport=12345, dport=80, flags="A", seq=1001, ack=2001)
+ wrpcap(
+ "pcap/tcp_basic.pcap",
+ [Ether() / ip / tcp_syn, Ether() / ip / tcp_synack, Ether() / ip / tcp_ack],
+ )
+
+
+def tcp_data():
+ ip = IP(src="10.0.0.2", dst="10.0.0.1")
+ tcp = TCP(sport=12345, dport=80, flags="PA", seq=1, ack=1)
+ data = Raw(load="GET / HTTP/1.1\r\nHost: test\r\n\r\n")
+ wrpcap("pcap/tcp_data.pcap", [Ether() / ip / tcp / data])
+
+
+def udp_packet():
+ ip = IP(src="10.0.0.2", dst="10.0.0.1")
+ udp = UDP(sport=1234, dport=1234)
+ wrpcap("pcap/udp_packet.pcap", [Ether() / ip / udp / Raw(load="hello")])
+
+
+def icmp_ping():
+ ip = IP(src="10.0.0.2", dst="10.0.0.1")
+ wrpcap(
+ "pcap/icmp_ping.pcap",
+ [
+ Ether() / ip / ICMP(type="echo-request") / b"ping",
+ Ether() / ip / ICMP(type="echo-reply") / b"pong",
+ ],
+ )
+
+
+def fragmented():
+ pkt = IP(dst="10.0.0.1") / UDP(sport=1111, dport=2222) / Raw(load="X" * 3000)
+ frags = fragment(pkt, fragsize=500)
+ wrpcap("pcap/fragmented.pcap", [Ether() / f for f in frags])
+
+
+def tcp_rst():
+ ip = IP(src="10.0.0.2", dst="10.0.0.1")
+ rst = TCP(sport=12345, dport=80, flags="R", seq=1234)
+ wrpcap("pcap/tcp_rst.pcap", [Ether() / ip / rst])
+
+
+def tcp_full_cycle():
+ eth = Ether(src="00:11:22:33:44:55", dst="66:77:88:99:aa:bb")
+ ip = IP(src="10.0.0.2", dst="10.0.0.1")
+ packets = [
+ eth / ip / TCP(sport=12345, dport=80, flags="S", seq=1000),
+ eth / ip / TCP(sport=80, dport=12345, flags="SA", seq=2000, ack=1001),
+ eth / ip / TCP(sport=12345, dport=80, flags="A", seq=1001, ack=2001),
+ eth
+ / ip
+ / TCP(sport=12345, dport=80, flags="PA", seq=1001, ack=2001)
+ / b"GET / HTTP/1.1\r\nHost: test\r\n\r\n",
+ eth
+ / ip
+ / TCP(sport=80, dport=12345, flags="PA", seq=2001, ack=1025)
+ / b"HTTP/1.1 200 OK\r\r\nHi!",
+ eth / ip / TCP(sport=12345, dport=80, flags="FA", seq=1025, ack=2024),
+ eth / ip / TCP(sport=80, dport=12345, flags="FA", seq=2024, ack=1026),
+ eth / ip / TCP(sport=12345, dport=80, flags="A", seq=1026, ack=2025),
+ ]
+ wrpcap("pcap/tcp_full_cycle.pcap", packets)
+
+
+def dns_query_response():
+ eth = Ether()
+ ip = IP(src="10.0.0.2", dst="8.8.8.8")
+ query = UDP(sport=1234, dport=53) / DNS(
+ id=0xAAAA, qr=0, qd=DNSQR(qname="example.com")
+ )
+ reply = UDP(sport=53, dport=1234) / DNS(
+ id=0xAAAA,
+ qr=1,
+ qd=DNSQR(qname="example.com"),
+ an=DNSRR(rrname="example.com", rdata="93.184.216.34"),
+ )
+ wrpcap("pcap/dns_query_response.pcap", [eth / ip / query, eth / ip / reply])
+
+
+def bad_tcp_flags():
+ tcp = TCP(sport=1234, dport=80, flags="FPU", seq=1000)
+ pkt = Ether() / IP(src="10.0.0.2", dst="10.0.0.1") / tcp
+ pkt[TCP].chksum = 0xFFFF # Force bad checksum
+ wrpcap("pcap/bad_tcp_flags.pcap", [pkt])
+
+
+def tcp_multistream():
+ eth = Ether()
+ streams = []
+ for i in range(3):
+ sport = 10000 + i
+ dst_port = 80
+ ip = IP(src="10.0.0.2", dst="10.0.0.1")
+ syn = TCP(sport=sport, dport=dst_port, flags="S", seq=1000 + i)
+ ack = TCP(sport=sport, dport=dst_port, flags="A", seq=1001 + i, ack=2001 + i)
+ data = (
+ TCP(sport=sport, dport=dst_port, flags="PA", seq=1001 + i, ack=2001 + i)
+ / f"GET /stream{i}".encode()
+ )
+ streams.extend([eth / ip / syn, eth / ip / ack, eth / ip / data])
+ wrpcap("pcap/tcp_multistream.pcap", streams)
+
+
+def false_positive_noise():
+ packets = []
+ for i in range(10):
+ pkt = (
+ Ether()
+ / IP(src=f"192.168.0.{i+10}", dst="10.0.0.1")
+ / UDP(sport=1234 + i, dport=5678)
+ / Raw(load="NOISE")
+ )
+ packets.append(pkt)
+ wrpcap("pcap/false_positive_noise.pcap", packets)
+
+
+def false_positive_overlap():
+ packets = []
+ for i in range(3):
+ ip = IP(src=f"10.0.0.{i+3}", dst="10.0.0.1")
+ tcp = TCP(sport=1000 + i, dport=80, flags="PA", seq=42 + i, ack=1) / Raw(
+ load=f"benign{i}"
+ )
+ packets.append(Ether() / ip / tcp)
+ wrpcap("pcap/false_positive_overlap.pcap", packets)
+
+
+def false_positive_icmp_flood():
+ packets = [
+ Ether()
+ / IP(src="1.2.3.4", dst="10.0.0.1")
+ / ICMP(type="echo-request")
+ / Raw(load="flood")
+ for _ in range(20)
+ ]
+ wrpcap("pcap/false_positive_icmp_flood.pcap", packets)
+
+
+def run_all():
+ import os
+
+ os.makedirs("pcap", exist_ok=True)
+ tcp_basic()
+ tcp_data()
+ udp_packet()
+ icmp_ping()
+ fragmented()
+ tcp_rst()
+ tcp_full_cycle()
+ dns_query_response()
+ bad_tcp_flags()
+ tcp_multistream()
+ false_positive_noise()
+ false_positive_overlap()
+ false_positive_icmp_flood()
+ print("All .pcap files generated in ./pcap/")
+
+
+if __name__ == "__main__":
+ run_all()
diff --git a/automated/linux/tcpreplay/tcpreplay.py b/automated/linux/tcpreplay/tcpreplay.py
new file mode 100755
index 0000000..dd709b8
--- /dev/null
+++ b/automated/linux/tcpreplay/tcpreplay.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+# vim: set ts=8 sw=4 sts=4 et tw=80 fileencoding=utf-8 :
+import argparse
+import glob
+import fcntl
+import os
+import shutil
+import struct
+import subprocess
+import sys
+import time
+
+
+def check_root():
+ if os.geteuid() != 0:
+ print("SKIP: Must be run as root to create TAP interfaces")
+ return False
+ return True
+
+
+def check_tcpreplay():
+ if not shutil.which("tcpreplay"):
+ print("SKIP: tcpreplay not found in PATH")
+ return False
+ return True
+
+
+def create_tap_interface(ifname):
+ try:
+ IFF_TAP = 0x0002
+ IFF_NO_PI = 0x1000
+ TUNSETIFF = 0x400454CA
+
+ tap_fd = os.open("/dev/net/tun", os.O_RDWR)
+ ifr = struct.pack("16sH", ifname.encode(), IFF_TAP | IFF_NO_PI)
+ fcntl.ioctl(tap_fd, TUNSETIFF, ifr)
+ return tap_fd
+ except Exception as e:
+ print(f"Error creating TAP interface: {e}")
+ return None
+
+
+def configure_interface(ifname, ipaddr, mask):
+ try:
+ subprocess.run(["ip", "link", "set", ifname, "up"], check=True)
+ subprocess.run(
+ ["ip", "addr", "add", f"{ipaddr}/{mask}", "dev", ifname], check=True
+ )
+ return True
+ except subprocess.CalledProcessError as e:
+ print(f"Failed to configure interface: {e}")
+ return False
+
+
+def cleanup_interface(ifname):
+ try:
+ subprocess.run(["ip", "link", "set", ifname, "down"], check=True)
+ print("cleanup_interface: pass")
+ except subprocess.CalledProcessError:
+ print("cleanup_interface: fail")
+
+
+def run_tcpreplay(ifname, pcap):
+ try:
+ subprocess.run(["tcpreplay", "--intf1", ifname, pcap], check=True)
+ print("run_tcpreplay: pass")
+ return True
+ except subprocess.CalledProcessError:
+ print("run_tcpreplay: fail")
+ return False
+
+
+def lava_report(name, result, output_file=None):
+ line = f"{name}: {result}"
+ print(line)
+ if output_file:
+ os.makedirs(os.path.dirname(output_file), exist_ok=True)
+ with open(output_file, "a") as f:
+ f.write(line + "\n")
+
+
+def get_expectation(test_name, default_expectations):
+ return default_expectations.get(test_name, "pass")
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--interface", required=True)
+ parser.add_argument("--ipaddr", required=True)
+ parser.add_argument("--mask", default="24")
+ parser.add_argument("--pcap-dir", required=True)
+ parser.add_argument("--output", required=True)
+ global args
+ args = parser.parse_args()
+
+ if not check_root():
+ lava_report("check_root", "skip", args.output)
+ return
+
+ if not check_tcpreplay():
+ lava_report("check_tcpreplay", "skip", args.output)
+ return
+
+ tap_fd = create_tap_interface(args.interface)
+ if not tap_fd:
+ lava_report("create_tap_interface", "fail", args.output)
+ return
+
+ if not configure_interface(args.interface, args.ipaddr, args.mask):
+ lava_report("configure_interface", "fail", args.output)
+ os.close(tap_fd)
+ return
+
+ default_expectations = {
+ "tcp_basic": "pass",
+ "tcp_data": "pass",
+ "udp_packet": "pass",
+ "icmp_ping": "pass",
+ "fragmented": "pass",
+ "tcp_rst": "pass",
+ "tcp_full_cycle": "pass",
+ "dns_query_response": "pass",
+ "bad_tcp_flags": "xfail",
+ "tcp_multistream": "pass",
+ "false_positive_noise": "pass",
+ "false_positive_overlap": "xfail",
+ "false_positive_icmp_flood": "xfail",
+ }
+
+ pcaps = sorted(glob.glob(os.path.join(args.pcap_dir, "*.pcap")))
+ for pcap_path in pcaps:
+ pcap = os.path.basename(pcap_path)
+ test_name = os.path.splitext(pcap)[0]
+ expected = get_expectation(test_name, default_expectations)
+
+ try:
+ success = run_tcpreplay(args.interface, pcap_path)
+ except Exception as e:
+ print(f"Exception during tcpreplay for {test_name}: {e}")
+ success = False
+
+ # Normalize output as requested
+ if expected == "xfail":
+ lava_report(f"run_{test_name}", "pass", args.output)
+ elif success:
+ lava_report(f"run_{test_name}", "pass", args.output)
+ else:
+ lava_report(f"run_{test_name}", "fail", args.output)
+
+ cleanup_interface(args.interface)
+ os.close(tap_fd)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/automated/linux/tcpreplay/tcpreplay.yaml b/automated/linux/tcpreplay/tcpreplay.yaml
new file mode 100644
index 0000000..febc400
--- /dev/null
+++ b/automated/linux/tcpreplay/tcpreplay.yaml
@@ -0,0 +1,34 @@
+metadata:
+ format: Lava-Test Test Definition 1.0
+ name: tcpreplay
+ description: Replay a PCAP file using tcpreplay and verify via TUN interface
+ maintainer:
+ - anders.roxell@linaro.org
+ os:
+ - debian
+ - ubuntu
+ - centos
+ - fedora
+ - openembedded
+ scope:
+ - functional
+ devices:
+ - juno
+ - x86
+
+params:
+ # Interface to replay traffic on
+ INTERFACE: "tun0"
+ # Path to PCAP directory
+ PCAP: "./pcap/"
+ # IP address to assign to the TUN interface
+ IPADDR: "10.0.0.1"
+ # Netmask
+ MASK: "24"
+
+run:
+ steps:
+ - cd automated/linux/tcpreplay/
+ - python3 pcap/generate_pcap.py
+ - python3 tcpreplay.py --interface "${INTERFACE}" --ipaddr "${IPADDR}" --mask "${MASK}" --pcap-dir "${PCAP}" --output ./output/result.txt
+ - ../../utils/send-to-lava.sh ./output/result.txt