From ed417d173d1494f379076c87a3a1c9ebb69c77f5 Mon Sep 17 00:00:00 2001 From: saying121 Date: Thu, 25 Sep 2025 16:57:54 +0800 Subject: [PATCH 1/4] feat(sampling): add profiler --- Cargo.lock | 8 ++- Cargo.toml | 6 +- src/services/mod.rs | 1 + src/services/sampling.rs | 128 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 139 insertions(+), 4 deletions(-) create mode 100644 src/services/sampling.rs diff --git a/Cargo.lock b/Cargo.lock index 8babd2e..503e4db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2099,7 +2099,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "perf-event-rs" version = "0.0.0" -source = "git+https://github.com/OptimatistOpenSource/perf-event-rs.git?rev=423ca26f53b27193d2321028dae5fd362a9673e9#423ca26f53b27193d2321028dae5fd362a9673e9" +source = "git+https://github.com/OptimatistOpenSource/perf-event-rs.git?rev=d6881f34b8a9cde1d70dab5fb1415271e6b0bb25#d6881f34b8a9cde1d70dab5fb1415271e6b0bb25" dependencies = [ "bindgen", "libc", @@ -2290,10 +2290,12 @@ dependencies = [ "local-ip-address", "mimalloc", "nix", + "num_cpus", "nvml-wrapper", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", + "perf-event-rs", "prost", "psh-proto", "psh-system", @@ -2310,9 +2312,11 @@ dependencies = [ [[package]] name = "psh-proto" version = "0.1.0" -source = "git+https://github.com/OptimatistOpenSource/psh-proto.git?rev=ca2919053029cb584b478611f8bf8496bf3cf7f7#ca2919053029cb584b478611f8bf8496bf3cf7f7" +source = "git+ssh://git@github.com/OptimatistOpenSource/psh-proto.git?rev=17c48f6818eb0ff49070a6990a1b14e3a13b2c87#17c48f6818eb0ff49070a6990a1b14e3a13b2c87" dependencies = [ + "perf-event-rs", "prost", + "serde", "tonic", "tonic-build", ] diff --git a/Cargo.toml b/Cargo.toml index b0c2684..fdf3873 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,8 @@ influxdb-line-protocol = { workspace = true } psh-proto = { workspace = true } mimalloc = { workspace = true } nvml-wrapper = { workspace = true } +perf-event-rs = { workspace = true } +num_cpus = { workspace = true } [lints] workspace = true @@ -58,7 +60,7 @@ workspace = true host-op-perf = { path = "crates/op/host-op-perf" } host-op-system = { path = "crates/op/host-op-system" } psh-system = { path = "crates/psh-system" } -perf-event-rs = { git = "https://github.com/OptimatistOpenSource/perf-event-rs.git", rev = "423ca26f53b27193d2321028dae5fd362a9673e9" } +perf-event-rs = { git = "https://github.com/OptimatistOpenSource/perf-event-rs.git", rev = "d6881f34b8a9cde1d70dab5fb1415271e6b0bb25" } tokio = "^1" libc = "^0.2" chrono = "^0.4" @@ -87,7 +89,7 @@ local-ip-address = "^0.6" TinyUFO = "0.4" crossbeam = "0.8" influxdb-line-protocol = "2" -psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "ca2919053029cb584b478611f8bf8496bf3cf7f7" } +psh-proto = { git = "ssh://git@github.com/OptimatistOpenSource/psh-proto.git", rev = "17c48f6818eb0ff49070a6990a1b14e3a13b2c87" } mimalloc = "0.1" nvml-wrapper = "0.10.0" diff --git a/src/services/mod.rs b/src/services/mod.rs index 403a784..30178a3 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -14,3 +14,4 @@ pub mod host_info; pub mod rpc; +pub mod sampling; diff --git a/src/services/sampling.rs b/src/services/sampling.rs new file mode 100644 index 0000000..7b8c0b3 --- /dev/null +++ b/src/services/sampling.rs @@ -0,0 +1,128 @@ +use anyhow::Result; +use perf_event_rs::{ + EventScope, HardwareEvent, + config::{Cpu, Process}, + sampling::{self, Config, ExtraRecord, OverflowBy, Sampler}, +}; +use psh_proto::{ + PerfDataProto, + perf_data_proto::{PerfEvent, PerfFileAttr}, +}; + +#[derive(Default)] +pub struct Profiler { + samplers: Vec, +} + +impl Profiler { + pub fn new>>( + process: Process, + mmap_pages: usize, + overflow_by: OverflowBy, + stack_depth: D, + ) -> Result { + let econf = sampling::ExtraConfig { + comm: true, + comm_exec: true, + precise_ip: sampling::SampleIpSkid::Zero, + inherit: true, + inherit_stat: true, + build_id: true, + // inherit_thread: true, + extra_record_with_sample_id: true, + sample_record_fields: sampling::SampleRecordFields { + id: true, + sample_id: true, + time: true, + ip: true, + pid_and_tid: true, + period: true, + ips: stack_depth.into(), + ..Default::default() + }, + extra_record_types: vec![ExtraRecord::Mmap, ExtraRecord::Mmap2], + ..Default::default() + }; + + let cpu_num = num_cpus::get(); + let mut samplers = Vec::with_capacity(cpu_num); + + for cpu in 0..cpu_num { + let s = Sampler::new( + &process, + &Cpu::Id(cpu as _), + mmap_pages.next_power_of_two() + 1, + &Config::extra_new( + &HardwareEvent::CpuCycles.into(), + &EventScope::all(), + &overflow_by, + &econf, + ), + )?; + samplers.push(s); + } + + Ok(Self { samplers }) + } + + /// Get current sampling task data + pub fn perf_data_proto(&mut self) -> PerfDataProto { + let file_attrs: Vec<_> = self + .samplers + .iter() + .map(|sampler| { + let attr = sampler.perf_event_attr(); + + let perf_event_attr = attr.into(); + PerfFileAttr { + attr: Some(perf_event_attr), + ids: vec![], + } + }) + .collect(); + + let events = self + .samplers + .iter_mut() + .flat_map(|v| v.iter()) + .map(|v| PerfEvent { + header: None, + timestamp: None, + event_type: Some(v.body.into()), + }) + .collect(); + + PerfDataProto { + file_attrs, + events, + event_types: vec![], + timestamp_sec: None, + stats: None, + metadata_mask: vec![], + tracing_data: None, + build_ids: vec![], + uint32_metadata: vec![], + uint64_metadata: vec![], + cpu_topology: None, + numa_topology: vec![], + pmu_mappings: vec![], + group_desc: vec![], + hybrid_topology: vec![], + string_metadata: None, + } + } + + pub fn enable(&self) -> Result<()> { + for ele in &self.samplers { + ele.enable()?; + } + Ok(()) + } + + pub fn disable(&self) -> Result<()> { + for ele in &self.samplers { + ele.disable()?; + } + Ok(()) + } +} From 1e5b02796c5b66499d94abf5edefafa3d5299be3 Mon Sep 17 00:00:00 2001 From: saying121 Date: Thu, 25 Sep 2025 16:58:23 +0800 Subject: [PATCH 2/4] feat(rpc): support profiling task --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 65 +++++++++++++++++++++++++++++++------- src/runtime/data_export.rs | 14 ++++---- src/runtime/mod.rs | 9 +++--- src/services/rpc.rs | 64 +++++++++++++++++++++++++++++-------- 6 files changed, 119 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 503e4db..8673b6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2312,7 +2312,7 @@ dependencies = [ [[package]] name = "psh-proto" version = "0.1.0" -source = "git+ssh://git@github.com/OptimatistOpenSource/psh-proto.git?rev=17c48f6818eb0ff49070a6990a1b14e3a13b2c87#17c48f6818eb0ff49070a6990a1b14e3a13b2c87" +source = "git+https://github.com/OptimatistOpenSource/psh-proto.git?rev=17c48f6818eb0ff49070a6990a1b14e3a13b2c87#17c48f6818eb0ff49070a6990a1b14e3a13b2c87" dependencies = [ "perf-event-rs", "prost", diff --git a/Cargo.toml b/Cargo.toml index fdf3873..63eaa86 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,7 +89,7 @@ local-ip-address = "^0.6" TinyUFO = "0.4" crossbeam = "0.8" influxdb-line-protocol = "2" -psh-proto = { git = "ssh://git@github.com/OptimatistOpenSource/psh-proto.git", rev = "17c48f6818eb0ff49070a6990a1b14e3a13b2c87" } +psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "17c48f6818eb0ff49070a6990a1b14e3a13b2c87" } mimalloc = "0.1" nvml-wrapper = "0.10.0" diff --git a/src/main.rs b/src/main.rs index 7bd2d82..9d0f77d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,11 +32,16 @@ use log::log_init; use mimalloc::MiMalloc; use nix::unistd::geteuid; use opentelemetry_otlp::ExportConfig; -use psh_proto::HeartbeatReq; -use runtime::{Task, TaskRuntime}; +use psh_proto::{ + HeartbeatReq, PerfDataProto, + export_data_req::{Data, data::DataType}, +}; +use runtime::{TaskRuntime, WasmTask}; use services::rpc::RpcClient; use tokio::{runtime::Runtime, try_join}; +use self::services::{rpc::WhichTask, sampling::Profiler}; + #[global_allocator] static GLOBAL: MiMalloc = mimalloc::MiMalloc; @@ -86,7 +91,7 @@ fn main() -> Result<()> { let task_rt = TaskRuntime::new()?; if let Some(args) = wasm_with_args { - let task = Task { + let task = WasmTask { id: None, wasm_component: fs::read(&args[0])?, wasm_component_args: args, @@ -143,14 +148,52 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu loop { let idle = task_rt.is_idle(); if idle { - if let Some(mut task) = client.get_task(instance_id.clone()).await? { - let task_id = task - .id - .as_ref() - .map(|it| it.to_string()) - .expect("No task id provided"); - task.wasm_component_args.insert(0, task_id); - task_rt.schedule(task)? + if let Some(task) = client.get_task(instance_id.clone()).await? { + match task { + WhichTask::Wasm(mut task) => { + let task_id = task + .id + .as_ref() + .map(|it| it.to_string()) + .expect("No task id provided"); + task.wasm_component_args.insert(0, task_id); + task_rt.schedule(task)?; + } + WhichTask::Profiling(profiling_task) => { + let mut profiler = Profiler::new( + profiling_task.process, + profiling_task.mmap_pages as _, + profiling_task.overflow_by, + profiling_task.stack_depth, + )?; + let task_time_slice = { + let delta = profiling_task.end_time.timestamp_millis() + - Utc::now().timestamp_millis(); + delta.max(0) as u64 + }; + + let perf_data = tokio::task::spawn_blocking( + move || -> anyhow::Result { + profiler.enable()?; + std::thread::sleep(Duration::from_millis(task_time_slice)); + profiler.disable()?; + Ok(profiler.perf_data_proto()) + }, + ) + .await??; + if let Some(task_id) = profiling_task.id { + let data = Data { + data_type: Some(DataType::PerfData(perf_data)), + }; + client + .export_data(psh_proto::ExportDataReq { + task_id, + data: vec![data], + }) + .await?; + } + } + }; } } diff --git a/src/runtime/data_export.rs b/src/runtime/data_export.rs index 03abfab..241e9c6 100644 --- a/src/runtime/data_export.rs +++ b/src/runtime/data_export.rs @@ -26,7 +26,10 @@ use profiling::data_export::{ common::FieldValue as WitFieldValue, measurement::Point, metric::Sample, }; use prost::Message; -use psh_proto::{Data, DataType, ExportDataReq}; +use psh_proto::{ + ExportDataReq, LineProtocolData, + export_data_req::{Data, data::DataType}, +}; use wasmtime::component::Linker; use crate::{TOKIO_RUNTIME, services::rpc::RpcClient}; @@ -172,8 +175,7 @@ impl profiling::data_export::file::Host for DataExportCtx { }; let data = Data { - ty: DataType::File as _, - bytes, + data_type: Some(DataType::File(psh_proto::FileData { bytes })), }; ctx.exporter.schedule(data); @@ -201,8 +203,7 @@ impl profiling::data_export::metric::Host for DataExportCtx { }; let data = Data { - ty: DataType::LineProtocol as _, - bytes, + data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })), }; ctx.exporter.schedule(data); @@ -238,8 +239,7 @@ impl profiling::data_export::measurement::Host for DataExportCtx { }; let data = Data { - ty: DataType::LineProtocol as _, - bytes, + data_type: Some(DataType::LineProtocol(LineProtocolData { bytes })), }; ctx.exporter.schedule(data); diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 0302d4c..5da255b 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -39,7 +39,8 @@ pub use state::PshState; use crate::services::rpc::RpcClient; -pub struct Task { +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct WasmTask { pub id: Option, pub wasm_component: Vec, pub wasm_component_args: Vec, @@ -47,8 +48,8 @@ pub struct Task { } pub struct TaskRuntime { - tx: Sender, - rx: Option>, + tx: Sender, + rx: Option>, len: Arc, finished_task_id: Arc>>, } @@ -65,7 +66,7 @@ impl TaskRuntime { }) } - pub fn schedule(&self, task: Task) -> Result<()> { + pub fn schedule(&self, task: WasmTask) -> Result<()> { self.len.fetch_add(1, Ordering::Release); self.tx.send(task)?; Ok(()) diff --git a/src/services/rpc.rs b/src/services/rpc.rs index bd648f6..573ea09 100644 --- a/src/services/rpc.rs +++ b/src/services/rpc.rs @@ -13,7 +13,10 @@ // see . use anyhow::{Result, bail}; +use chrono::DateTime; use chrono::{TimeZone, Utc, offset::LocalResult}; +use perf_event_rs::sampling::OverflowBy; +use psh_proto::task::TaskType; use psh_proto::{ ExportDataReq, GetTaskReq, HeartbeatReq, TaskDoneReq, Unit, psh_service_client::PshServiceClient, @@ -26,7 +29,7 @@ use tonic::{ transport::{Channel, ClientTlsConfig, Endpoint}, }; -use crate::{config::RpcConfig, runtime::Task, services::host_info::new_info_req}; +use crate::{config::RpcConfig, runtime::WasmTask, services::host_info::new_info_req}; #[derive(Clone)] pub struct RpcClient { @@ -80,6 +83,20 @@ where } } +pub enum WhichTask { + Wasm(WasmTask), + Profiling(ProfilingTask), +} + +pub struct ProfilingTask { + pub id: Option, + pub process: perf_event_rs::config::Process, + pub mmap_pages: u64, + pub overflow_by: OverflowBy, + pub stack_depth: Option, + pub end_time: DateTime, +} + impl RpcClient { pub async fn new(config: &RpcConfig, token: String) -> Result { let ep = Endpoint::from_shared(config.addr.clone())? @@ -132,7 +149,7 @@ impl RpcClient { Ok(()) } - pub async fn get_task(&mut self, instance_id: String) -> Result> { + pub async fn get_task(&mut self, instance_id: String) -> Result> { let get_task_req = GetTaskReq { instance_id }; let token = &self.token; @@ -142,20 +159,41 @@ impl RpcClient { self.client.get_task(req).await }) .await?; - let task = match response.into_inner().task { - Some(task) => task, - None => return Ok(None), + let Some(task): Option = response.into_inner().task else { + return Ok(None); }; - let end_time = match Utc.timestamp_millis_opt(task.end_time as _) { - LocalResult::Single(t) => t, - _ => bail!("Invalid task end time"), + let LocalResult::Single(end_time) = Utc.timestamp_millis_opt(task.end_time as _) else { + bail!("Invalid task end time") }; - let task = Task { - id: Some(task.id), - wasm_component: task.wasm, - wasm_component_args: task.wasm_args, - end_time, + let Some(task_type) = task.task_type else { + return Ok(None); + }; + + let task = match task_type { + TaskType::Profiling(profiling_task) => { + let Some(process) = profiling_task.process else { + return Ok(None); + }; + let Some(overflow_by) = profiling_task.overflow_by else { + return Ok(None); + }; + let process: perf_event_rs::config::Process = process.into(); + WhichTask::Profiling(ProfilingTask { + id: task.id.into(), + process, + mmap_pages: profiling_task.mmap_pages, + overflow_by: overflow_by.into(), + stack_depth: profiling_task.stack_depth.map(|v| v as _), + end_time, + }) + } + TaskType::Wasm(wasm_task) => WhichTask::Wasm(WasmTask { + id: Some(task.id), + wasm_component: wasm_task.wasm, + wasm_component_args: wasm_task.wasm_args, + end_time, + }), }; Ok(Some(task)) From 931641014dc54dc86314e0173a0a2552be57117a Mon Sep 17 00:00:00 2001 From: saying121 Date: Sun, 28 Sep 2025 14:12:02 +0800 Subject: [PATCH 3/4] feat(task): upload elf file --- Cargo.lock | 2 +- Cargo.toml | 4 ++-- src/main.rs | 28 +++++++++++++++++++++++----- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8673b6e..7bff538 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2312,7 +2312,7 @@ dependencies = [ [[package]] name = "psh-proto" version = "0.1.0" -source = "git+https://github.com/OptimatistOpenSource/psh-proto.git?rev=17c48f6818eb0ff49070a6990a1b14e3a13b2c87#17c48f6818eb0ff49070a6990a1b14e3a13b2c87" +source = "git+https://github.com/OptimatistOpenSource/psh-proto.git?rev=c82356c52925ac0abf9cd93c7245c3518d78e096#c82356c52925ac0abf9cd93c7245c3518d78e096" dependencies = [ "perf-event-rs", "prost", diff --git a/Cargo.toml b/Cargo.toml index 63eaa86..4a8c89c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ chrono = { workspace = true } clap = { workspace = true, features = ["derive", "wrap_help"] } tonic = { workspace = true } prost = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] } nix = { workspace = true, features = ["user", "hostname"] } wasmtime = { workspace = true } wasmtime-wasi = { workspace = true } @@ -89,7 +89,7 @@ local-ip-address = "^0.6" TinyUFO = "0.4" crossbeam = "0.8" influxdb-line-protocol = "2" -psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "17c48f6818eb0ff49070a6990a1b14e3a13b2c87" } +psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "c82356c52925ac0abf9cd93c7245c3518d78e096" } mimalloc = "0.1" nvml-wrapper = "0.10.0" diff --git a/src/main.rs b/src/main.rs index 9d0f77d..069cc5a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -182,14 +182,32 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu ) .await??; if let Some(task_id) = profiling_task.id { - let data = Data { + let mut data = vec![]; + for ele in &perf_data.events { + let Some(event_type) = &ele.event_type else { + continue; + }; + + if let psh_proto::perf_data_proto::perf_event::EventType::MmapEvent(event)= event_type{ + let Some(filename) = &event.filename else { + continue; + }; + + let data_type = DataType::ElfFile(psh_proto::ElfFile { + filename: filename.to_owned(), + build_id: event.build_id.clone(), + arch: std::env::consts::ARCH.to_string(), + bytes: tokio::fs::read(filename).await? + }); + data.push(Data { data_type: Some(data_type) }); + } + } + let dat = Data { data_type: Some(DataType::PerfData(perf_data)), }; + data.push(dat); client - .export_data(psh_proto::ExportDataReq { - task_id, - data: vec![data], - }) + .export_data(psh_proto::ExportDataReq { task_id, data }) .await?; } } From 1129e0164698b603d0377eff0f66824b854882c8 Mon Sep 17 00:00:00 2001 From: saying121 Date: Mon, 13 Oct 2025 17:26:48 +0800 Subject: [PATCH 4/4] feat: add path validation and separate upload elf task --- Cargo.lock | 41 +++++++++++++--- Cargo.toml | 4 +- doc/config.toml | 3 ++ src/config.rs | 11 ++++- src/main.rs | 103 ++++++++++++++++++++++++++++++--------- src/services/rpc.rs | 27 +++++++++- src/services/sampling.rs | 55 +++++++++++++++++++++ 7 files changed, 209 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7bff538..3e628bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,7 +220,7 @@ dependencies = [ "cfg-if", "libc", "miniz_oxide", - "object", + "object 0.36.7", "rustc-demangle", "windows-targets 0.52.6", ] @@ -1952,6 +1952,17 @@ dependencies = [ "memchr", ] +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "flate2", + "memchr", + "ruzstd", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -2292,6 +2303,7 @@ dependencies = [ "nix", "num_cpus", "nvml-wrapper", + "object 0.37.3", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", @@ -2312,7 +2324,7 @@ dependencies = [ [[package]] name = "psh-proto" version = "0.1.0" -source = "git+https://github.com/OptimatistOpenSource/psh-proto.git?rev=c82356c52925ac0abf9cd93c7245c3518d78e096#c82356c52925ac0abf9cd93c7245c3518d78e096" +source = "git+https://github.com/OptimatistOpenSource/psh-proto.git?rev=5cf7cc9#5cf7cc987c936dc751d22ee76837b5fefa897677" dependencies = [ "perf-event-rs", "prost", @@ -2675,6 +2687,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eded382c5f5f786b989652c49544c4877d9f015cc22e145a5ea8ea66c2921cd2" +[[package]] +name = "ruzstd" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3640bec8aad418d7d03c72ea2de10d5c646a598f9883c7babc160d91e3c1b26c" +dependencies = [ + "twox-hash", +] + [[package]] name = "ryu" version = "1.0.20" @@ -3305,6 +3326,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" + [[package]] name = "typenum" version = "1.18.0" @@ -3565,7 +3592,7 @@ dependencies = [ "log", "mach2", "memfd", - "object", + "object 0.36.7", "once_cell", "paste", "postcard", @@ -3664,7 +3691,7 @@ dependencies = [ "gimli", "itertools 0.12.1", "log", - "object", + "object 0.36.7", "smallvec", "target-lexicon", "thiserror 1.0.69", @@ -3686,7 +3713,7 @@ dependencies = [ "gimli", "indexmap", "log", - "object", + "object 0.36.7", "postcard", "rustc-demangle", "semver", @@ -3721,7 +3748,7 @@ version = "28.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cec0a8e5620ae71bfcaaec78e3076be5b6ebf869f4e6191925d73242224a915" dependencies = [ - "object", + "object 0.36.7", "rustix 0.38.44", "wasmtime-versioned-export-macros", ] @@ -3794,7 +3821,7 @@ dependencies = [ "anyhow", "cranelift-codegen", "gimli", - "object", + "object 0.36.7", "target-lexicon", "wasmparser 0.221.3", "wasmtime-cranelift", diff --git a/Cargo.toml b/Cargo.toml index 4a8c89c..1ea9eaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ mimalloc = { workspace = true } nvml-wrapper = { workspace = true } perf-event-rs = { workspace = true } num_cpus = { workspace = true } +object = { workspace = true } [lints] workspace = true @@ -89,9 +90,10 @@ local-ip-address = "^0.6" TinyUFO = "0.4" crossbeam = "0.8" influxdb-line-protocol = "2" -psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "c82356c52925ac0abf9cd93c7245c3518d78e096" } +psh-proto = { git = "https://github.com/OptimatistOpenSource/psh-proto.git", rev = "5cf7cc9" } mimalloc = "0.1" nvml-wrapper = "0.10.0" +object = "0.37" [workspace.lints.rust] diff --git a/doc/config.toml b/doc/config.toml index 2568600..89dc4a0 100644 --- a/doc/config.toml +++ b/doc/config.toml @@ -30,3 +30,6 @@ buf_watermark = 2048 enable = false addr = "https://api.optimatist.com" interval = 10 + +[perms] +allowed_paths = [] diff --git a/src/config.rs b/src/config.rs index 0347412..459c98e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,7 +12,10 @@ // You should have received a copy of the GNU Lesser General Public License along with Performance Savior Home (PSH). If not, // see . -use std::{fs, path::Path}; +use std::{ + fs, + path::{Path, PathBuf}, +}; use anyhow::Result; use serde::Deserialize; @@ -24,6 +27,12 @@ const TEMPLATE: &str = include_str!("../doc/config.toml"); pub struct Config { pub daemon: DaemonConfig, pub remote: RemoteConfig, + pub perms: PermsConfig, +} + +#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Deserialize)] +pub struct PermsConfig { + pub allowed_paths: Vec, } #[derive(Clone, Deserialize)] diff --git a/src/main.rs b/src/main.rs index 069cc5a..94fbfcf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ mod otlp; mod runtime; mod services; -use std::{fs, sync::LazyLock, thread, time::Duration}; +use std::{fs, path::PathBuf, str::FromStr, sync::LazyLock, thread, time::Duration}; use anyhow::{Error, Result, bail}; use args::Args; @@ -35,12 +35,16 @@ use opentelemetry_otlp::ExportConfig; use psh_proto::{ HeartbeatReq, PerfDataProto, export_data_req::{Data, data::DataType}, + task_done_req::TaskStatus, }; use runtime::{TaskRuntime, WasmTask}; use services::rpc::RpcClient; use tokio::{runtime::Runtime, try_join}; -use self::services::{rpc::WhichTask, sampling::Profiler}; +use self::{ + config::PermsConfig, + services::{rpc::WhichTask, sampling::Profiler}, +}; #[global_allocator] static GLOBAL: MiMalloc = mimalloc::MiMalloc; @@ -101,7 +105,7 @@ fn main() -> Result<()> { }; thread::spawn(move || -> Result<()> { - let tasks = async_tasks(cfg.remote, task_rt); + let tasks = async_tasks(cfg.remote, task_rt, cfg.perms); TOKIO_RUNTIME.block_on(tasks)?; Ok(()) }) @@ -111,7 +115,11 @@ fn main() -> Result<()> { Ok(()) } -async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Result<()> { +async fn async_tasks( + remote_cfg: RemoteConfig, + mut task_rt: TaskRuntime, + perms_cfg: PermsConfig, +) -> Result<()> { let token_cloned = remote_cfg.token.clone(); let rpc_task = async move { if !remote_cfg.rpc.enable { @@ -183,25 +191,6 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu .await??; if let Some(task_id) = profiling_task.id { let mut data = vec![]; - for ele in &perf_data.events { - let Some(event_type) = &ele.event_type else { - continue; - }; - - if let psh_proto::perf_data_proto::perf_event::EventType::MmapEvent(event)= event_type{ - let Some(filename) = &event.filename else { - continue; - }; - - let data_type = DataType::ElfFile(psh_proto::ElfFile { - filename: filename.to_owned(), - build_id: event.build_id.clone(), - arch: std::env::consts::ARCH.to_string(), - bytes: tokio::fs::read(filename).await? - }); - data.push(Data { data_type: Some(data_type) }); - } - } let dat = Data { data_type: Some(DataType::PerfData(perf_data)), }; @@ -209,8 +198,72 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu client .export_data(psh_proto::ExportDataReq { task_id, data }) .await?; + } else { + // TODO: analyze locally } } + WhichTask::UploadElf(upload_elf) => { + let mut data = vec![]; + let file_path = PathBuf::from_str(&upload_elf.filename).unwrap(); + let allowed = perms_cfg + .allowed_paths + .iter() + .any(|v| file_path.starts_with(v)); + if !allowed { + if let Err(e) = client + .task_done(upload_elf.id, TaskStatus::AccessDeined) + .await + { + tracing::error!("{e}"); + } + continue; + } + + let bytes = match tokio::fs::read(&upload_elf.filename).await { + Ok(o) => o, + Err(e) => { + if e.kind() == std::io::ErrorKind::NotFound { + if let Err(e) = client + .task_done(upload_elf.id, TaskStatus::NotFound) + .await + { + tracing::error!("{}", e); + } + } + continue; + } + }; + + let build_id = services::sampling::get_build_id(&bytes)?; + match (upload_elf.build_id, build_id) { + (Some(tbid), Some(bid)) if tbid != bid.to_string() => { + if let Err(e) = client + .task_done(upload_elf.id, TaskStatus::BadBuildId) + .await + { + tracing::error!("{e}"); + } + continue; + } + (_, _) => {} + }; + + let data_type = DataType::ElfFile(psh_proto::ElfFile { + filename: upload_elf.filename.to_owned(), + build_id: build_id.map(|v| v.to_string()), + arch: std::env::consts::ARCH.to_string(), + bytes, + }); + data.push(Data { + data_type: Some(data_type), + }); + client + .export_data(psh_proto::ExportDataReq { + task_id: upload_elf.id, + data, + }) + .await?; + } }; } } @@ -223,7 +276,9 @@ async fn async_tasks(remote_cfg: RemoteConfig, mut task_rt: TaskRuntime) -> Resu .await?; if let Some(id) = task_rt.finished_task_id() { - let _ = client.task_done(id).await; + if let Err(e) = client.task_done(id, TaskStatus::Ok).await { + tracing::error!("{e}"); + } } tokio::time::sleep(duration).await; diff --git a/src/services/rpc.rs b/src/services/rpc.rs index 573ea09..28c9cea 100644 --- a/src/services/rpc.rs +++ b/src/services/rpc.rs @@ -86,6 +86,14 @@ where pub enum WhichTask { Wasm(WasmTask), Profiling(ProfilingTask), + UploadElf(UploadElfTask), +} + +#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +pub struct UploadElfTask { + pub id: String, + pub filename: String, + pub build_id: Option, } pub struct ProfilingTask { @@ -194,13 +202,28 @@ impl RpcClient { wasm_component_args: wasm_task.wasm_args, end_time, }), + TaskType::UploadElf(upload_elf_task) => WhichTask::UploadElf(UploadElfTask { + id: task.id, + filename: upload_elf_task.filename, + build_id: upload_elf_task.build_id, + }), }; Ok(Some(task)) } - pub async fn task_done(&mut self, task_id: String) -> Result<()> { - let req = into_req(TaskDoneReq { task_id }, &self.token)?; + pub async fn task_done( + &mut self, + task_id: String, + status: psh_proto::task_done_req::TaskStatus, + ) -> Result<()> { + let req = into_req( + TaskDoneReq { + task_id, + status: status as _, + }, + &self.token, + )?; self.client.task_done(req).await?; Ok(()) } diff --git a/src/services/sampling.rs b/src/services/sampling.rs index 7b8c0b3..8cb110a 100644 --- a/src/services/sampling.rs +++ b/src/services/sampling.rs @@ -1,4 +1,7 @@ +use std::{fmt::Display, str::FromStr}; + use anyhow::Result; +use object::Object; use perf_event_rs::{ EventScope, HardwareEvent, config::{Cpu, Process}, @@ -9,6 +12,51 @@ use psh_proto::{ perf_data_proto::{PerfEvent, PerfFileAttr}, }; +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] +pub struct BuildId([u8; 20]); + +impl BuildId { + pub fn from_bytes(bytes: &[u8]) -> Self { + let mut b = [0; 20]; + b.copy_from_slice(&bytes[..20]); + Self(b) + } +} + +impl Display for BuildId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for byte in &self.0 { + f.write_fmt(format_args!("{byte:02x}"))?; + } + Ok(()) + } +} + +impl FromStr for BuildId { + type Err = anyhow::Error; + + fn from_str(s: &str) -> std::result::Result { + let len = s.len(); + if len > 40 { + anyhow::bail!("40"); + } + let mut bytes = [0; 20]; + for i in 0..len / 2 { + let hex_byte = &s[i * 2..i * 2 + 2]; + let b = u8::from_str_radix(hex_byte, 16) + .map_err(|_| anyhow::anyhow!("Invalid build-id"))?; + bytes[i] = b; + } + Ok(Self(bytes)) + } +} + +pub fn get_build_id<'c>(content: &'c [u8]) -> Result> { + let file: object::File<'c> = object::File::parse(content)?; + let build_id = file.build_id()?.map(BuildId::from_bytes); + Ok(build_id) +} + #[derive(Default)] pub struct Profiler { samplers: Vec, @@ -126,3 +174,10 @@ impl Profiler { Ok(()) } } + +#[test] +fn feature() { + let a = BuildId([0; 20]); + let id = BuildId::from_str(&a.to_string()).unwrap(); + assert_eq!(a, id); +}