From 027a46bc0223c068a5e33ef48e68a556956ba9b1 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Tue, 23 Sep 2025 15:39:33 +0200 Subject: [PATCH 1/2] ROX-30714: track cgroup ID to container ID mapping This change greatly reduces the amount of data sent in the ringbuffer and the effort the BPF hooks need to retrieve the cgroup of the current process. In exchange for these benefits, we now need to lookup and keep track of the cgroups and container IDs that exist on the system ourselves by iterating over the cgroupsfs. TODO: Add integration tests with containers. --- fact-ebpf/src/bpf/maps.h | 1 - fact-ebpf/src/bpf/process.h | 71 +----------- fact-ebpf/src/bpf/types.h | 2 +- fact/src/bpf.rs | 20 +++- fact/src/cgroup.rs | 174 +++++++++++++++++++++++++++++ fact/src/event.rs | 213 +++++++++++++----------------------- fact/src/host_info.rs | 27 +++++ fact/src/lib.rs | 13 ++- 8 files changed, 306 insertions(+), 215 deletions(-) create mode 100644 fact/src/cgroup.rs diff --git a/fact-ebpf/src/bpf/maps.h b/fact-ebpf/src/bpf/maps.h index 7b160f80..d4e9c79a 100644 --- a/fact-ebpf/src/bpf/maps.h +++ b/fact-ebpf/src/bpf/maps.h @@ -12,7 +12,6 @@ */ struct helper_t { char buf[PATH_MAX * 2]; - const unsigned char* array[16]; }; struct { diff --git a/fact-ebpf/src/bpf/process.h b/fact-ebpf/src/bpf/process.h index 0032d3b2..3a5b702a 100644 --- a/fact-ebpf/src/bpf/process.h +++ b/fact-ebpf/src/bpf/process.h @@ -11,71 +11,6 @@ #include // clang-format on -__always_inline static const char* get_memory_cgroup(struct helper_t* helper) { - if (!bpf_core_enum_value_exists(enum cgroup_subsys_id, memory_cgrp_id)) { - return NULL; - } - - struct task_struct* task = (struct task_struct*)bpf_get_current_task(); - - // We're guessing which cgroup controllers are enabled for this task. The - // assumption is that memory controller is present more often than - // cpu & cpuacct. - struct kernfs_node* kn = BPF_CORE_READ(task, cgroups, subsys[memory_cgrp_id], cgroup, kn); - if (kn == NULL) { - return NULL; - } - - int i = 0; - for (; i < 16; i++) { - helper->array[i] = (const unsigned char*)BPF_CORE_READ(kn, name); - if (bpf_core_field_exists(kn->__parent)) { - kn = BPF_CORE_READ(kn, __parent); - } else { - struct { - struct kernfs_node* parent; - }* kn_old = (void*)kn; - kn = BPF_CORE_READ(kn_old, parent); - } - if (kn == NULL) { - break; - } - } - - if (i == 16) { - i--; - } - - int offset = 0; - for (; i >= 0 && offset < PATH_MAX; i--) { - // Skip empty directories - if (helper->array[i] == NULL) { - continue; - } - - helper->buf[offset & (PATH_MAX - 1)] = '/'; - if (++offset >= PATH_MAX) { - return NULL; - } - - int len = bpf_probe_read_kernel_str(&helper->buf[offset & (PATH_MAX - 1)], PATH_MAX, helper->array[i]); - if (len < 0) { - // We should have skipped all empty entries, any other error is a genuine - // problem, stop processing. 
- return NULL; - } - - if (len == 1) { - offset--; - continue; - } - - offset += len - 1; - } - - return helper->buf; -} - __always_inline static void process_fill_lineage(process_t* p, struct helper_t* helper) { struct task_struct* task = (struct task_struct*)bpf_get_current_task(); struct path path; @@ -112,6 +47,7 @@ __always_inline static int64_t process_fill(process_t* p) { p->gid = (uid_gid >> 32) & 0xFFFFFFFF; p->login_uid = BPF_CORE_READ(task, loginuid.val); p->pid = (bpf_get_current_pid_tgid() >> 32) & 0xFFFFFFFF; + p->cgroup_id = bpf_get_current_cgroup_id(); u_int64_t err = bpf_get_current_comm(p->comm, TASK_COMM_LEN); if (err != 0) { bpf_printk("Failed to fill task comm"); @@ -144,11 +80,6 @@ __always_inline static int64_t process_fill(process_t* p) { } bpf_probe_read_str(p->exe_path, PATH_MAX, exe_path); - const char* cg = get_memory_cgroup(helper); - if (cg != NULL) { - bpf_probe_read_str(p->memory_cgroup, PATH_MAX, cg); - } - p->in_root_mount_ns = get_mount_ns() == host_mount_ns; process_fill_lineage(p, helper); diff --git a/fact-ebpf/src/bpf/types.h b/fact-ebpf/src/bpf/types.h index e9e15aa6..80b14ce5 100644 --- a/fact-ebpf/src/bpf/types.h +++ b/fact-ebpf/src/bpf/types.h @@ -22,7 +22,7 @@ typedef struct process_t { char args[4096]; unsigned int args_len; char exe_path[PATH_MAX]; - char memory_cgroup[PATH_MAX]; + unsigned long long cgroup_id; unsigned int uid; unsigned int gid; unsigned int login_uid; diff --git a/fact/src/bpf.rs b/fact/src/bpf.rs index fa25ab6b..059a7ee9 100644 --- a/fact/src/bpf.rs +++ b/fact/src/bpf.rs @@ -14,7 +14,9 @@ use tokio::{ task::JoinHandle, }; -use crate::{config::FactConfig, event::Event, host_info, metrics::EventCounter}; +use crate::{ + cgroup::ContainerIdCache, config::FactConfig, event::Event, host_info, metrics::EventCounter, +}; use fact_ebpf::{event_t, metrics_t, path_prefix_t, LPM_SIZE_MAX}; @@ -98,6 +100,7 @@ impl Bpf { mut fd: AsyncFd>, mut running: Receiver, event_counter: EventCounter, + cid_cache: ContainerIdCache, ) -> JoinHandle<()> { info!("Starting BPF worker..."); tokio::spawn(async move { @@ -108,7 +111,7 @@ impl Bpf { let ringbuf = guard.get_inner_mut(); while let Some(event) = ringbuf.next() { let event: &event_t = unsafe { &*(event.as_ptr() as *const _) }; - let event = match Event::try_from(event) { + let event = match Event::new(event, &cid_cache).await { Ok(event) => Arc::new(event), Err(e) => { error!("Failed to parse event: '{e}'"); @@ -173,15 +176,22 @@ mod bpf_tests { let (run_tx, run_rx) = watch::channel(true); // Create a metrics exporter, but don't start it let exporter = Exporter::new(bpf.get_metrics().unwrap()); - - Bpf::start_worker(tx, bpf.fd, run_rx, exporter.metrics.bpf_worker.clone()); + let cid_cache = ContainerIdCache::new(); + + Bpf::start_worker( + tx, + bpf.fd, + run_rx, + exporter.metrics.bpf_worker.clone(), + cid_cache, + ); // Create a file let file = NamedTempFile::new_in(monitored_path).expect("Failed to create temporary file"); println!("Created {file:?}"); - let expected = Event::new( + let expected = Event::from_raw_parts( file_activity_type_t::FILE_ACTIVITY_CREATION, host_info::get_hostname(), file.path().to_path_buf(), diff --git a/fact/src/cgroup.rs b/fact/src/cgroup.rs new file mode 100644 index 00000000..2bd82112 --- /dev/null +++ b/fact/src/cgroup.rs @@ -0,0 +1,174 @@ +use std::{ + collections::HashMap, + os::unix::fs::DirEntryExt, + path::PathBuf, + sync::Arc, + time::{Duration, SystemTime}, +}; + +use log::warn; +use tokio::{ + sync::{watch::Receiver, Mutex}, + task::JoinHandle, + time, 
+}; + +use crate::host_info::get_cgroup_paths; + +#[derive(Debug)] +struct ContainerIdEntry { + container_id: Option, + pub last_seen: SystemTime, +} + +type ContainerIdMap = HashMap; + +#[derive(Debug, Clone, Default)] +pub struct ContainerIdCache(Arc>); + +impl ContainerIdCache { + pub fn new() -> Self { + let mut map = HashMap::new(); + ContainerIdCache::update_unlocked(&mut map); + ContainerIdCache(Arc::new(Mutex::new(map))) + } + + fn update_unlocked(map: &mut ContainerIdMap) { + for root in get_cgroup_paths() { + ContainerIdCache::walk_cgroupfs(&root, map, None); + } + } + + async fn update(&mut self) { + let mut map = self.0.lock().await; + ContainerIdCache::update_unlocked(&mut map); + } + + async fn prune(&mut self) { + let now = SystemTime::now(); + self.0.lock().await.retain(|_, value| { + now.duration_since(value.last_seen).unwrap() < Duration::from_secs(30) + }) + } + + pub async fn get_container_id(&self, cgroup_id: u64) -> Option { + let mut map = self.0.lock().await; + match map.get(&cgroup_id) { + Some(entry) => entry.container_id.clone(), + None => { + // Update the container ID cache and try again + ContainerIdCache::update_unlocked(&mut map); + map.get(&cgroup_id).map(|s| s.container_id.clone())? + } + } + } + + pub fn start_worker(mut self, mut running: Receiver) -> JoinHandle<()> { + tokio::spawn(async move { + let mut update_interval = time::interval(time::Duration::from_secs(30)); + loop { + tokio::select! { + _ = update_interval.tick() => { + self.update().await; + self.prune().await; + }, + _ = running.changed() => { + if !*running.borrow() { + return; + } + } + } + } + }) + } + + fn walk_cgroupfs(path: &PathBuf, map: &mut ContainerIdMap, parent_id: Option<&str>) { + for entry in std::fs::read_dir(path).unwrap() { + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + warn!("Failed to read {}: {e}", path.display()); + continue; + } + }; + + let p = entry.path(); + if !p.is_dir() { + continue; + } + + let container_id = match map.get_mut(&entry.ino()) { + Some(e) => { + e.last_seen = SystemTime::now(); + e.container_id.clone() + } + None => { + let last_component = p + .file_name() + .map(|f| f.to_str().unwrap_or("")) + .unwrap_or(""); + let container_id = match ContainerIdCache::extract_container_id(last_component) + { + Some(cid) => Some(cid), + None => parent_id.map(|f| f.to_owned()), + }; + let last_seen = SystemTime::now(); + map.insert( + entry.ino(), + ContainerIdEntry { + container_id: container_id.clone(), + last_seen, + }, + ); + container_id + } + }; + ContainerIdCache::walk_cgroupfs(&p, map, container_id.as_deref()); + } + } + + pub fn extract_container_id(cgroup: &str) -> Option { + if cgroup.is_empty() { + return None; + } + + let cgroup = cgroup.strip_suffix(".scope").unwrap_or(cgroup); + if cgroup.len() < 64 { + return None; + } + + let (prefix, id) = cgroup.split_at(cgroup.len() - 64); + + if !prefix.is_empty() && !prefix.ends_with('-') { + return None; + } + + if id.chars().all(|c| c.is_ascii_hexdigit()) { + Some(id.split_at(12).0.to_owned()) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn extract_container_id() { + let tests = [ + ("e73c55f3e7f5b6a9cfc32a89bf13e44d348bcc4fa7b079f804d61fb1532ddbe5", Some("e73c55f3e7f5")), + ("cri-containerd-219d7afb8e7450929eaeb06f2d27cbf7183bfa5b55b7275696f3df4154a979af.scope", Some("219d7afb8e74")), + ("kubelet-kubepods-burstable-pod469726a5_079d_4d15_a259_1f654b534b44.slice", None), + 
("libpod-conmon-a2d2a36121868d946af912b931fc5f6b42bf84c700cef67784422b1e2c8585ee.scope", Some("a2d2a3612186")), + ("init.scope", None), + ("app-flatpak-com.github.IsmaelMartinez.teams_for_linux-384393947.scope", None), + ]; + + for (cgroup, expected) in tests { + let cid = ContainerIdCache::extract_container_id(cgroup); + assert_eq!(cid.as_deref(), expected); + } + } +} diff --git a/fact/src/event.rs b/fact/src/event.rs index a72abee3..ae46bd24 100644 --- a/fact/src/event.rs +++ b/fact/src/event.rs @@ -8,7 +8,7 @@ use uuid::Uuid; use fact_ebpf::{event_t, file_activity_type_t, lineage_t, process_t}; -use crate::host_info; +use crate::{cgroup::ContainerIdCache, host_info}; fn slice_to_string(s: &[c_char]) -> anyhow::Result { Ok(unsafe { CStr::from_ptr(s.as_ptr()) }.to_str()?.to_owned()) @@ -66,6 +66,48 @@ pub struct Process { } impl Process { + async fn new(proc: &process_t, cid_cache: &ContainerIdCache) -> anyhow::Result { + let comm = slice_to_string(proc.comm.as_slice())?; + let exe_path = slice_to_string(proc.exe_path.as_slice())?; + let container_id = cid_cache.get_container_id(proc.cgroup_id).await; + let in_root_mount_ns = proc.in_root_mount_ns != 0; + + let lineage = proc.lineage[..proc.lineage_len as usize] + .iter() + .map(Lineage::try_from) + .collect::, _>>()?; + + let mut converted_args = Vec::new(); + let args_len = proc.args_len as usize; + let mut offset = 0; + while offset < args_len { + let arg = unsafe { CStr::from_ptr(proc.args.as_ptr().add(offset)) } + .to_str()? + .to_owned(); + if arg.is_empty() { + break; + } + offset += arg.len() + 1; + converted_args.push(arg); + } + + let username = host_info::get_username(proc.uid); + + Ok(Process { + comm, + args: converted_args, + exe_path, + container_id, + uid: proc.uid, + username, + gid: proc.gid, + login_uid: proc.login_uid, + pid: proc.pid, + in_root_mount_ns, + lineage, + }) + } + /// Create a representation of the current process as best as /// possible. 
#[cfg(test)] @@ -79,7 +121,7 @@ impl Process { .unwrap(); let args = std::env::args().collect::>(); let cgroup = std::fs::read_to_string("/proc/self/cgroup").expect("Failed to read cgroup"); - let container_id = Process::extract_container_id(&cgroup); + let container_id = ContainerIdCache::extract_container_id(&cgroup); let uid = unsafe { libc::getuid() }; let gid = unsafe { libc::getgid() }; let pid = std::process::id(); @@ -104,30 +146,6 @@ impl Process { lineage: vec![], } } - - fn extract_container_id(cgroup: &str) -> Option { - let cgroup = if let Some(i) = cgroup.rfind(".scope") { - cgroup.split_at(i).0 - } else { - cgroup - }; - - if cgroup.is_empty() || cgroup.len() < 65 { - return None; - } - - let cgroup = cgroup.split_at(cgroup.len() - 65).1; - let (c, cgroup) = cgroup.split_at(1); - if c != "/" && c != "-" { - return None; - } - - if cgroup.chars().all(|c| c.is_ascii_hexdigit()) { - Some(cgroup.split_at(12).0.to_owned()) - } else { - None - } - } } #[cfg(test)] @@ -143,53 +161,6 @@ impl PartialEq for Process { } } -impl TryFrom for Process { - type Error = anyhow::Error; - - fn try_from(value: process_t) -> Result { - let comm = slice_to_string(value.comm.as_slice())?; - let exe_path = slice_to_string(value.exe_path.as_slice())?; - let memory_cgroup = unsafe { CStr::from_ptr(value.memory_cgroup.as_ptr()) }.to_str()?; - let container_id = Process::extract_container_id(memory_cgroup); - let in_root_mount_ns = value.in_root_mount_ns != 0; - - let lineage = value.lineage[..value.lineage_len as usize] - .iter() - .map(Lineage::try_from) - .collect::, _>>()?; - - let mut converted_args = Vec::new(); - let args_len = value.args_len as usize; - let mut offset = 0; - while offset < args_len { - let arg = unsafe { CStr::from_ptr(value.args.as_ptr().add(offset)) } - .to_str()? 
- .to_owned(); - if arg.is_empty() { - break; - } - offset += arg.len() + 1; - converted_args.push(arg); - } - - let username = host_info::get_username(value.uid); - - Ok(Process { - comm, - args: converted_args, - exe_path, - container_id, - uid: value.uid, - username, - gid: value.gid, - login_uid: value.login_uid, - pid: value.pid, - in_root_mount_ns, - lineage, - }) - } -} - impl From for fact_api::ProcessSignal { fn from(value: Process) -> Self { let Process { @@ -235,20 +206,6 @@ impl From for fact_api::ProcessSignal { } } -trait FileEvent { - fn get_filename(&self) -> &PathBuf; -} - -trait IsMonitored { - fn is_monitored(&self, paths: &[PathBuf]) -> bool; -} - -impl IsMonitored for T { - fn is_monitored(&self, paths: &[PathBuf]) -> bool { - paths.is_empty() || paths.iter().any(|p| self.get_filename().starts_with(p)) - } -} - #[derive(Debug, Clone, Serialize)] pub enum Event { Open(EventOpen), @@ -256,8 +213,21 @@ pub enum Event { } impl Event { + pub async fn new(event: &event_t, cid_cache: &ContainerIdCache) -> anyhow::Result { + match event.type_ { + file_activity_type_t::FILE_ACTIVITY_OPEN => { + Ok(EventOpen::new(event, cid_cache).await?.into()) + } + file_activity_type_t::FILE_ACTIVITY_CREATION => { + Ok(EventCreation::new(event, cid_cache).await?.into()) + } + invalid => unreachable!("Invalid event type: {invalid:?}"), + } + } + #[cfg(test)] - pub fn new( + #[allow(non_upper_case_globals)] + pub fn from_raw_parts( event_type: file_activity_type_t, hostname: &'static str, filename: PathBuf, @@ -266,35 +236,14 @@ impl Event { ) -> Self { match event_type { file_activity_type_t::FILE_ACTIVITY_OPEN => { - EventOpen::new(hostname, filename, host_file, process).into() + EventOpen::from_raw_parts(hostname, filename, host_file, process).into() } file_activity_type_t::FILE_ACTIVITY_CREATION => { - EventCreation::new(hostname, filename, host_file, process).into() + EventCreation::from_raw_parts(hostname, filename, host_file, process).into() } invalid => unreachable!("Invalid event type: {invalid:?}"), } } - - pub fn is_monitored(&self, paths: &[PathBuf]) -> bool { - match self { - Event::Open(e) => e.is_monitored(paths), - Event::Creation(e) => e.is_monitored(paths), - } - } -} - -impl TryFrom<&event_t> for Event { - type Error = anyhow::Error; - - fn try_from(value: &event_t) -> Result { - match value.type_ { - file_activity_type_t::FILE_ACTIVITY_OPEN => Ok(EventOpen::try_from(value)?.into()), - file_activity_type_t::FILE_ACTIVITY_CREATION => { - Ok(EventCreation::try_from(value)?.into()) - } - id => unreachable!("Invalid event type: {id:?}"), - } - } } impl From for FileActivity { @@ -329,8 +278,23 @@ macro_rules! basic_file_event { } impl $event_type { + async fn new(event: &event_t, cid_cache: &ContainerIdCache) -> anyhow::Result { + let timestamp = host_info::get_boot_time() + event.timestamp; + let filename = slice_to_string(event.filename.as_slice())?.into(); + let host_file = slice_to_string(event.host_file.as_slice())?.into(); + let process = Process::new(&event.process, cid_cache).await?; + + Ok($event_type { + timestamp, + hostname: host_info::get_hostname(), + process, + filename, + host_file, + }) + } + #[cfg(test)] - pub fn new( + pub fn from_raw_parts( hostname: &'static str, filename: PathBuf, host_file: PathBuf, @@ -350,12 +314,6 @@ macro_rules! 
basic_file_event { } } - impl FileEvent for $event_type { - fn get_filename(&self) -> &PathBuf { - &self.filename - } - } - #[cfg(test)] impl PartialEq for $event_type { fn eq(&self, other: &Self) -> bool { @@ -365,25 +323,6 @@ macro_rules! basic_file_event { && self.process == other.process } } - - impl TryFrom<&event_t> for $event_type { - type Error = anyhow::Error; - - fn try_from(value: &event_t) -> Result { - let timestamp = host_info::get_boot_time() + value.timestamp; - let filename = slice_to_string(value.filename.as_slice())?.into(); - let host_file = slice_to_string(value.host_file.as_slice())?.into(); - let process = value.process.try_into()?; - - Ok($event_type { - timestamp, - hostname: host_info::get_hostname(), - process, - filename, - host_file, - }) - } - } }; } diff --git a/fact/src/host_info.rs b/fact/src/host_info.rs index 42efa019..9bd556f2 100644 --- a/fact/src/host_info.rs +++ b/fact/src/host_info.rs @@ -177,3 +177,30 @@ impl SystemInfo { Ok(SystemInfo { kernel, arch }) } } + +pub fn get_cgroup_paths() -> Vec { + let Ok(file) = File::open("/proc/mounts") else { + warn!("Failed to open /proc/mounts"); + return Vec::new(); + }; + + BufReader::new(file) + .lines() + .filter_map(|line| match line { + Ok(line) => Some(line), + Err(e) => { + warn!("Failed to read line from /proc/mounts: {e}"); + None + } + }) + .filter_map(|line| { + let mut parts = line.split(' '); + let fs_type = parts.next()?; + if fs_type == "cgroup" || fs_type == "cgroup2" { + parts.next().map(PathBuf::from) + } else { + None + } + }) + .collect() +} diff --git a/fact/src/lib.rs b/fact/src/lib.rs index 221f7f45..8aa97dfa 100644 --- a/fact/src/lib.rs +++ b/fact/src/lib.rs @@ -2,6 +2,7 @@ use std::{io::Write, str::FromStr}; use anyhow::Context; use bpf::Bpf; +use cgroup::ContainerIdCache; use host_info::{get_distro, get_hostname, SystemInfo}; use log::{debug, info, warn, LevelFilter}; use metrics::exporter::Exporter; @@ -12,6 +13,7 @@ use tokio::{ }; mod bpf; +mod cgroup; pub mod config; mod event; mod grpc; @@ -73,6 +75,9 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { debug!("Skipping pre-flight checks"); } + let cid_cache = ContainerIdCache::new(); + cid_cache.clone().start_worker(run_rx.clone()); + let mut bpf = Bpf::new(&config)?; if config.health_check() { @@ -88,7 +93,13 @@ pub async fn run(config: FactConfig) -> anyhow::Result<()> { output.start(&config)?; // Gather events from the ring buffer and print them out. - Bpf::start_worker(tx, bpf.fd, run_rx, exporter.metrics.bpf_worker.clone()); + Bpf::start_worker( + tx, + bpf.fd, + run_rx, + exporter.metrics.bpf_worker.clone(), + cid_cache, + ); let mut sigterm = signal(SignalKind::terminate())?; tokio::select! { From 13456b9e9a86b8ff7b6930dddeae3d8c3df02271 Mon Sep 17 00:00:00 2001 From: Mauro Ezequiel Moltrasio Date: Thu, 25 Sep 2025 11:39:06 +0200 Subject: [PATCH 2/2] Add integration test for events generated in containers Because capturing information for a process running in a container is not fully possible from `/proc`, some information is passed to the Process constructor directly to overwrite it. This should be fine though, because we are using a specific container that should not have major changes on how it executes the command we are expecting. 
--- Cargo.toml | 2 +- fact/src/cgroup.rs | 12 ++++++------ fact/src/event.rs | 17 +++++++---------- tests/conftest.py | 23 +++++++++++++++++------ tests/event.py | 34 +++++++++++++++++++++++----------- tests/logs.py | 4 ++++ tests/test_file_open.py | 41 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 99 insertions(+), 34 deletions(-) create mode 100644 tests/logs.py diff --git a/Cargo.toml b/Cargo.toml index 7888fc11..a3fc7a75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ log = { version = "0.4.22", default-features = false } prometheus-client = { version = "0.24.0", default-features = false } prost = "0.13.5" prost-types = "0.13.5" -serde = { version = "1.0.219", features = ["derive"] } +serde = { version = "1.0.219", features = ["derive", "rc"] } serde_json = "1.0.142" tokio = { version = "1.40.0", default-features = false, features = [ "macros", diff --git a/fact/src/cgroup.rs b/fact/src/cgroup.rs index 2bd82112..a1be8b0d 100644 --- a/fact/src/cgroup.rs +++ b/fact/src/cgroup.rs @@ -17,7 +17,7 @@ use crate::host_info::get_cgroup_paths; #[derive(Debug)] struct ContainerIdEntry { - container_id: Option, + container_id: Option>, pub last_seen: SystemTime, } @@ -51,7 +51,7 @@ impl ContainerIdCache { }) } - pub async fn get_container_id(&self, cgroup_id: u64) -> Option { + pub async fn get_container_id(&self, cgroup_id: u64) -> Option> { let mut map = self.0.lock().await; match map.get(&cgroup_id) { Some(entry) => entry.container_id.clone(), @@ -82,7 +82,7 @@ impl ContainerIdCache { }) } - fn walk_cgroupfs(path: &PathBuf, map: &mut ContainerIdMap, parent_id: Option<&str>) { + fn walk_cgroupfs(path: &PathBuf, map: &mut ContainerIdMap, parent_id: Option>) { for entry in std::fs::read_dir(path).unwrap() { let entry = match entry { Ok(entry) => entry, @@ -109,8 +109,8 @@ impl ContainerIdCache { .unwrap_or(""); let container_id = match ContainerIdCache::extract_container_id(last_component) { - Some(cid) => Some(cid), - None => parent_id.map(|f| f.to_owned()), + Some(cid) => Some(Arc::new(cid)), + None => parent_id.clone(), }; let last_seen = SystemTime::now(); map.insert( @@ -123,7 +123,7 @@ impl ContainerIdCache { container_id } }; - ContainerIdCache::walk_cgroupfs(&p, map, container_id.as_deref()); + ContainerIdCache::walk_cgroupfs(&p, map, container_id); } } diff --git a/fact/src/event.rs b/fact/src/event.rs index ae46bd24..ffe86373 100644 --- a/fact/src/event.rs +++ b/fact/src/event.rs @@ -1,6 +1,6 @@ #[cfg(test)] use std::time::{SystemTime, UNIX_EPOCH}; -use std::{ffi::CStr, os::raw::c_char, path::PathBuf}; +use std::{ffi::CStr, os::raw::c_char, path::PathBuf, sync::Arc}; use fact_api::FileActivity; use serde::Serialize; @@ -55,7 +55,7 @@ pub struct Process { comm: String, args: Vec, exe_path: String, - container_id: Option, + container_id: Option>, uid: u32, username: &'static str, gid: u32, @@ -121,7 +121,7 @@ impl Process { .unwrap(); let args = std::env::args().collect::>(); let cgroup = std::fs::read_to_string("/proc/self/cgroup").expect("Failed to read cgroup"); - let container_id = ContainerIdCache::extract_container_id(&cgroup); + let container_id = ContainerIdCache::extract_container_id(&cgroup).map(Arc::new); let uid = unsafe { libc::getuid() }; let gid = unsafe { libc::getgid() }; let pid = std::process::id(); @@ -177,19 +177,16 @@ impl From for fact_api::ProcessSignal { lineage, } = value; - let container_id = container_id.unwrap_or("".to_string()); - - let args = args - .into_iter() - .reduce(|acc, i| acc + " " + &i) - .unwrap_or("".to_owned()); + let 
container_id = container_id + .map(Arc::unwrap_or_clone) + .unwrap_or("".to_string()); Self { id: Uuid::new_v4().to_string(), container_id, creation_time: None, name: comm, - args, + args: args.join(" "), exec_file_path: exe_path, pid, uid, diff --git a/tests/conftest.py b/tests/conftest.py index 186076e4..dee3f155 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,7 @@ import requests from server import FileActivityService +from logs import dump_logs @pytest.fixture @@ -46,6 +47,18 @@ def docker_client(): return docker.from_env() +@pytest.fixture(scope='session', autouse=True) +def docker_api_client(): + """ + Create a docker API client, which is a lower level object and has + access to more methods than the regular client. + + Returns: + A docker.APIClient object created with default values. + """ + return docker.APIClient() + + @pytest.fixture def server(): """ @@ -74,12 +87,6 @@ def get_image(request, docker_client): docker_client.images.pull(image) -def dump_logs(container, file): - logs = container.logs().decode('utf-8') - with open(file, 'w') as f: - f.write(logs) - - @pytest.fixture def fact(request, docker_client, monitored_dir, server, logs_dir): """ @@ -120,6 +127,10 @@ def fact(request, docker_client, monitored_dir, server, logs_dir): 'bind': '/host/usr/lib/os-release', 'mode': 'ro', }, + '/sys/fs/cgroup/': { + 'bind': '/host/sys/fs/cgroup', + 'mode': 'ro', + } }, ) diff --git a/tests/event.py b/tests/event.py index 6c4a15f5..be094312 100644 --- a/tests/event.py +++ b/tests/event.py @@ -36,7 +36,12 @@ class Process: Represents a process with its attributes. """ - def __init__(self, pid: int | None = None): + def __init__(self, + pid: int | None = None, + comm: str | None = None, + exe_path: str | None = None, + args: list[str] | None = None, + ): self._pid: int = pid if pid is not None else os.getpid() proc_dir = os.path.join('/proc', str(self._pid)) @@ -54,16 +59,23 @@ def get_id(line: str, wanted_id: str) -> int | None: elif (gid := get_id(line, 'Gid')) is not None: self._gid: int = gid - self._exe_path: str = os.path.realpath(os.path.join(proc_dir, 'exe')) - - with open(os.path.join(proc_dir, 'cmdline'), 'rb') as f: - content = f.read(4096) - args = [arg.decode('utf-8') - for arg in content.split(b'\x00') if arg] - self._args: str = ' '.join(args) - - with open(os.path.join(proc_dir, 'comm'), 'r') as f: - self._name: str = f.read().strip() + self._exe_path: str = os.path.realpath(os.path.join( + proc_dir, 'exe')) if exe_path is None else exe_path + + if args is None: + with open(os.path.join(proc_dir, 'cmdline'), 'rb') as f: + content = f.read(4096) + args = [arg.decode('utf-8') + for arg in content.split(b'\x00') if arg] + self._args: str = ' '.join(args) + else: + self._args = ' '.join(args) + + if comm is None: + with open(os.path.join(proc_dir, 'comm'), 'r') as f: + self._name: str = f.read().strip() + else: + self._name = comm with open(os.path.join(proc_dir, 'cgroup'), 'r') as f: self._container_id: str = extract_container_id(f.read()) diff --git a/tests/logs.py b/tests/logs.py new file mode 100644 index 00000000..bbc71ef8 --- /dev/null +++ b/tests/logs.py @@ -0,0 +1,4 @@ +def dump_logs(container, file): + logs = container.logs().decode('utf-8') + with open(file, 'w') as f: + f.write(logs) diff --git a/tests/test_file_open.py b/tests/test_file_open.py index de8894a5..f41d6165 100644 --- a/tests/test_file_open.py +++ b/tests/test_file_open.py @@ -1,8 +1,12 @@ +import json import multiprocessing as mp import os import subprocess +import pytest + from event 
import Event, EventType, Process +from logs import dump_logs def test_open(fact, monitored_dir, server): @@ -145,3 +149,40 @@ def do_test(fut: str, stop_event: mp.Event): finally: stop_event.set() proc.join(1) + + +CONTAINER_CMD = 'mkdir -p {monitored_dir}; echo "Some content" > {monitored_dir}/test.txt ; sleep 5' + + +@pytest.fixture(scope='function') +def test_container(fact, docker_client, monitored_dir, logs_dir): + image = 'fedora:42' + command = f"bash -c '{CONTAINER_CMD.format(monitored_dir=monitored_dir)}'" + container_log = os.path.join(logs_dir, 'fedora.log') + container = docker_client.containers.run( + image, + detach=True, + command=command, + ) + yield container + container.stop(timeout=1) + container.wait(timeout=1) + dump_logs(container, container_log) + container.remove() + + +def test_container_event(fact, monitored_dir, server, test_container, docker_api_client): + fut = os.path.join(monitored_dir, 'test.txt') + + inspect = docker_api_client.inspect_container(test_container.id) + p = Process(pid=inspect['State']['Pid'], + comm='bash', + exe_path='/usr/bin/bash', + args=['bash', '-c', + CONTAINER_CMD.format(monitored_dir=monitored_dir)] + ) + + creation = Event(process=p, event_type=EventType.CREATION, file=fut) + print(f'Waiting for event: {creation}') + + server.wait_events([creation])