diff --git a/ci/xtask/src/commands.rs b/ci/xtask/src/commands.rs index 3645e1472c6..c9fb5079cf3 100644 --- a/ci/xtask/src/commands.rs +++ b/ci/xtask/src/commands.rs @@ -1,2 +1,3 @@ pub mod generate_pipeline; pub mod hello; +pub mod xdp_test; diff --git a/ci/xtask/src/commands/generate_pipeline.rs b/ci/xtask/src/commands/generate_pipeline.rs index 415902165a0..636a09dfaf5 100644 --- a/ci/xtask/src/commands/generate_pipeline.rs +++ b/ci/xtask/src/commands/generate_pipeline.rs @@ -142,6 +142,7 @@ fn generate_private_pipeline() -> Result { pipeline.add_step(default_local_cluster_step(10)); pipeline.add_step(default_docs_check_step()); pipeline.add_step(default_localnet_step()); + pipeline.add_step(default_xdp_test_step()); pipeline.add_step(buildkite::Step::Wait(buildkite::WaitStep {})); @@ -232,6 +233,7 @@ struct PullRequestPipelineFlags { stable_sbf: bool, shuttle: bool, coverage: bool, + xdp_tests: bool, } impl PullRequestPipelineFlags { @@ -250,6 +252,13 @@ impl PullRequestPipelineFlags { file.ends_with("Cargo.toml") || file.ends_with("Cargo.lock") || file.ends_with(".rs") }); + let xdp_changed = changed_files.iter().any(|file| { + file == "Cargo.lock" + || file == "Cargo.toml" + || file.starts_with("xdp/") + || file.starts_with("xdp-ebpf/") + }); + Self { shellcheck: changed_files.iter().any(|file| file.ends_with(".sh")), checks: trigger_all @@ -338,6 +347,7 @@ impl PullRequestPipelineFlags { || file.ends_with("ci/test-coverage.sh") || file.starts_with("ci/coverage/") }), + xdp_tests: trigger_all || xdp_changed, } } } @@ -385,6 +395,9 @@ async fn generate_pull_request_pipeline( if flags.localnet { pipeline.add_step(default_localnet_step()); } + if flags.xdp_tests { + pipeline.add_step(default_xdp_test_step()); + } pipeline.add_step(buildkite::Step::Wait(buildkite::WaitStep {})); @@ -420,6 +433,7 @@ fn generate_full_pipeline() -> Result { pipeline.add_step(default_local_cluster_step(10)); pipeline.add_step(default_docs_check_step()); pipeline.add_step(default_localnet_step()); + pipeline.add_step(default_xdp_test_step()); pipeline.add_step(buildkite::Step::Wait(buildkite::WaitStep {})); @@ -643,6 +657,35 @@ fn default_localnet_step() -> buildkite::Step { }) } +fn default_xdp_test_step() -> buildkite::Step { + buildkite::Step::Command(buildkite::CommandStep { + name: String::from("xdp-test"), + // The container runs as root for network namespace privileges; keep + // root-owned Cargo artifacts out of the mounted checkout. + command: String::from(concat!( + "ci/docker-run-default-image.sh env ", + "CARGO_TARGET_DIR=/tmp/agave-xdp-target ", + "cargo xtask xdp-test --release-with-debug", + )), + agents: Some(HashMap::from([( + String::from("queue"), + String::from("default"), + )])), + timeout_in_minutes: Some(25), + env: Some(HashMap::from([ + ( + String::from("EXTRA_DOCKER_RUN_ARGS"), + String::from("--cap-add NET_ADMIN --cap-add NET_RAW --cap-add SYS_ADMIN"), + ), + ( + String::from("SOLANA_DOCKER_RUN_NOSETUID"), + String::from("1"), + ), + ])), + ..Default::default() + }) +} + fn default_stable_sbf_step() -> buildkite::Step { buildkite::Step::Command(buildkite::CommandStep { name: String::from("stable-sbf"), @@ -858,6 +901,7 @@ mod tests { assert!(!f.stable_sbf); assert!(!f.shuttle); assert!(!f.coverage); + assert!(!f.xdp_tests); } #[test] @@ -874,10 +918,11 @@ mod tests { assert!(f.stable_sbf); assert!(f.shuttle); assert!(f.coverage); + assert!(f.xdp_tests); } #[test] - fn test_rust_change_triggers_all() { + fn test_rust_change_triggers_standard_rust_jobs() { let f = flags(&["core/src/lib.rs"]); assert!(f.checks); assert!(f.feature_check); @@ -890,6 +935,13 @@ mod tests { assert!(f.stable_sbf); assert!(f.shuttle); assert!(f.coverage); + assert!(!f.xdp_tests); + } + + #[test] + fn test_xdp_change_triggers_xdp_tests() { + let f = flags(&["xdp/tests/transmitter_smoke.rs"]); + assert!(f.xdp_tests); } #[test] @@ -907,6 +959,7 @@ mod tests { assert!(!f.stable_sbf); assert!(!f.shuttle); assert!(!f.coverage); + assert!(!f.xdp_tests); } #[test] @@ -924,5 +977,6 @@ mod tests { assert!(!f.stable_sbf); assert!(!f.shuttle); assert!(!f.coverage); + assert!(!f.xdp_tests); } } diff --git a/ci/xtask/src/commands/xdp_test.rs b/ci/xtask/src/commands/xdp_test.rs new file mode 100644 index 00000000000..0aa21fd0831 --- /dev/null +++ b/ci/xtask/src/commands/xdp_test.rs @@ -0,0 +1,101 @@ +use { + anyhow::{bail, ensure, Context, Result}, + clap::Args, + log::info, + std::{ + env, + path::{Path, PathBuf}, + process::Command, + }, +}; + +const DEFAULT_TESTS: &[&str] = &[ + "netlink_snapshot", + "route_monitor", + "router_snapshot", + "transmitter_smoke", +]; + +#[derive(Args)] +pub struct CommandArgs { + #[arg( + long, + help = "Build and run the tests with the release-with-debug profile" + )] + pub release_with_debug: bool, + + #[arg( + long, + help = "Optional command prefix used to run cargo with privileges, for example: sudo -n -E" + )] + runner: Option, + + #[arg(long = "test", value_name = "TEST")] + tests: Vec, + + #[arg(last = true)] + run_args: Vec, +} + +pub fn run(args: CommandArgs) -> Result<()> { + let repo_root = repo_root(); + let tests = test_selection(&args.tests); + let mut cmd = command_with_runner(args.runner.as_deref(), &cargo_bin())?; + cmd.current_dir(&repo_root); + cmd.arg("test") + .arg("-p") + .arg("agave-xdp") + .arg("--features") + .arg("agave-unstable-api"); + if args.release_with_debug { + cmd.arg("--profile").arg("release-with-debug"); + } + for test in &tests { + cmd.arg("--test").arg(test); + } + cmd.arg("--") + .arg("--include-ignored") + .arg("--test-threads=1"); + for arg in &args.run_args { + cmd.arg(arg); + } + + info!("running local xdp tests from {}", repo_root.display()); + let status = cmd.status().context("failed to run cargo test")?; + ensure!(status.success(), "xdp tests failed with {status}"); + Ok(()) +} + +fn test_selection(selected: &[String]) -> Vec { + if selected.is_empty() { + return DEFAULT_TESTS + .iter() + .map(|test| (*test).to_string()) + .collect(); + } + selected.to_vec() +} + +fn repo_root() -> PathBuf { + let root = Path::new(env!("CARGO_MANIFEST_DIR")).join("../.."); + root.canonicalize().unwrap_or(root) +} + +fn command_with_runner(runner: Option<&str>, program: &Path) -> Result { + let Some(runner) = runner else { + return Ok(Command::new(program)); + }; + let mut parts = runner.split_whitespace(); + let Some(runner_program) = parts.next() else { + bail!("runner cannot be empty"); + }; + let mut cmd = Command::new(runner_program); + cmd.args(parts).arg(program); + Ok(cmd) +} + +fn cargo_bin() -> PathBuf { + env::var_os("CARGO") + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from("cargo")) +} diff --git a/ci/xtask/src/main.rs b/ci/xtask/src/main.rs index 1a66585ae4a..f13b610f9c2 100644 --- a/ci/xtask/src/main.rs +++ b/ci/xtask/src/main.rs @@ -28,6 +28,8 @@ enum Commands { Publish(xtask_shared::commands::publish::CommandArgs), #[command(about = "Generate Buildkite pipeline")] GeneratePipeline(commands::generate_pipeline::CommandArgs), + #[command(about = "Run XDP integration tests")] + XdpTest(commands::xdp_test::CommandArgs), } #[derive(Args, Debug)] @@ -78,6 +80,9 @@ async fn try_main(xtask: Xtask) -> Result<()> { Commands::GeneratePipeline(args) => { commands::generate_pipeline::run(args).await?; } + Commands::XdpTest(args) => { + commands::xdp_test::run(args)?; + } } Ok(()) diff --git a/xdp/Cargo.toml b/xdp/Cargo.toml index cd916284dbc..c8c11dbc9e1 100644 --- a/xdp/Cargo.toml +++ b/xdp/Cargo.toml @@ -26,5 +26,25 @@ arrayvec = { workspace = true } aya = { workspace = true } caps = { workspace = true } +[[test]] +name = "netlink_snapshot" +path = "tests/netlink_snapshot.rs" +required-features = ["agave-unstable-api"] + +[[test]] +name = "route_monitor" +path = "tests/route_monitor.rs" +required-features = ["agave-unstable-api"] + +[[test]] +name = "router_snapshot" +path = "tests/router_snapshot.rs" +required-features = ["agave-unstable-api"] + +[[test]] +name = "transmitter_smoke" +path = "tests/transmitter_smoke.rs" +required-features = ["agave-unstable-api"] + [lints] workspace = true diff --git a/xdp/tests/README.md b/xdp/tests/README.md new file mode 100644 index 00000000000..545d8e125d3 --- /dev/null +++ b/xdp/tests/README.md @@ -0,0 +1,123 @@ +# XDP Integration Tests + +These tests are run through `cargo xtask xdp-test`. + +The tests run directly on the host and require root or equivalent network admin privileges because the harness creates a temporary network namespace, `veth` interfaces, routes, and neighbors: + +```bash +cargo xtask xdp-test --runner "sudo -n -E" +``` + +To run a single test locally, use this form: + +```bash +cargo xtask xdp-test --runner "sudo -n -E" --test -- --exact --nocapture +``` + +The default suite currently runs: + +- `netlink_snapshot` +- `route_monitor` +- `router_snapshot` +- `transmitter_smoke` + +## Test Topology + +Each portable test runs in a fresh temporary network namespace created with `unshare(CLONE_NEWNET)`. The tests bring `lo` up, create the interfaces needed by that test, and restore the original namespace when the test exits. + +The initial topology is one veth pair inside that namespace: + +```text +temporary test network namespace + + route and neighbor state under test + | + v + axdp0 10.0.0.1/24 02:aa:bb:cc:dd:01 + | + | veth peer + | + axdp1 10.0.0.2/24 02:aa:bb:cc:dd:02 + + neighbor: 10.0.0.2 -> 02:aa:bb:cc:dd:02 dev axdp0 + route example: 203.0.113.0/24 via 10.0.0.2 dev axdp0 +``` + +The copy-mode transmitter tests use the same primary veth pair. The transmitter binds AF_XDP TX to `axdp0`; the test binds a raw packet socket to `axdp1` and verifies the emitted Ethernet/IP/UDP frame: + +```text +temporary test network namespace + + XdpSender -> copy-mode AF_XDP TX socket + | + v + axdp0 10.0.0.1/24 02:aa:bb:cc:dd:01 + | + | veth peer + | + axdp1 10.0.0.2/24 02:aa:bb:cc:dd:02 + ^ + | + raw packet receiver +``` + +GRE tests add a tunnel on top of the primary veth pair. The transmitter sends the inner UDP packet to the overlay destination; the route resolves through `gxdp0`, the XDP transmit path wraps the packet in GRE, and the raw packet receiver observes the outer packet on `axdp1`. + +```text +inner packet: + 192.0.2.1: -> 192.0.2.99: + +GRE overlay route: + 192.0.2.0/24 dev gxdp0 src 192.0.2.1 + +GRE tunnel: + gxdp0 + local underlay: 10.0.0.1 (axdp0) + remote underlay: 10.0.0.2 (axdp1) + overlay source: 192.0.2.1/32 + ttl: 64 + +outer packet observed by receiver on axdp1: + Ethernet: 02:aa:bb:cc:dd:01 -> 02:aa:bb:cc:dd:02 + IPv4: 10.0.0.1 -> 10.0.0.2 + GRE: inner IPv4/UDP packet +``` + +## Individual Tests + +Use the single-test command form above with these test binaries and names: + +| Test binary | Test name | +| --- | --- | +| `netlink_snapshot` | `netlink_snapshot_reads_the_prepared_namespace` | +| `netlink_snapshot` | `netlink_snapshot_reads_gre_tunnel_metadata` | +| `route_monitor` | `route_monitor_publishes_live_route_updates` | +| `route_monitor` | `route_monitor_publishes_live_neighbor_updates` | +| `route_monitor` | `route_monitor_publishes_link_removals` | +| `route_monitor` | `route_monitor_publishes_live_gre_route_updates` | +| `router_snapshot` | `router_snapshot_resolves_gre_routes_from_netlink` | +| `transmitter_smoke` | `transmitter_sends_udp_payload_over_veth_in_copy_mode` | +| `transmitter_smoke` | `transmitter_sends_udp_payload_over_gre_tunnel_in_copy_mode` | + +## Test Coverage + +`netlink_snapshot`: + +- `netlink_snapshot_reads_the_prepared_namespace`: reads interfaces, routes, and neighbors from the temporary namespace and verifies the prepared veth route and permanent neighbor are visible through netlink. +- `netlink_snapshot_reads_gre_tunnel_metadata`: reads a GRE tunnel interface from netlink and verifies its local endpoint, remote endpoint, TTL, and TOS metadata. + +`route_monitor`: + +- `route_monitor_publishes_live_route_updates`: verifies the route monitor publishes an added route with the expected next hop and later removes it after the route is deleted. +- `route_monitor_publishes_live_neighbor_updates`: verifies the route monitor publishes initial, replaced, and removed neighbor state for an existing route. +- `route_monitor_publishes_link_removals`: verifies deleting a link removes the route that depended on that link from the published router. +- `route_monitor_publishes_live_gre_route_updates`: verifies the route monitor publishes a GRE overlay route, including the underlay MAC and GRE tunnel metadata, and removes it when the GRE link is deleted. + +`router_snapshot`: + +- `router_snapshot_resolves_gre_routes_from_netlink`: verifies router snapshots resolve GRE overlay routes with the expected preferred source, underlay MAC, tunnel endpoints, TTL, and TOS. + +`transmitter_smoke`: + +- `transmitter_sends_udp_payload_over_veth_in_copy_mode`: builds the copy-mode transmitter, sends a UDP payload through `XdpSender`, and verifies the raw Ethernet/IP/UDP frame received on the peer veth. +- `transmitter_sends_udp_payload_over_gre_tunnel_in_copy_mode`: builds the copy-mode transmitter for a GRE route, sends a UDP payload through `XdpSender`, and verifies the GRE-encapsulated outer and inner packet fields. diff --git a/xdp/tests/common/mod.rs b/xdp/tests/common/mod.rs new file mode 100644 index 00000000000..163d4b17c7d --- /dev/null +++ b/xdp/tests/common/mod.rs @@ -0,0 +1,288 @@ +#![cfg(target_os = "linux")] +#![allow(dead_code)] + +use { + agave_xdp::netlink::MacAddress, + std::{ + ffi::CString, + fs::File, + os::fd::AsRawFd, + path::{Path, PathBuf}, + process::Command, + sync::OnceLock, + thread, + time::{Duration, Instant}, + }, +}; + +const LEFT_IFACE: &str = "axdp0"; +const RIGHT_IFACE: &str = "axdp1"; +const GRE_IFACE: &str = "gxdp0"; + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct TestLinks { + pub left_name: String, + pub right_name: String, + pub left_if_index: u32, + pub right_if_index: u32, + pub left_ip: std::net::Ipv4Addr, + pub right_ip: std::net::Ipv4Addr, + pub left_mac: MacAddress, + pub right_mac: MacAddress, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct TestGreTunnel { + pub name: String, + pub if_index: u32, + pub local_ip: std::net::Ipv4Addr, + pub remote_ip: std::net::Ipv4Addr, + pub overlay_ip: std::net::Ipv4Addr, +} + +pub struct NetNsGuard { + old_ns: File, +} + +impl NetNsGuard { + pub fn new() -> Self { + require_root(); + + let tid = unsafe { libc::syscall(libc::SYS_gettid) }; + let old_ns_path = format!("/proc/self/task/{tid}/ns/net"); + let old_ns = File::open(&old_ns_path) + .unwrap_or_else(|err| panic!("failed to open {old_ns_path}: {err}")); + + if unsafe { libc::unshare(libc::CLONE_NEWNET) } != 0 { + let err = std::io::Error::last_os_error(); + panic!("failed to unshare network namespace: {err}"); + } + + let netns = Self { old_ns }; + netns.ip(&["link", "set", "lo", "up"]); + netns + } + + pub fn ip(&self, args: &[&str]) { + run_command(ip_command(), args); + } +} + +impl Drop for NetNsGuard { + fn drop(&mut self) { + if unsafe { libc::setns(self.old_ns.as_raw_fd(), libc::CLONE_NEWNET) } == 0 { + return; + } + + let err = std::io::Error::last_os_error(); + if std::thread::panicking() { + eprintln!("failed to restore original network namespace: {err}"); + } else { + panic!("failed to restore original network namespace: {err}"); + } + } +} + +pub fn setup_veth_pair() -> TestLinks { + setup_veth_pair_named( + LEFT_IFACE, + RIGHT_IFACE, + std::net::Ipv4Addr::new(10, 0, 0, 1), + std::net::Ipv4Addr::new(10, 0, 0, 2), + MacAddress([0x02, 0xaa, 0xbb, 0xcc, 0xdd, 0x01]), + MacAddress([0x02, 0xaa, 0xbb, 0xcc, 0xdd, 0x02]), + ) +} + +pub fn setup_gre_tunnel(underlay: &TestLinks) -> TestGreTunnel { + setup_gre_tunnel_named( + GRE_IFACE, + underlay.left_ip, + underlay.right_ip, + std::net::Ipv4Addr::new(192, 0, 2, 1), + ) +} + +pub fn setup_gre_tunnel_named( + name: &str, + local_ip: std::net::Ipv4Addr, + remote_ip: std::net::Ipv4Addr, + overlay_ip: std::net::Ipv4Addr, +) -> TestGreTunnel { + let local = local_ip.to_string(); + let remote = remote_ip.to_string(); + run_ip(&[ + "tunnel", "add", name, "mode", "gre", "local", &local, "remote", &remote, "ttl", "64", + ]); + add_ipv4_addr(&format!("{overlay_ip}/32"), name); + set_link_up(name); + + TestGreTunnel { + name: name.to_string(), + if_index: if_index(name), + local_ip, + remote_ip, + overlay_ip, + } +} + +pub fn setup_veth_pair_named( + left_name: &str, + right_name: &str, + left_ip: std::net::Ipv4Addr, + right_ip: std::net::Ipv4Addr, + left_mac: MacAddress, + right_mac: MacAddress, +) -> TestLinks { + run_ip(&[ + "link", "add", left_name, "type", "veth", "peer", "name", right_name, + ]); + set_link_mac(left_name, &left_mac.to_string()); + set_link_mac(right_name, &right_mac.to_string()); + add_ipv4_addr(&format!("{left_ip}/24"), left_name); + add_ipv4_addr(&format!("{right_ip}/24"), right_name); + set_link_up(left_name); + set_link_up(right_name); + + TestLinks { + left_name: left_name.to_string(), + right_name: right_name.to_string(), + left_if_index: if_index(left_name), + right_if_index: if_index(right_name), + left_ip, + right_ip, + left_mac, + right_mac, + } +} + +pub fn add_route(destination: &str, via: std::net::Ipv4Addr, dev: &str) { + let via = via.to_string(); + run_ip(&["route", "replace", destination, "via", &via, "dev", dev]); +} + +pub fn add_route_to_dev(destination: &str, dev: &str) { + run_ip(&["route", "replace", destination, "dev", dev]); +} + +pub fn add_route_to_dev_with_src(destination: &str, dev: &str, src: std::net::Ipv4Addr) { + let src = src.to_string(); + run_ip(&["route", "replace", destination, "dev", dev, "src", &src]); +} + +#[allow(dead_code)] +pub fn delete_route(destination: &str) { + run_ip(&["route", "del", destination]); +} + +pub fn replace_neighbor(ip: std::net::Ipv4Addr, mac: MacAddress, dev: &str) { + let ip = ip.to_string(); + let mac = mac.to_string(); + run_ip(&[ + "neigh", + "replace", + &ip, + "lladdr", + &mac, + "dev", + dev, + "nud", + "permanent", + ]); +} + +pub fn delete_neighbor(ip: std::net::Ipv4Addr, dev: &str) { + let ip = ip.to_string(); + run_ip(&["neigh", "del", &ip, "dev", dev]); +} + +#[allow(dead_code)] +pub fn wait_until(description: &str, timeout: Duration, mut predicate: F) -> T +where + F: FnMut() -> Option, +{ + let start = Instant::now(); + loop { + if let Some(value) = predicate() { + return value; + } + + if start.elapsed() >= timeout { + panic!("timed out waiting for {description}"); + } + + thread::sleep(Duration::from_millis(10)); + } +} + +fn require_root() { + assert_eq!( + unsafe { libc::geteuid() }, + 0, + "XDP integration tests require root. Use `cargo xtask xdp-test --runner \"sudo -n -E\"`.", + ); +} + +fn set_link_mac(dev: &str, mac: &str) { + run_ip(&["link", "set", "dev", dev, "address", mac]); +} + +fn set_link_up(dev: &str) { + run_ip(&["link", "set", "dev", dev, "up"]); +} + +fn add_ipv4_addr(addr: &str, dev: &str) { + run_ip(&["addr", "add", addr, "dev", dev]); +} + +pub fn if_index(dev: &str) -> u32 { + let dev = CString::new(dev).expect("interface name must not contain NUL"); + let index = unsafe { libc::if_nametoindex(dev.as_ptr()) }; + assert_ne!(index, 0, "failed to resolve ifindex for interface"); + index +} + +fn run_ip(args: &[&str]) { + run_command(ip_command(), args); +} + +fn run_command(program: &Path, args: &[&str]) { + let output = Command::new(program) + .args(args) + .output() + .unwrap_or_else(|err| panic!("failed to run {program:?} {args:?}: {err}")); + if output.status.success() { + return; + } + + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + panic!( + "{program:?} {args:?} failed: {} +stdout: +{} +stderr: +{}", + output.status, stdout, stderr + ); +} + +fn ip_command() -> &'static PathBuf { + static IP_COMMAND: OnceLock = OnceLock::new(); + IP_COMMAND.get_or_init(|| { + let mut candidates = std::env::var_os("IP") + .into_iter() + .map(PathBuf::from) + .chain([ + PathBuf::from("/usr/sbin/ip"), + PathBuf::from("/sbin/ip"), + PathBuf::from("ip"), + ]); + + candidates + .find(|path| path == Path::new("ip") || path.exists()) + .unwrap_or_else(|| PathBuf::from("ip")) + }) +} diff --git a/xdp/tests/netlink_snapshot.rs b/xdp/tests/netlink_snapshot.rs new file mode 100644 index 00000000000..c4f9574a996 --- /dev/null +++ b/xdp/tests/netlink_snapshot.rs @@ -0,0 +1,67 @@ +#![cfg(target_os = "linux")] + +mod common; + +use { + agave_xdp::{ + netlink::{netlink_get_interfaces, netlink_get_neighbors, netlink_get_routes}, + route::RouteTable, + }, + libc::{AF_INET, NUD_PERMANENT}, + std::net::{IpAddr, Ipv4Addr}, +}; + +#[test] +fn netlink_snapshot_reads_the_prepared_namespace() { + let _netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + + let routed_prefix = "203.0.113.0/24"; + common::replace_neighbor(links.right_ip, links.right_mac, &links.left_name); + common::add_route(routed_prefix, links.right_ip, &links.left_name); + + let interfaces = netlink_get_interfaces(AF_INET as u8).expect("read interfaces from netlink"); + assert!(interfaces + .iter() + .any(|interface| interface.if_index == links.left_if_index)); + assert!(interfaces + .iter() + .any(|interface| interface.if_index == links.right_if_index)); + + let routes = + netlink_get_routes(AF_INET as u8, u32::from(RouteTable::Main)).expect("read routes"); + assert!(routes.iter().any(|route| { + route.destination == Some(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 0))) + && route.gateway == Some(IpAddr::V4(links.right_ip)) + && route.out_if_index == Some(links.left_if_index as i32) + && route.dst_len == 24 + })); + + let neighbors = + netlink_get_neighbors(None, AF_INET as u8).expect("read neighbor table from netlink"); + assert!(neighbors.iter().any(|neighbor| { + neighbor.destination == Some(IpAddr::V4(links.right_ip)) + && neighbor.lladdr == Some(links.right_mac) + && neighbor.ifindex == links.left_if_index as i32 + && neighbor.state == NUD_PERMANENT + })); +} + +#[test] +fn netlink_snapshot_reads_gre_tunnel_metadata() { + let _netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + let gre = common::setup_gre_tunnel(&links); + + let interfaces = netlink_get_interfaces(AF_INET as u8).expect("read interfaces from netlink"); + let tunnel = interfaces + .iter() + .find(|interface| interface.if_index == gre.if_index) + .and_then(|interface| interface.gre_tunnel.as_ref()) + .expect("read GRE tunnel metadata from netlink"); + + assert_eq!(tunnel.local, IpAddr::V4(gre.local_ip)); + assert_eq!(tunnel.remote, IpAddr::V4(gre.remote_ip)); + assert_eq!(tunnel.ttl, 64); + assert_eq!(tunnel.tos, 0); +} diff --git a/xdp/tests/route_monitor.rs b/xdp/tests/route_monitor.rs new file mode 100644 index 00000000000..54fbd64f6b0 --- /dev/null +++ b/xdp/tests/route_monitor.rs @@ -0,0 +1,260 @@ +#![cfg(target_os = "linux")] + +mod common; + +use { + agave_xdp::{ + netlink::MacAddress, + route::{RouteError, RouteTable, Router}, + route_monitor::RouteMonitor, + }, + arc_swap::ArcSwap, + std::{ + net::{IpAddr, Ipv4Addr}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, + }, +}; + +fn start_route_monitor() -> ( + Arc>, + Arc, + std::thread::JoinHandle<()>, +) { + let router = Router::new().expect("build initial router"); + let atomic_router = Arc::new(ArcSwap::from_pointee(router)); + let exit = Arc::new(AtomicBool::new(false)); + let handle = RouteMonitor::start( + Arc::clone(&atomic_router), + RouteTable::Main, + Arc::clone(&exit), + Duration::ZERO, + || {}, + ); + (atomic_router, exit, handle) +} + +#[test] +fn route_monitor_publishes_live_route_updates() { + let _netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + + let (atomic_router, exit, handle) = start_route_monitor(); + + let routed_destination = Ipv4Addr::new(203, 0, 113, 7); + assert!(matches!( + atomic_router.load().route_v4(routed_destination), + Err(RouteError::NoRouteFound(_)) + )); + + common::replace_neighbor(links.right_ip, links.right_mac, &links.left_name); + common::add_route("203.0.113.0/24", links.right_ip, &links.left_name); + + common::wait_until( + "the route monitor to publish a newly added route", + Duration::from_secs(2), + || { + let router = atomic_router.load(); + match router.route_v4(routed_destination) { + Ok(next_hop) + if next_hop.if_index == links.left_if_index + && next_hop.ip_addr == IpAddr::V4(links.right_ip) + && next_hop.mac_addr == Some(links.right_mac) => + { + Some(()) + } + _ => None, + } + }, + ); + + common::delete_route("203.0.113.0/24"); + common::wait_until( + "the route monitor to publish a removed route", + Duration::from_secs(2), + || { + matches!( + atomic_router.load().route_v4(routed_destination), + Err(RouteError::NoRouteFound(_)) + ) + .then_some(()) + }, + ); + + exit.store(true, Ordering::Relaxed); + handle.join().expect("join route monitor thread"); +} + +#[test] +fn route_monitor_publishes_live_neighbor_updates() { + let _netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + + let (atomic_router, exit, handle) = start_route_monitor(); + let routed_destination = Ipv4Addr::new(203, 0, 113, 7); + + common::add_route("203.0.113.0/24", links.right_ip, &links.left_name); + let initial_mac = links.right_mac; + common::replace_neighbor(links.right_ip, initial_mac, &links.left_name); + + common::wait_until( + "the route monitor to publish the initial neighbor", + Duration::from_secs(2), + || { + let router = atomic_router.load(); + match router.route_v4(routed_destination) { + Ok(next_hop) + if next_hop.if_index == links.left_if_index + && next_hop.ip_addr == IpAddr::V4(links.right_ip) + && next_hop.mac_addr == Some(initial_mac) => + { + Some(()) + } + _ => None, + } + }, + ); + + let updated_mac = MacAddress([0x02, 0xaa, 0xbb, 0xcc, 0xdd, 0x44]); + common::replace_neighbor(links.right_ip, updated_mac, &links.left_name); + + common::wait_until( + "the route monitor to publish a replaced neighbor", + Duration::from_secs(2), + || { + let router = atomic_router.load(); + match router.route_v4(routed_destination) { + Ok(next_hop) if next_hop.mac_addr == Some(updated_mac) => Some(()), + _ => None, + } + }, + ); + + common::delete_neighbor(links.right_ip, &links.left_name); + common::wait_until( + "the route monitor to publish a removed neighbor", + Duration::from_secs(2), + || { + let router = atomic_router.load(); + match router.route_v4(routed_destination) { + Ok(next_hop) + if next_hop.if_index == links.left_if_index + && next_hop.ip_addr == IpAddr::V4(links.right_ip) + && next_hop.mac_addr.is_none() => + { + Some(()) + } + _ => None, + } + }, + ); + + exit.store(true, Ordering::Relaxed); + handle.join().expect("join route monitor thread"); +} + +#[test] +fn route_monitor_publishes_link_removals() { + let netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + + common::replace_neighbor(links.right_ip, links.right_mac, &links.left_name); + common::add_route("203.0.113.0/24", links.right_ip, &links.left_name); + + let (atomic_router, exit, handle) = start_route_monitor(); + let routed_destination = Ipv4Addr::new(203, 0, 113, 7); + common::wait_until( + "the route monitor to publish the initial link-backed route", + Duration::from_secs(2), + || { + let router = atomic_router.load(); + match router.route_v4(routed_destination) { + Ok(next_hop) + if next_hop.if_index == links.left_if_index + && next_hop.ip_addr == IpAddr::V4(links.right_ip) => + { + Some(()) + } + _ => None, + } + }, + ); + + netns.ip(&["link", "del", &links.left_name]); + common::wait_until( + "the route monitor to publish a removed link", + Duration::from_secs(2), + || { + matches!( + atomic_router.load().route_v4(routed_destination), + Err(RouteError::NoRouteFound(_)) + ) + .then_some(()) + }, + ); + + exit.store(true, Ordering::Relaxed); + handle.join().expect("join route monitor thread"); +} + +#[test] +fn route_monitor_publishes_live_gre_route_updates() { + let netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + + let (atomic_router, exit, handle) = start_route_monitor(); + let overlay_destination = Ipv4Addr::new(192, 0, 2, 99); + assert!(matches!( + atomic_router.load().route_v4(overlay_destination), + Err(RouteError::NoRouteFound(_)) + )); + + common::replace_neighbor(links.right_ip, links.right_mac, &links.left_name); + common::add_route_to_dev(&format!("{}/32", links.right_ip), &links.left_name); + let gre = common::setup_gre_tunnel(&links); + common::add_route_to_dev_with_src("192.0.2.0/24", &gre.name, gre.overlay_ip); + + common::wait_until( + "the route monitor to publish a GRE overlay route", + Duration::from_secs(2), + || { + let router = atomic_router.load(); + match router.route_v4(overlay_destination) { + Ok(next_hop) + if next_hop.if_index == gre.if_index + && next_hop.ip_addr == IpAddr::V4(overlay_destination) + && next_hop.mac_addr == Some(links.right_mac) + && next_hop.preferred_src_ip == Some(gre.overlay_ip) + && next_hop.gre.as_ref().is_some_and(|gre_route| { + gre_route.if_index == gre.if_index + && gre_route.mac_addr == links.right_mac + && gre_route.tunnel_info.local == IpAddr::V4(gre.local_ip) + && gre_route.tunnel_info.remote == IpAddr::V4(gre.remote_ip) + }) => + { + Some(()) + } + _ => None, + } + }, + ); + + netns.ip(&["link", "del", &gre.name]); + common::wait_until( + "the route monitor to publish a removed GRE link", + Duration::from_secs(2), + || { + matches!( + atomic_router.load().route_v4(overlay_destination), + Err(RouteError::NoRouteFound(_)) + ) + .then_some(()) + }, + ); + + exit.store(true, Ordering::Relaxed); + handle.join().expect("join route monitor thread"); +} diff --git a/xdp/tests/router_snapshot.rs b/xdp/tests/router_snapshot.rs new file mode 100644 index 00000000000..c363ece8a44 --- /dev/null +++ b/xdp/tests/router_snapshot.rs @@ -0,0 +1,43 @@ +#![cfg(target_os = "linux")] + +mod common; + +use { + agave_xdp::route::{RouteTable, Router, RoutingTables}, + std::net::{IpAddr, Ipv4Addr}, +}; + +#[test] +fn router_snapshot_resolves_gre_routes_from_netlink() { + let _netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + + common::replace_neighbor(links.right_ip, links.right_mac, &links.left_name); + common::add_route_to_dev(&format!("{}/32", links.right_ip), &links.left_name); + let gre = common::setup_gre_tunnel(&links); + common::add_route_to_dev_with_src("192.0.2.0/24", &gre.name, gre.overlay_ip); + + let router_from_tables = + Router::from_tables(RoutingTables::from_netlink(RouteTable::Main).expect("read tables")) + .expect("build router from snapshot tables"); + let router_from_netlink = Router::new().expect("build router directly from netlink"); + let overlay_destination = Ipv4Addr::new(192, 0, 2, 99); + + for router in [&router_from_tables, &router_from_netlink] { + let next_hop = router + .route_v4(overlay_destination) + .expect("resolve GRE overlay route"); + assert_eq!(next_hop.if_index, gre.if_index); + assert_eq!(next_hop.ip_addr, IpAddr::V4(overlay_destination)); + assert_eq!(next_hop.mac_addr, Some(links.right_mac)); + assert_eq!(next_hop.preferred_src_ip, Some(gre.overlay_ip)); + + let gre_route = next_hop.gre.as_ref().expect("route should use GRE"); + assert_eq!(gre_route.if_index, gre.if_index); + assert_eq!(gre_route.mac_addr, links.right_mac); + assert_eq!(gre_route.tunnel_info.local, IpAddr::V4(gre.local_ip)); + assert_eq!(gre_route.tunnel_info.remote, IpAddr::V4(gre.remote_ip)); + assert_eq!(gre_route.tunnel_info.ttl, 64); + assert_eq!(gre_route.tunnel_info.tos, 0); + } +} diff --git a/xdp/tests/transmitter_smoke.rs b/xdp/tests/transmitter_smoke.rs new file mode 100644 index 00000000000..5657993e878 --- /dev/null +++ b/xdp/tests/transmitter_smoke.rs @@ -0,0 +1,425 @@ +#![cfg(target_os = "linux")] + +mod common; + +use { + agave_cpu_utils::cpu_affinity, + agave_xdp::{ + netlink::MacAddress, + transmitter::{BytesTxPacket, TransmitterBuilder, XdpConfig}, + }, + bytes::Bytes, + std::{ + io, mem, + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + os::fd::{AsRawFd, FromRawFd, OwnedFd}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::{Duration, Instant}, + }, +}; + +fn transmitter_cpu() -> Option { + let cores = cpu_affinity(None).expect("linux provides affine cores"); + if cores.len() < 2 { + eprintln!("skipping transmitter smoke test: at least two CPU cores are required"); + return None; + } + cores.first().map(|core| **core) +} + +struct PacketSocket { + fd: OwnedFd, +} + +impl PacketSocket { + fn bind(if_index: u32) -> io::Result { + let fd = unsafe { + libc::socket( + libc::AF_PACKET, + libc::SOCK_RAW | libc::SOCK_CLOEXEC, + (libc::ETH_P_ALL as u16).to_be() as i32, + ) + }; + if fd < 0 { + return Err(io::Error::last_os_error()); + } + let fd = unsafe { OwnedFd::from_raw_fd(fd) }; + let addr = libc::sockaddr_ll { + sll_family: libc::AF_PACKET as u16, + sll_protocol: (libc::ETH_P_ALL as u16).to_be(), + sll_ifindex: if_index as i32, + sll_hatype: 0, + sll_pkttype: 0, + sll_halen: 0, + sll_addr: [0; 8], + }; + let rc = unsafe { + libc::bind( + fd.as_raw_fd(), + &addr as *const _ as *const libc::sockaddr, + mem::size_of::() as libc::socklen_t, + ) + }; + if rc < 0 { + return Err(io::Error::last_os_error()); + } + Ok(Self { fd }) + } + + fn recv_matching_udp( + &self, + expected: &ExpectedUdpPacket<'_>, + timeout: Duration, + ) -> io::Result> { + self.recv_matching_payload("matching UDP frame", timeout, |frame| { + matching_udp_payload(frame, expected) + }) + } + + fn recv_matching_gre_udp( + &self, + expected: &ExpectedGreUdpPacket<'_>, + timeout: Duration, + ) -> io::Result> { + self.recv_matching_payload("matching GRE UDP frame", timeout, |frame| { + matching_gre_udp_payload(frame, expected) + }) + } + + fn recv_matching_payload( + &self, + description: &str, + timeout: Duration, + mut matcher: F, + ) -> io::Result> + where + F: for<'a> FnMut(&'a [u8]) -> Option<&'a [u8]>, + { + let deadline = Instant::now().checked_add(timeout).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "timeout overflows instant") + })?; + let mut frame = [0u8; 2048]; + loop { + let now = Instant::now(); + if now >= deadline { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + format!("timed out waiting for {description}"), + )); + } + let remaining = deadline.saturating_duration_since(now); + let mut pfd = libc::pollfd { + fd: self.fd.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + }; + let rc = unsafe { + libc::poll( + &mut pfd, + 1, + remaining.as_millis().min(i32::MAX as u128) as i32, + ) + }; + if rc < 0 { + let err = io::Error::last_os_error(); + if err.kind() == io::ErrorKind::Interrupted { + continue; + } + return Err(err); + } + if rc == 0 { + continue; + } + + let len = unsafe { + libc::recv( + self.fd.as_raw_fd(), + frame.as_mut_ptr() as *mut libc::c_void, + frame.len(), + 0, + ) + }; + if len < 0 { + let err = io::Error::last_os_error(); + if err.kind() == io::ErrorKind::Interrupted { + continue; + } + return Err(err); + } + let frame = &frame[..len as usize]; + if let Some(payload) = matcher(frame) { + return Ok(payload.to_vec()); + } + } + } +} + +struct ExpectedUdpPacket<'a> { + src_mac: MacAddress, + dst_mac: MacAddress, + src_ip: Ipv4Addr, + dst_ip: Ipv4Addr, + src_port: u16, + dst_port: u16, + payload: &'a [u8], +} + +struct ExpectedGreUdpPacket<'a> { + outer_src_mac: MacAddress, + outer_dst_mac: MacAddress, + outer_src_ip: Ipv4Addr, + outer_dst_ip: Ipv4Addr, + inner_src_ip: Ipv4Addr, + inner_dst_ip: Ipv4Addr, + src_port: u16, + dst_port: u16, + payload: &'a [u8], +} + +struct ExpectedUdpDatagram<'a> { + src_ip: Ipv4Addr, + dst_ip: Ipv4Addr, + src_port: u16, + dst_port: u16, + payload: &'a [u8], +} + +fn matching_udp_payload<'a>(frame: &'a [u8], expected: &ExpectedUdpPacket<'_>) -> Option<&'a [u8]> { + const ETH_HEADER_SIZE: usize = 14; + + if frame.len() < ETH_HEADER_SIZE { + return None; + } + if frame[0..6] != expected.dst_mac.0 || frame[6..12] != expected.src_mac.0 { + return None; + } + if u16::from_be_bytes([frame[12], frame[13]]) != libc::ETH_P_IP as u16 { + return None; + } + + matching_ipv4_udp_payload( + &frame[ETH_HEADER_SIZE..], + &ExpectedUdpDatagram { + src_ip: expected.src_ip, + dst_ip: expected.dst_ip, + src_port: expected.src_port, + dst_port: expected.dst_port, + payload: expected.payload, + }, + ) +} + +fn matching_gre_udp_payload<'a>( + frame: &'a [u8], + expected: &ExpectedGreUdpPacket<'_>, +) -> Option<&'a [u8]> { + const ETH_HEADER_SIZE: usize = 14; + const IPV4_MIN_HEADER_SIZE: usize = 20; + const GRE_HEADER_SIZE: usize = 4; + const GRE_FLAGS_VERSION_BASIC: u16 = 0; + + if frame.len() < ETH_HEADER_SIZE.checked_add(IPV4_MIN_HEADER_SIZE)? { + return None; + } + if frame[0..6] != expected.outer_dst_mac.0 || frame[6..12] != expected.outer_src_mac.0 { + return None; + } + if u16::from_be_bytes([frame[12], frame[13]]) != libc::ETH_P_IP as u16 { + return None; + } + + let outer_ip = &frame[ETH_HEADER_SIZE..]; + let outer_ihl = usize::from(outer_ip[0] & 0x0f).checked_mul(4)?; + let gre_offset = ETH_HEADER_SIZE.checked_add(outer_ihl)?; + let min_frame_len = gre_offset + .checked_add(GRE_HEADER_SIZE)? + .checked_add(IPV4_MIN_HEADER_SIZE)?; + if outer_ihl < IPV4_MIN_HEADER_SIZE || frame.len() < min_frame_len { + return None; + } + if outer_ip[9] != libc::IPPROTO_GRE as u8 { + return None; + } + if outer_ip[12..16] != expected.outer_src_ip.octets() + || outer_ip[16..20] != expected.outer_dst_ip.octets() + { + return None; + } + + let gre = &frame[gre_offset..]; + if u16::from_be_bytes([gre[0], gre[1]]) != GRE_FLAGS_VERSION_BASIC { + return None; + } + if u16::from_be_bytes([gre[2], gre[3]]) != libc::ETH_P_IP as u16 { + return None; + } + + let inner_offset = gre_offset.checked_add(GRE_HEADER_SIZE)?; + matching_ipv4_udp_payload( + frame.get(inner_offset..)?, + &ExpectedUdpDatagram { + src_ip: expected.inner_src_ip, + dst_ip: expected.inner_dst_ip, + src_port: expected.src_port, + dst_port: expected.dst_port, + payload: expected.payload, + }, + ) +} + +fn matching_ipv4_udp_payload<'a>( + ip: &'a [u8], + expected: &ExpectedUdpDatagram<'_>, +) -> Option<&'a [u8]> { + const IPV4_MIN_HEADER_SIZE: usize = 20; + const UDP_HEADER_SIZE: usize = 8; + + let min_udp_len = IPV4_MIN_HEADER_SIZE.checked_add(UDP_HEADER_SIZE)?; + if ip.len() < min_udp_len { + return None; + } + + let ihl = usize::from(ip[0] & 0x0f).checked_mul(4)?; + let min_packet_len = ihl.checked_add(UDP_HEADER_SIZE)?; + if ihl < IPV4_MIN_HEADER_SIZE || ip.len() < min_packet_len { + return None; + } + if ip[9] != libc::IPPROTO_UDP as u8 { + return None; + } + if ip[12..16] != expected.src_ip.octets() || ip[16..20] != expected.dst_ip.octets() { + return None; + } + + let udp = &ip[ihl..]; + if u16::from_be_bytes([udp[0], udp[1]]) != expected.src_port + || u16::from_be_bytes([udp[2], udp[3]]) != expected.dst_port + { + return None; + } + let udp_len = usize::from(u16::from_be_bytes([udp[4], udp[5]])); + if udp_len < UDP_HEADER_SIZE || udp.len() < udp_len { + return None; + } + let payload = &udp[UDP_HEADER_SIZE..udp_len]; + (payload == expected.payload).then_some(payload) +} + +#[test] +fn transmitter_sends_udp_payload_over_veth_in_copy_mode() { + let Some(cpu_id) = transmitter_cpu() else { + return; + }; + + let _netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + common::replace_neighbor(links.right_ip, links.right_mac, &links.left_name); + + let receiver = PacketSocket::bind(links.right_if_index).expect("bind raw packet receiver"); + let dst_port = 45_678; + let src_port = 12_345; + let destination = SocketAddr::V4(SocketAddrV4::new(links.right_ip, dst_port)); + let payload = Bytes::from_static(b"agave-xdp-transmitter-smoke"); + + let exit = Arc::new(AtomicBool::new(false)); + let mut config = XdpConfig::new(Some(links.left_name.clone()), vec![cpu_id], false); + config.tx_channel_cap = 16; + + let (transmitter, sender) = TransmitterBuilder::new(config, Arc::clone(&exit)) + .expect("build copy-mode transmitter") + .build(); + + let packet = BytesTxPacket::new( + SocketAddrV4::new(links.left_ip, src_port), + destination, + None, + payload.clone(), + ); + sender + .try_send(0, packet) + .expect("queue packet through XdpSender::try_send"); + + let received = receiver + .recv_matching_udp( + &ExpectedUdpPacket { + src_mac: links.left_mac, + dst_mac: links.right_mac, + src_ip: links.left_ip, + dst_ip: links.right_ip, + src_port, + dst_port, + payload: payload.as_ref(), + }, + Duration::from_secs(3), + ) + .expect("receive UDP frame from AF_XDP transmitter"); + assert_eq!(received, payload.as_ref()); + + exit.store(true, Ordering::Relaxed); + drop(sender); + transmitter.join().expect("join transmitter threads"); +} + +#[test] +fn transmitter_sends_udp_payload_over_gre_tunnel_in_copy_mode() { + let Some(cpu_id) = transmitter_cpu() else { + return; + }; + + let _netns = common::NetNsGuard::new(); + let links = common::setup_veth_pair(); + common::replace_neighbor(links.right_ip, links.right_mac, &links.left_name); + common::add_route_to_dev(&format!("{}/32", links.right_ip), &links.left_name); + let gre = common::setup_gre_tunnel(&links); + common::add_route_to_dev_with_src("192.0.2.0/24", &gre.name, gre.overlay_ip); + + let receiver = PacketSocket::bind(links.right_if_index).expect("bind raw packet receiver"); + let dst_port = 45_679; + let src_port = 12_346; + let overlay_destination = Ipv4Addr::new(192, 0, 2, 99); + let destination = SocketAddr::V4(SocketAddrV4::new(overlay_destination, dst_port)); + let payload = Bytes::from_static(b"agave-xdp-transmitter-gre-smoke"); + + let exit = Arc::new(AtomicBool::new(false)); + let mut config = XdpConfig::new(Some(links.left_name.clone()), vec![cpu_id], false); + config.tx_channel_cap = 16; + + let (transmitter, sender) = TransmitterBuilder::new(config, Arc::clone(&exit)) + .expect("build copy-mode transmitter") + .build(); + + let packet = BytesTxPacket::new( + SocketAddrV4::new(links.left_ip, src_port), + destination, + None, + payload.clone(), + ); + sender + .try_send(0, packet) + .expect("queue packet through XdpSender::try_send"); + + let received = receiver + .recv_matching_gre_udp( + &ExpectedGreUdpPacket { + outer_src_mac: links.left_mac, + outer_dst_mac: links.right_mac, + outer_src_ip: gre.local_ip, + outer_dst_ip: gre.remote_ip, + inner_src_ip: gre.overlay_ip, + inner_dst_ip: overlay_destination, + src_port, + dst_port, + payload: payload.as_ref(), + }, + Duration::from_secs(3), + ) + .expect("receive GRE-encapsulated UDP frame from AF_XDP transmitter"); + assert_eq!(received, payload.as_ref()); + + exit.store(true, Ordering::Relaxed); + drop(sender); + transmitter.join().expect("join transmitter threads"); +}