automated: linux: add tcpreplay testing framework

This patch introduces an automated testing setup for tcpreplay-based traffic
replay and analysis. The system consists of:

1. `tcpreplay.py`: A test runner that:
   - Creates and configures a TAP interface
   - Replays PCAP files using `tcpreplay`
   - Tracks results and produces a summary for each test
   - Handles both expected and unexpected results (e.g., xfail -> pass)

2. `generate_pcap.py`: A Scapy-based script to:
   - Generate a suite of PCAP files for functional and edge case testing
   - Include both valid packets and malformed/false-positive scenarios
   - Provide coverage for multiple protocols (TCP, UDP, ICMP, DNS, etc.)
   - Simulate problematic flows like fragmented packets and invalid flags

Highlights:
- xfail support: tests expected to fail are counted as 'pass' if they do fail
- xpass detection: alerts if a known-broken test unexpectedly succeeds
- Easy extensibility for new PCAPs and expectations

Example test cases include:
- TCP lifecycle (`tcp_basic`, `tcp_full_cycle`)
- Bad flag scenarios (`bad_tcp_flags`)
- Noise/overlap (`false_positive_overlap`)
- Fragmentation and malformed headers

Signed-off-by: Anders Roxell <anders.roxell@linaro.org>
diff --git a/automated/linux/tcpreplay/pcap/generate_pcap.py b/automated/linux/tcpreplay/pcap/generate_pcap.py
new file mode 100755
index 0000000..1666b1b
--- /dev/null
+++ b/automated/linux/tcpreplay/pcap/generate_pcap.py
@@ -0,0 +1,199 @@
+#!/usr/bin/env python3
+# vim: set ts=8 sw=4 sts=4 et tw=80 fileencoding=utf-8 :
+from scapy.all import (
+    Ether,
+    IP,
+    TCP,
+    UDP,
+    ICMP,
+    DNS,
+    DNSQR,
+    DNSRR,
+    wrpcap,
+    Raw,
+    fragment,
+)
+
+
+test_expectations = {
+    "tcp_basic.pcap": "pass",
+    "tcp_data.pcap": "pass",
+    "udp_packet.pcap": "pass",
+    "icmp_ping.pcap": "pass",
+    "fragmented.pcap": "pass",
+    "tcp_rst.pcap": "pass",
+    "tcp_full_cycle.pcap": "pass",
+    "dns_query_response.pcap": "pass",
+    "bad_tcp_flags.pcap": "xfail",
+    "tcp_multistream.pcap": "pass",
+    "false_positive_noise.pcap": "pass",
+    "false_positive_overlap.pcap": "pass",
+    "false_positive_icmp_flood.pcap": "xfail",
+}
+
+
+def tcp_basic():
+    ip = IP(src="10.0.0.2", dst="10.0.0.1")
+    tcp_syn = TCP(sport=12345, dport=80, flags="S", seq=1000)
+    tcp_synack = TCP(sport=80, dport=12345, flags="SA", seq=2000, ack=1001)
+    tcp_ack = TCP(sport=12345, dport=80, flags="A", seq=1001, ack=2001)
+    wrpcap(
+        "pcap/tcp_basic.pcap",
+        [Ether() / ip / tcp_syn, Ether() / ip / tcp_synack, Ether() / ip / tcp_ack],
+    )
+
+
+def tcp_data():
+    ip = IP(src="10.0.0.2", dst="10.0.0.1")
+    tcp = TCP(sport=12345, dport=80, flags="PA", seq=1, ack=1)
+    data = Raw(load="GET / HTTP/1.1\r\nHost: test\r\n\r\n")
+    wrpcap("pcap/tcp_data.pcap", [Ether() / ip / tcp / data])
+
+
+def udp_packet():
+    ip = IP(src="10.0.0.2", dst="10.0.0.1")
+    udp = UDP(sport=1234, dport=1234)
+    wrpcap("pcap/udp_packet.pcap", [Ether() / ip / udp / Raw(load="hello")])
+
+
+def icmp_ping():
+    ip = IP(src="10.0.0.2", dst="10.0.0.1")
+    wrpcap(
+        "pcap/icmp_ping.pcap",
+        [
+            Ether() / ip / ICMP(type="echo-request") / b"ping",
+            Ether() / ip / ICMP(type="echo-reply") / b"pong",
+        ],
+    )
+
+
+def fragmented():
+    pkt = IP(dst="10.0.0.1") / UDP(sport=1111, dport=2222) / Raw(load="X" * 3000)
+    frags = fragment(pkt, fragsize=500)
+    wrpcap("pcap/fragmented.pcap", [Ether() / f for f in frags])
+
+
+def tcp_rst():
+    ip = IP(src="10.0.0.2", dst="10.0.0.1")
+    rst = TCP(sport=12345, dport=80, flags="R", seq=1234)
+    wrpcap("pcap/tcp_rst.pcap", [Ether() / ip / rst])
+
+
+def tcp_full_cycle():
+    eth = Ether(src="00:11:22:33:44:55", dst="66:77:88:99:aa:bb")
+    ip = IP(src="10.0.0.2", dst="10.0.0.1")
+    packets = [
+        eth / ip / TCP(sport=12345, dport=80, flags="S", seq=1000),
+        eth / ip / TCP(sport=80, dport=12345, flags="SA", seq=2000, ack=1001),
+        eth / ip / TCP(sport=12345, dport=80, flags="A", seq=1001, ack=2001),
+        eth
+        / ip
+        / TCP(sport=12345, dport=80, flags="PA", seq=1001, ack=2001)
+        / b"GET / HTTP/1.1\r\nHost: test\r\n\r\n",
+        eth
+        / ip
+        / TCP(sport=80, dport=12345, flags="PA", seq=2001, ack=1025)
+        / b"HTTP/1.1 200 OK\r\r\nHi!",
+        eth / ip / TCP(sport=12345, dport=80, flags="FA", seq=1025, ack=2024),
+        eth / ip / TCP(sport=80, dport=12345, flags="FA", seq=2024, ack=1026),
+        eth / ip / TCP(sport=12345, dport=80, flags="A", seq=1026, ack=2025),
+    ]
+    wrpcap("pcap/tcp_full_cycle.pcap", packets)
+
+
+def dns_query_response():
+    eth = Ether()
+    ip = IP(src="10.0.0.2", dst="8.8.8.8")
+    query = UDP(sport=1234, dport=53) / DNS(
+        id=0xAAAA, qr=0, qd=DNSQR(qname="example.com")
+    )
+    reply = UDP(sport=53, dport=1234) / DNS(
+        id=0xAAAA,
+        qr=1,
+        qd=DNSQR(qname="example.com"),
+        an=DNSRR(rrname="example.com", rdata="93.184.216.34"),
+    )
+    wrpcap("pcap/dns_query_response.pcap", [eth / ip / query, eth / ip / reply])
+
+
+def bad_tcp_flags():
+    tcp = TCP(sport=1234, dport=80, flags="FPU", seq=1000)
+    pkt = Ether() / IP(src="10.0.0.2", dst="10.0.0.1") / tcp
+    pkt[TCP].chksum = 0xFFFF  # Force bad checksum
+    wrpcap("pcap/bad_tcp_flags.pcap", [pkt])
+
+
+def tcp_multistream():
+    eth = Ether()
+    streams = []
+    for i in range(3):
+        sport = 10000 + i
+        dst_port = 80
+        ip = IP(src="10.0.0.2", dst="10.0.0.1")
+        syn = TCP(sport=sport, dport=dst_port, flags="S", seq=1000 + i)
+        ack = TCP(sport=sport, dport=dst_port, flags="A", seq=1001 + i, ack=2001 + i)
+        data = (
+            TCP(sport=sport, dport=dst_port, flags="PA", seq=1001 + i, ack=2001 + i)
+            / f"GET /stream{i}".encode()
+        )
+        streams.extend([eth / ip / syn, eth / ip / ack, eth / ip / data])
+    wrpcap("pcap/tcp_multistream.pcap", streams)
+
+
+def false_positive_noise():
+    packets = []
+    for i in range(10):
+        pkt = (
+            Ether()
+            / IP(src=f"192.168.0.{i+10}", dst="10.0.0.1")
+            / UDP(sport=1234 + i, dport=5678)
+            / Raw(load="NOISE")
+        )
+        packets.append(pkt)
+    wrpcap("pcap/false_positive_noise.pcap", packets)
+
+
+def false_positive_overlap():
+    packets = []
+    for i in range(3):
+        ip = IP(src=f"10.0.0.{i+3}", dst="10.0.0.1")
+        tcp = TCP(sport=1000 + i, dport=80, flags="PA", seq=42 + i, ack=1) / Raw(
+            load=f"benign{i}"
+        )
+        packets.append(Ether() / ip / tcp)
+    wrpcap("pcap/false_positive_overlap.pcap", packets)
+
+
+def false_positive_icmp_flood():
+    packets = [
+        Ether()
+        / IP(src="1.2.3.4", dst="10.0.0.1")
+        / ICMP(type="echo-request")
+        / Raw(load="flood")
+        for _ in range(20)
+    ]
+    wrpcap("pcap/false_positive_icmp_flood.pcap", packets)
+
+
+def run_all():
+    import os
+
+    os.makedirs("pcap", exist_ok=True)
+    tcp_basic()
+    tcp_data()
+    udp_packet()
+    icmp_ping()
+    fragmented()
+    tcp_rst()
+    tcp_full_cycle()
+    dns_query_response()
+    bad_tcp_flags()
+    tcp_multistream()
+    false_positive_noise()
+    false_positive_overlap()
+    false_positive_icmp_flood()
+    print("All .pcap files generated in ./pcap/")
+
+
+if __name__ == "__main__":
+    run_all()
diff --git a/automated/linux/tcpreplay/tcpreplay.py b/automated/linux/tcpreplay/tcpreplay.py
new file mode 100755
index 0000000..dd709b8
--- /dev/null
+++ b/automated/linux/tcpreplay/tcpreplay.py
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+# vim: set ts=8 sw=4 sts=4 et tw=80 fileencoding=utf-8 :
+import argparse
+import glob
+import fcntl
+import os
+import shutil
+import struct
+import subprocess
+import sys
+import time
+
+
+def check_root():
+    if os.geteuid() != 0:
+        print("SKIP: Must be run as root to create TAP interfaces")
+        return False
+    return True
+
+
+def check_tcpreplay():
+    if not shutil.which("tcpreplay"):
+        print("SKIP: tcpreplay not found in PATH")
+        return False
+    return True
+
+
+def create_tap_interface(ifname):
+    try:
+        IFF_TAP = 0x0002
+        IFF_NO_PI = 0x1000
+        TUNSETIFF = 0x400454CA
+
+        tap_fd = os.open("/dev/net/tun", os.O_RDWR)
+        ifr = struct.pack("16sH", ifname.encode(), IFF_TAP | IFF_NO_PI)
+        fcntl.ioctl(tap_fd, TUNSETIFF, ifr)
+        return tap_fd
+    except Exception as e:
+        print(f"Error creating TAP interface: {e}")
+        return None
+
+
+def configure_interface(ifname, ipaddr, mask):
+    try:
+        subprocess.run(["ip", "link", "set", ifname, "up"], check=True)
+        subprocess.run(
+            ["ip", "addr", "add", f"{ipaddr}/{mask}", "dev", ifname], check=True
+        )
+        return True
+    except subprocess.CalledProcessError as e:
+        print(f"Failed to configure interface: {e}")
+        return False
+
+
+def cleanup_interface(ifname):
+    try:
+        subprocess.run(["ip", "link", "set", ifname, "down"], check=True)
+        print("cleanup_interface: pass")
+    except subprocess.CalledProcessError:
+        print("cleanup_interface: fail")
+
+
+def run_tcpreplay(ifname, pcap):
+    try:
+        subprocess.run(["tcpreplay", "--intf1", ifname, pcap], check=True)
+        print("run_tcpreplay: pass")
+        return True
+    except subprocess.CalledProcessError:
+        print("run_tcpreplay: fail")
+        return False
+
+
+def lava_report(name, result, output_file=None):
+    line = f"{name}: {result}"
+    print(line)
+    if output_file:
+        os.makedirs(os.path.dirname(output_file), exist_ok=True)
+        with open(output_file, "a") as f:
+            f.write(line + "\n")
+
+
+def get_expectation(test_name, default_expectations):
+    return default_expectations.get(test_name, "pass")
+
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--interface", required=True)
+    parser.add_argument("--ipaddr", required=True)
+    parser.add_argument("--mask", default="24")
+    parser.add_argument("--pcap-dir", required=True)
+    parser.add_argument("--output", required=True)
+    global args
+    args = parser.parse_args()
+
+    if not check_root():
+        lava_report("check_root", "skip", args.output)
+        return
+
+    if not check_tcpreplay():
+        lava_report("check_tcpreplay", "skip", args.output)
+        return
+
+    tap_fd = create_tap_interface(args.interface)
+    if not tap_fd:
+        lava_report("create_tap_interface", "fail", args.output)
+        return
+
+    if not configure_interface(args.interface, args.ipaddr, args.mask):
+        lava_report("configure_interface", "fail", args.output)
+        os.close(tap_fd)
+        return
+
+    default_expectations = {
+        "tcp_basic": "pass",
+        "tcp_data": "pass",
+        "udp_packet": "pass",
+        "icmp_ping": "pass",
+        "fragmented": "pass",
+        "tcp_rst": "pass",
+        "tcp_full_cycle": "pass",
+        "dns_query_response": "pass",
+        "bad_tcp_flags": "xfail",
+        "tcp_multistream": "pass",
+        "false_positive_noise": "pass",
+        "false_positive_overlap": "xfail",
+        "false_positive_icmp_flood": "xfail",
+    }
+
+    pcaps = sorted(glob.glob(os.path.join(args.pcap_dir, "*.pcap")))
+    for pcap_path in pcaps:
+        pcap = os.path.basename(pcap_path)
+        test_name = os.path.splitext(pcap)[0]
+        expected = get_expectation(test_name, default_expectations)
+
+        try:
+            success = run_tcpreplay(args.interface, pcap_path)
+        except Exception as e:
+            print(f"Exception during tcpreplay for {test_name}: {e}")
+            success = False
+
+        # Normalize output as requested
+        if expected == "xfail":
+            lava_report(f"run_{test_name}", "pass", args.output)
+        elif success:
+            lava_report(f"run_{test_name}", "pass", args.output)
+        else:
+            lava_report(f"run_{test_name}", "fail", args.output)
+
+    cleanup_interface(args.interface)
+    os.close(tap_fd)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/automated/linux/tcpreplay/tcpreplay.yaml b/automated/linux/tcpreplay/tcpreplay.yaml
new file mode 100644
index 0000000..febc400
--- /dev/null
+++ b/automated/linux/tcpreplay/tcpreplay.yaml
@@ -0,0 +1,34 @@
+metadata:
+  format: Lava-Test Test Definition 1.0
+  name: tcpreplay
+  description: Replay a PCAP file using tcpreplay and verify via TUN interface
+  maintainer:
+    - anders.roxell@linaro.org
+  os:
+    - debian
+    - ubuntu
+    - centos
+    - fedora
+    - openembedded
+  scope:
+    - functional
+  devices:
+    - juno
+    - x86
+
+params:
+  # Interface to replay traffic on
+  INTERFACE: "tun0"
+  # Path to PCAP directory
+  PCAP: "./pcap/"
+  # IP address to assign to the TUN interface
+  IPADDR: "10.0.0.1"
+  # Netmask
+  MASK: "24"
+
+run:
+  steps:
+    - cd automated/linux/tcpreplay/
+    - python3 pcap/generate_pcap.py
+    - python3 tcpreplay.py --interface "${INTERFACE}" --ipaddr "${IPADDR}" --mask "${MASK}" --pcap-dir "${PCAP}" --output ./output/result.txt
+    - ../../utils/send-to-lava.sh ./output/result.txt