From fa9283b9b13e2d8d977c1a54e1e9869311a64803 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Mon, 5 Jan 2026 22:48:42 +0900 Subject: [PATCH 01/27] chore: little process --- Cargo.toml | 1 + src/host/mod.rs | 5 + src/host/pipewire/device.rs | 353 ++++++++++++++++++++++++++++++++++++ src/host/pipewire/mod.rs | 12 ++ 4 files changed, 371 insertions(+) create mode 100644 src/host/pipewire/device.rs create mode 100644 src/host/pipewire/mod.rs diff --git a/Cargo.toml b/Cargo.toml index fb47eb19a..8e16e0ee3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ custom = [] [dependencies] dasp_sample = "0.11" +pipewire = "0.9.2" [dev-dependencies] anyhow = "1.0" diff --git a/src/host/mod.rs b/src/host/mod.rs index 58b79bb75..bba12fe63 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -7,6 +7,11 @@ pub(crate) mod aaudio; target_os = "netbsd" ))] pub(crate) mod alsa; +#[cfg(all( + any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), + //feature = "pipewire" +))] +pub(crate) mod pipewire; #[cfg(all(windows, feature = "asio"))] pub(crate) mod asio; #[cfg(all( diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs new file mode 100644 index 000000000..8b6283f49 --- /dev/null +++ b/src/host/pipewire/device.rs @@ -0,0 +1,353 @@ +use std::{cell::RefCell, rc::Rc}; + +use crate::DeviceDirection; +use pipewire::{ + self as pw, + metadata::{Metadata, MetadataListener}, + node::{Node, NodeListener}, + proxy::ProxyT, + spa::utils::result::AsyncSeq, +}; + +pub type Devices = std::vec::IntoIter; + +#[derive(Clone, Debug, Default, Copy)] +pub(crate) enum DeviceType { + #[default] + Node, + DefaultSink, + DefaultInput, + DefaultOutput, +} + +#[derive(Clone, Debug, Default)] +pub struct Device { + id: u32, + node_name: String, + nick_name: String, + description: String, + direction: DeviceDirection, + channels: usize, + limit_quantum: u32, + rate: u32, + allow_rates: Vec, + quantum: u32, + min_quantum: u32, + max_quantum: u32, + device_type: DeviceType, +} + +impl Device { + fn sink_default() -> Self { + Self { + id: 0, + node_name: "sink_default".to_owned(), + nick_name: "sink_default".to_owned(), + description: "default_sink".to_owned(), + direction: DeviceDirection::Input, + channels: 2, + device_type: DeviceType::DefaultSink, + ..Default::default() + } + } + fn input_default() -> Self { + Self { + id: 0, + node_name: "input_default".to_owned(), + nick_name: "input_default".to_owned(), + description: "default_input".to_owned(), + direction: DeviceDirection::Input, + channels: 2, + device_type: DeviceType::DefaultInput, + ..Default::default() + } + } + fn output_default() -> Self { + Self { + id: 0, + node_name: "output_default".to_owned(), + nick_name: "output_default".to_owned(), + description: "default_output".to_owned(), + direction: DeviceDirection::Output, + channels: 2, + device_type: DeviceType::DefaultOutput, + ..Default::default() + } + } +} + +impl Device { + pub fn id(&self) -> u32 { + self.id + } + pub fn name(&self) -> &str { + &self.nick_name + } + pub fn channels(&self) -> usize { + self.channels + } + pub fn direction(&self) -> DeviceDirection { + self.direction + } + pub fn node_name(&self) -> &str { + &self.node_name + } + pub fn description(&self) -> &str { + &self.description + } + pub fn limit_quantam(&self) -> u32 { + self.limit_quantum + } + pub fn min_quantum(&self) -> u32 { + self.min_quantum + } + pub fn max_quantum(&self) -> u32 { + self.max_quantum + } + pub fn quantum(&self) -> u32 { + self.quantum + } + pub fn rate(&self) -> u32 { + self.rate + } + pub fn allow_rates(&self) -> &[u32] { + &self.allow_rates + } +} + +#[derive(Debug, Clone, Default)] +struct Settings { + rate: u32, + allow_rates: Vec, + quantum: u32, + min_quantum: u32, + max_quantum: u32, +} + +#[allow(dead_code)] +enum Request { + Node(NodeListener), + Meta(MetadataListener), +} + +impl From for Request { + fn from(value: NodeListener) -> Self { + Self::Node(value) + } +} + +impl From for Request { + fn from(value: MetadataListener) -> Self { + Self::Meta(value) + } +} + +fn init_roundtrip() -> Option> { + let mainloop = pw::main_loop::MainLoopRc::new(None).ok()?; + let context = pw::context::ContextRc::new(&mainloop, None).ok()?; + let core = context.connect_rc(None).ok()?; + let registry = core.get_registry_rc().ok()?; + + // To comply with Rust's safety rules, we wrap this variable in an `Rc` and a `Cell`. + let devices: Rc>> = Rc::new(RefCell::new(vec![ + Device::sink_default(), + Device::input_default(), + Device::output_default(), + ])); + let requests = Rc::new(RefCell::new(vec![])); + let settings = Rc::new(RefCell::new(Settings::default())); + let loop_clone = mainloop.clone(); + + // Trigger the sync event. The server's answer won't be processed until we start the main loop, + // so we can safely do this before setting up a callback. This lets us avoid using a Cell. + let peddings: Rc>> = Rc::new(RefCell::new(vec![])); + let pending = core.sync(0).expect("sync failed"); + + peddings.borrow_mut().push(pending); + + let _listener_core = core + .add_listener_local() + .done({ + let peddings = peddings.clone(); + move |id, seq| { + if id != pw::core::PW_ID_CORE { + return; + } + let mut peddinglist = peddings.borrow_mut(); + let Some(index) = peddinglist.iter().position(|o_seq| *o_seq == seq) else { + return; + }; + peddinglist.remove(index); + if !peddinglist.is_empty() { + return; + } + loop_clone.quit(); + } + }) + .register(); + let _listener_reg = registry + .add_listener_local() + .global({ + let devices = devices.clone(); + let registry = registry.clone(); + let requests = requests.clone(); + let settings = settings.clone(); + move |global| match global.type_ { + pipewire::types::ObjectType::Metadata => { + if !global.props.is_some_and(|props| { + props + .get("metadata.name") + .is_some_and(|name| name == "settings") + }) { + return; + } + let meta_settings: Metadata = registry.bind(global).unwrap(); + let settings = settings.clone(); + let listener = meta_settings + .add_listener_local() + .property(move |_, key, _, value| { + match (key, value) { + (Some("clock.rate"), Some(rate)) => { + let Ok(rate) = rate.parse() else { + return 0; + }; + settings.borrow_mut().rate = rate; + } + (Some("clock.allowed-rates"), Some(list)) => { + let Some(list) = list.strip_prefix("[") else { + return 0; + }; + let Some(list) = list.strip_suffix("]") else { + return 0; + }; + let list = list.trim(); + let list: Vec<&str> = list.split(' ').collect(); + let mut allow_rates = vec![]; + for rate in list { + let Ok(rate) = rate.parse() else { + return 0; + }; + allow_rates.push(rate); + } + settings.borrow_mut().allow_rates = allow_rates; + } + (Some("clock.quantum"), Some(quantum)) => { + let Ok(quantum) = quantum.parse() else { + return 0; + }; + settings.borrow_mut().quantum = quantum; + } + (Some("clock.min-quantum"), Some(min_quantum)) => { + let Ok(min_quantum) = min_quantum.parse() else { + return 0; + }; + settings.borrow_mut().min_quantum = min_quantum; + } + (Some("clock.max-quantum"), Some(max_quantum)) => { + let Ok(max_quantum) = max_quantum.parse() else { + return 0; + }; + settings.borrow_mut().max_quantum = max_quantum; + } + _ => {} + } + 0 + }) + .register(); + let pending = core.sync(0).expect("sync failed"); + peddings.borrow_mut().push(pending); + requests + .borrow_mut() + .push((meta_settings.upcast(), Request::Meta(listener))); + } + pipewire::types::ObjectType::Node => { + let Some(props) = global.props else { + return; + }; + let Some(media_class) = props.get("media.class") else { + return; + }; + if !matches!(media_class, "Audio/Sink" | "Audio/Source") { + return; + } + + let node: Node = registry.bind(global).expect("should ok"); + + let devices = devices.clone(); + let listener = node + .add_listener_local() + .info(move |info| { + let Some(props) = info.props() else { + return; + }; + let Some(media_class) = props.get("media.class") else { + return; + }; + let direction = match media_class { + "Audio/Sink" => DeviceDirection::Input, + "Audio/Source" => DeviceDirection::Output, + _ => { + return; + } + }; + let id = info.id(); + let node_name = props.get("node.name").unwrap_or("unknown").to_owned(); + let nick_name = props.get("node.nick").unwrap_or("unknown").to_owned(); + let description = props + .get("node.description") + .unwrap_or("unknown") + .to_owned(); + let channels: usize = props + .get("audio.channels") + .and_then(|channels| channels.parse().ok()) + .unwrap_or(2); + let limit_quantum: u32 = props + .get("clock.quantum-limit") + .and_then(|channels| channels.parse().ok()) + .unwrap_or(0); + let device = Device { + id, + node_name, + nick_name, + description, + direction, + channels, + limit_quantum, + ..Default::default() + }; + devices.borrow_mut().push(device); + }) + .register(); + let pending = core.sync(0).expect("sync failed"); + peddings.borrow_mut().push(pending); + requests + .borrow_mut() + .push((node.upcast(), Request::Node(listener))); + } + _ => {} + } + }) + .register(); + + mainloop.run(); + + let mut devices = devices.take(); + let settings = settings.take(); + for device in devices.iter_mut() { + device.rate = settings.rate; + device.allow_rates = settings.allow_rates.clone(); + device.quantum = settings.quantum; + device.min_quantum = settings.min_quantum; + device.max_quantum = settings.max_quantum; + } + Some(devices) +} + +pub fn init_devices() -> Option> { + pw::init(); + let devices = init_roundtrip()?; + unsafe { + pw::deinit(); + } + Some(devices) +} diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs new file mode 100644 index 000000000..19c9b2d02 --- /dev/null +++ b/src/host/pipewire/mod.rs @@ -0,0 +1,12 @@ +use device::{init_devices, Device}; +mod device; + +#[derive(Debug)] +pub struct Host(Vec); + +impl Host { + pub fn new() -> Result { + let devices = init_devices().ok_or(crate::HostUnavailable)?; + Ok(Host(devices)) + } +} From 92b9deaf0c74ff16c8e7887a1b42514e09cbd0e0 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Mon, 5 Jan 2026 23:28:50 +0900 Subject: [PATCH 02/27] feat: base settings --- Cargo.toml | 5 +- src/host/mod.rs | 2 +- src/host/pipewire/device.rs | 166 +++++++++++++++++++++++++++++++++--- src/host/pipewire/mod.rs | 28 +++++- src/platform/mod.rs | 18 +++- 5 files changed, 202 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8e16e0ee3..e40f1759e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ asio = [ # Platform: Linux, DragonFly BSD, FreeBSD, NetBSD, macOS, Windows # Note: JACK must be installed separately on all platforms jack = ["dep:jack"] +pipewire = ["dep:pipewire"] # WebAssembly backend using wasm-bindgen # Enables the Web Audio API backend for browser-based audio @@ -53,9 +54,10 @@ audioworklet = [ # Platform: All platforms custom = [] +default = ["pipewire"] + [dependencies] dasp_sample = "0.11" -pipewire = "0.9.2" [dev-dependencies] anyhow = "1.0" @@ -90,6 +92,7 @@ alsa = "0.11" libc = "0.2" audio_thread_priority = { version = "0.34", optional = true } jack = { version = "0.13", optional = true } +pipewire = { version = "0.9.2", optional = true } [target.'cfg(target_vendor = "apple")'.dependencies] mach2 = "0.5" diff --git a/src/host/mod.rs b/src/host/mod.rs index bba12fe63..608f95720 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -9,7 +9,7 @@ pub(crate) mod aaudio; pub(crate) mod alsa; #[cfg(all( any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), - //feature = "pipewire" + feature = "pipewire" ))] pub(crate) mod pipewire; #[cfg(all(windows, feature = "asio"))] diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 8b6283f49..6de9e1c25 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -1,6 +1,11 @@ use std::{cell::RefCell, rc::Rc}; -use crate::DeviceDirection; +use crate::{ + traits::{DeviceTrait, StreamTrait}, + DeviceDirection, SupportedStreamConfigRange, +}; + +use crate::iter::{SupportedInputConfigs, SupportedOutputConfigs}; use pipewire::{ self as pw, metadata::{Metadata, MetadataListener}, @@ -20,6 +25,7 @@ pub(crate) enum DeviceType { DefaultOutput, } +#[allow(unused)] #[derive(Clone, Debug, Default)] pub struct Device { id: u32, @@ -27,7 +33,7 @@ pub struct Device { nick_name: String, description: String, direction: DeviceDirection, - channels: usize, + channels: u16, limit_quantum: u32, rate: u32, allow_rates: Vec, @@ -38,6 +44,9 @@ pub struct Device { } impl Device { + pub(crate) fn device_type(&self) -> DeviceType { + self.device_type + } fn sink_default() -> Self { Self { id: 0, @@ -76,14 +85,149 @@ impl Device { } } -impl Device { - pub fn id(&self) -> u32 { - self.id +// TODO: +pub struct Stream; + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), crate::PlayStreamError> { + todo!() + } + fn pause(&self) -> Result<(), crate::PauseStreamError> { + todo!() + } +} + +impl DeviceTrait for Device { + type Stream = Stream; + type SupportedInputConfigs = SupportedInputConfigs; + type SupportedOutputConfigs = SupportedOutputConfigs; + + fn id(&self) -> Result { + Ok(crate::DeviceId( + crate::HostId::PipeWire, + self.nick_name.clone(), + )) + } + + // TODO: device type + fn description(&self) -> Result { + Ok(crate::DeviceDescriptionBuilder::new(&self.nick_name) + .direction(self.direction()) + .build()) + } + + fn supports_input(&self) -> bool { + matches!( + self.direction, + DeviceDirection::Input | DeviceDirection::Duplex + ) + } + + fn supports_output(&self) -> bool { + matches!( + self.direction, + DeviceDirection::Output | DeviceDirection::Duplex + ) + } + + // TODO: sample_format + fn supported_input_configs( + &self, + ) -> Result { + if !self.supports_input() { + return Err(crate::SupportedStreamConfigsError::DeviceNotAvailable); + } + Ok(vec![SupportedStreamConfigRange { + channels: self.channels, + min_sample_rate: self.rate, + max_sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: crate::SampleFormat::I32, + }] + .into_iter()) } - pub fn name(&self) -> &str { - &self.nick_name + fn supported_output_configs( + &self, + ) -> Result { + if !self.supports_output() { + return Err(crate::SupportedStreamConfigsError::DeviceNotAvailable); + } + Ok(vec![SupportedStreamConfigRange { + channels: self.channels, + min_sample_rate: self.rate, + max_sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: crate::SampleFormat::I32, + }] + .into_iter()) + } + fn default_input_config( + &self, + ) -> Result { + Ok(crate::SupportedStreamConfig { + channels: self.channels, + sample_format: crate::SampleFormat::I32, + sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + }) } - pub fn channels(&self) -> usize { + + fn default_output_config( + &self, + ) -> Result { + Ok(crate::SupportedStreamConfig { + channels: self.channels, + sample_format: crate::SampleFormat::I32, + sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + }) + } + + fn build_input_stream_raw( + &self, + config: &crate::StreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&crate::Data, &crate::InputCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + todo!() + } + + fn build_output_stream_raw( + &self, + config: &crate::StreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&mut crate::Data, &crate::OutputCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + todo!() + } +} + +impl Device { + pub fn channels(&self) -> u16 { self.channels } pub fn direction(&self) -> DeviceDirection { @@ -92,9 +236,7 @@ impl Device { pub fn node_name(&self) -> &str { &self.node_name } - pub fn description(&self) -> &str { - &self.description - } + pub fn limit_quantam(&self) -> u32 { self.limit_quantum } @@ -297,7 +439,7 @@ fn init_roundtrip() -> Option> { .get("node.description") .unwrap_or("unknown") .to_owned(); - let channels: usize = props + let channels = props .get("audio.channels") .and_then(|channels| channels.parse().ok()) .unwrap_or(2); diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index 19c9b2d02..2e84ce7fc 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -1,4 +1,6 @@ -use device::{init_devices, Device}; +use device::{init_devices, Device, DeviceType, Devices}; + +use crate::traits::HostTrait; mod device; #[derive(Debug)] @@ -10,3 +12,27 @@ impl Host { Ok(Host(devices)) } } + +impl HostTrait for Host { + type Devices = Devices; + type Device = Device; + fn is_available() -> bool { + true + } + fn devices(&self) -> Result { + Ok(self.0.clone().into_iter()) + } + + fn default_input_device(&self) -> Option { + self.0 + .iter() + .find(|device| matches!(device.device_type(), DeviceType::DefaultSink)) + .cloned() + } + fn default_output_device(&self) -> Option { + self.0 + .iter() + .find(|device| matches!(device.device_type(), DeviceType::DefaultOutput)) + .cloned() + } +} diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 0f62026d7..cb87255b8 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -713,11 +713,25 @@ mod platform_impl { ))) )] pub use crate::host::jack::Host as JackHost; - + #[cfg(feature = "pipewire")] + #[cfg_attr( + docsrs, + doc(cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "jack" + ))) + )] + pub use crate::host::pipewire::Host as PipeWireHost; impl_platform_host!( #[cfg(feature = "jack")] Jack => JackHost, Alsa => AlsaHost, - #[cfg(feature = "custom")] Custom => super::CustomHost + #[cfg(feature = "custom")] Custom => super::CustomHost, + #[cfg(feature = "pipewire")] PipeWire => super::PipeWireHost, ); /// The default host for the current compilation target platform. From d309a90a615d91af8d2d28e58d28dfef48ded50d Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 11:33:52 +0900 Subject: [PATCH 03/27] chore: base connect function --- src/host/pipewire/device.rs | 19 +--- src/host/pipewire/mod.rs | 2 +- src/host/pipewire/stream.rs | 196 ++++++++++++++++++++++++++++++++++++ 3 files changed, 201 insertions(+), 16 deletions(-) create mode 100644 src/host/pipewire/stream.rs diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 6de9e1c25..774939f81 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -1,9 +1,6 @@ use std::{cell::RefCell, rc::Rc}; -use crate::{ - traits::{DeviceTrait, StreamTrait}, - DeviceDirection, SupportedStreamConfigRange, -}; +use crate::{traits::DeviceTrait, DeviceDirection, SupportedStreamConfigRange}; use crate::iter::{SupportedInputConfigs, SupportedOutputConfigs}; use pipewire::{ @@ -14,6 +11,8 @@ use pipewire::{ spa::utils::result::AsyncSeq, }; +use super::stream::Stream; + pub type Devices = std::vec::IntoIter; #[derive(Clone, Debug, Default, Copy)] @@ -83,20 +82,10 @@ impl Device { ..Default::default() } } -} - -// TODO: -pub struct Stream; - -impl StreamTrait for Stream { - fn play(&self) -> Result<(), crate::PlayStreamError> { - todo!() - } - fn pause(&self) -> Result<(), crate::PauseStreamError> { + pub(crate) fn pw_properties(&self) -> pw::properties::Properties { todo!() } } - impl DeviceTrait for Device { type Stream = Stream; type SupportedInputConfigs = SupportedInputConfigs; diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index 2e84ce7fc..02e493db2 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -2,7 +2,7 @@ use device::{init_devices, Device, DeviceType, Devices}; use crate::traits::HostTrait; mod device; - +mod stream; #[derive(Debug)] pub struct Host(Vec); diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs new file mode 100644 index 000000000..bfc9a0d67 --- /dev/null +++ b/src/host/pipewire/stream.rs @@ -0,0 +1,196 @@ +use std::time::Duration; + +use crate::{traits::StreamTrait, InputCallbackInfo, SampleFormat, StreamConfig, StreamError}; +use pipewire::{ + self as pw, + context::ContextRc, + main_loop::MainLoopRc, + spa::{ + param::{ + format::{MediaSubtype, MediaType}, + format_utils, + }, + pod::Pod, + }, + stream::{StreamListener, StreamRc}, +}; + +use crate::Data; +pub struct Stream; + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), crate::PlayStreamError> { + todo!() + } + fn pause(&self) -> Result<(), crate::PauseStreamError> { + todo!() + } +} + +impl From for pw::spa::param::audio::AudioFormat { + fn from(value: SampleFormat) -> Self { + match value { + SampleFormat::I8 => Self::S8, + SampleFormat::U8 => Self::U8, + + #[cfg(target_endian = "little")] + SampleFormat::I16 => Self::S16LE, + #[cfg(target_endian = "big")] + SampleFormat::I16 => Self::S16BE, + #[cfg(target_endian = "little")] + SampleFormat::U16 => Self::U16LE, + #[cfg(target_endian = "big")] + SampleFormat::U16 => Self::U16BE, + + #[cfg(target_endian = "little")] + SampleFormat::I24 => Self::S24LE, + #[cfg(target_endian = "big")] + SampleFormat::I24 => Self::S24BE, + #[cfg(target_endian = "little")] + SampleFormat::U24 => Self::U24LE, + #[cfg(target_endian = "big")] + SampleFormat::U24 => Self::U24BE, + #[cfg(target_endian = "little")] + SampleFormat::I32 => Self::S32LE, + #[cfg(target_endian = "big")] + SampleFormat::I32 => Self::S32BE, + #[cfg(target_endian = "little")] + SampleFormat::U32 => Self::U32LE, + #[cfg(target_endian = "big")] + SampleFormat::U32 => Self::U32BE, + #[cfg(target_endian = "little")] + SampleFormat::F32 => Self::F64BE, + #[cfg(target_endian = "big")] + SampleFormat::F32 => Self::F32BE, + #[cfg(target_endian = "little")] + SampleFormat::F64 => Self::F64LE, + #[cfg(target_endian = "big")] + SampleFormat::F64 => Self::F64BE, + SampleFormat::I64 => Self::Unknown, + SampleFormat::U64 => Self::Unknown, + } + } +} +struct UserData { + data_callback: D, + error_callback: E, + samples_format: SampleFormat, + format: pw::spa::param::audio::AudioInfoRaw, +} +struct StreamData { + mainloop: MainLoopRc, + listener: StreamListener>, + stream: StreamRc, + context: ContextRc, +} + +fn connect_input( + config: &StreamConfig, + properties: pw::properties::PropertiesBox, + samples_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, +) -> Result, pw::Error> +where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + pw::init(); + let mainloop = pw::main_loop::MainLoopRc::new(None)?; + let context = pw::context::ContextRc::new(&mainloop, None)?; + let core = context.connect_rc(None)?; + + let data = UserData { + data_callback, + error_callback, + samples_format, + format: Default::default(), + }; + + let stream = pw::stream::StreamRc::new(core, "cpal-capture", properties)?; + let listener = stream + .add_local_listener_with_user_data(data) + .param_changed(|_, user_data, id, param| { + let Some(param) = param else { + return; + }; + if id != pw::spa::param::ParamType::Format.as_raw() { + return; + } + + let (media_type, media_subtype) = match format_utils::parse_format(param) { + Ok(v) => v, + Err(_) => return, + }; + + // only accept raw audio + if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw { + return; + } + + // call a helper function to parse the format for us. + user_data + .format + .parse(param) + .expect("Failed to parse param changed to AudioInfoRaw"); + }) + .process(|stream, user_data| match stream.dequeue_buffer() { + None => (user_data.error_callback)(StreamError::BufferUnderrun), + Some(mut buffer) => { + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let data = &mut datas[0]; + let n_channels = user_data.format.channels(); + let n_samples = data.chunk().size() / user_data.samples_format.sample_size() as u32; + + let Some(samples) = data.data() else { + return; + }; + let data = samples.as_ptr() as *mut (); + let _data = + unsafe { Data::from_parts(data, n_samples as usize, user_data.samples_format) }; + //(user_data.data_callback)(&data,crate::InputCallbackInfo::new(Ins) ) + } + }) + .register()?; + let mut audio_info = pw::spa::param::audio::AudioInfoRaw::new(); + audio_info.set_format(samples_format.into()); + audio_info.set_rate(config.sample_rate); + audio_info.set_channels(config.channels as u32); + + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: audio_info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), + ) + .unwrap() + .0 + .into_inner(); + + let mut params = [Pod::from_bytes(&values).unwrap()]; + + /* Now connect this stream. We ask that our process function is + * called in a realtime thread. */ + stream.connect( + pw::spa::utils::Direction::Input, + None, + pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::MAP_BUFFERS + | pw::stream::StreamFlags::RT_PROCESS, + &mut params, + )?; + + Ok(StreamData { + mainloop, + listener, + stream, + context, + }) +} From d3441604e4d632821b79c33b3d179c666a79182e Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 18:59:59 +0900 Subject: [PATCH 04/27] chore: nearly finished --- Cargo.toml | 2 +- src/host/pipewire/device.rs | 159 ++++++++++++++++++++++++++++++++++-- src/host/pipewire/stream.rs | 156 +++++++++++++++++++++++++++++++---- 3 files changed, 291 insertions(+), 26 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e40f1759e..324130a67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,7 +92,7 @@ alsa = "0.11" libc = "0.2" audio_thread_priority = { version = "0.34", optional = true } jack = { version = "0.13", optional = true } -pipewire = { version = "0.9.2", optional = true } +pipewire = { version = "0.9.2", optional = true, features = ["v0_3_44"]} [target.'cfg(target_vendor = "apple")'.dependencies] mach2 = "0.5" diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 774939f81..c7a6c63fa 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -1,5 +1,7 @@ +use std::time::Duration; use std::{cell::RefCell, rc::Rc}; +use crate::host::pipewire::stream::StreamData; use crate::{traits::DeviceTrait, DeviceDirection, SupportedStreamConfigRange}; use crate::iter::{SupportedInputConfigs, SupportedOutputConfigs}; @@ -11,6 +13,8 @@ use pipewire::{ spa::utils::result::AsyncSeq, }; +use std::thread; + use super::stream::Stream; pub type Devices = std::vec::IntoIter; @@ -24,6 +28,13 @@ pub(crate) enum DeviceType { DefaultOutput, } +#[derive(Clone, Debug, Default, Copy)] +pub enum Role { + Sink, + #[default] + Source, +} + #[allow(unused)] #[derive(Clone, Debug, Default)] pub struct Device { @@ -40,6 +51,8 @@ pub struct Device { min_quantum: u32, max_quantum: u32, device_type: DeviceType, + object_id: String, + role: Role, } impl Device { @@ -55,6 +68,7 @@ impl Device { direction: DeviceDirection::Input, channels: 2, device_type: DeviceType::DefaultSink, + role: Role::Sink, ..Default::default() } } @@ -67,6 +81,7 @@ impl Device { direction: DeviceDirection::Input, channels: 2, device_type: DeviceType::DefaultInput, + role: Role::Source, ..Default::default() } } @@ -79,11 +94,31 @@ impl Device { direction: DeviceDirection::Output, channels: 2, device_type: DeviceType::DefaultOutput, + role: Role::Source, ..Default::default() } } - pub(crate) fn pw_properties(&self) -> pw::properties::Properties { - todo!() + pub(crate) fn pw_properties(&self) -> pw::properties::PropertiesBox { + let mut properties = match self.direction { + DeviceDirection::Output => pw::properties::properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CATEGORY => "Playback", + *pw::keys::MEDIA_ROLE => "Music", + }, + DeviceDirection::Input => pw::properties::properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CATEGORY => "Capture", + *pw::keys::MEDIA_ROLE => "Music", + }, + _ => unreachable!(), + }; + if matches!(self.role, Role::Sink) { + properties.insert(*pw::keys::STREAM_CAPTURE_SINK, "true"); + } + if matches!(self.device_type, DeviceType::Node) { + properties.insert(*pw::keys::TARGET_OBJECT, self.object_id.to_owned()); + } + properties } } impl DeviceTrait for Device { @@ -196,7 +231,54 @@ impl DeviceTrait for Device { D: FnMut(&crate::Data, &crate::InputCallbackInfo) + Send + 'static, E: FnMut(crate::StreamError) + Send + 'static, { - todo!() + let (pw_play_tx, pw_play_rv) = pw::channel::channel::(); + + let (pw_init_tx, pw_init_rv) = std::sync::mpsc::channel::(); + let device = self.clone(); + let config = config.clone(); + let wait_timeout = timeout.clone().unwrap_or(Duration::from_secs(2)); + let handle = thread::Builder::new() + .name("pw_capture_music_in".to_owned()) + .spawn(move || { + let properties = device.pw_properties(); + let Ok(StreamData { + mainloop, + listener, + stream, + context, + }) = super::stream::connect_input( + &config, + properties, + sample_format, + data_callback, + error_callback, + timeout, + ) + else { + let _ = pw_init_tx.send(false); + return; + }; + let _ = pw_init_tx.send(true); + let stream = stream.clone(); + let _receiver = pw_play_rv.attach(mainloop.loop_(), move |play| { + let _ = stream.set_active(play); + }); + mainloop.run(); + drop(listener); + drop(context); + }) + .unwrap(); + if pw_init_rv + .recv_timeout(wait_timeout) + .ok() + .is_none_or(|re| !re) + { + return Err(crate::BuildStreamError::DeviceNotAvailable); + }; + Ok(Stream { + handle, + controller: pw_play_tx, + }) } fn build_output_stream_raw( @@ -211,7 +293,54 @@ impl DeviceTrait for Device { D: FnMut(&mut crate::Data, &crate::OutputCallbackInfo) + Send + 'static, E: FnMut(crate::StreamError) + Send + 'static, { - todo!() + let (pw_play_tx, pw_play_rv) = pw::channel::channel::(); + + let (pw_init_tx, pw_init_rv) = std::sync::mpsc::channel::(); + let device = self.clone(); + let config = config.clone(); + let wait_timeout = timeout.clone().unwrap_or(Duration::from_secs(2)); + let handle = thread::Builder::new() + .name("pw_capture_music_out".to_owned()) + .spawn(move || { + let properties = device.pw_properties(); + let Ok(StreamData { + mainloop, + listener, + stream, + context, + }) = super::stream::connect_output( + &config, + properties, + sample_format, + data_callback, + error_callback, + timeout, + ) + else { + let _ = pw_init_tx.send(false); + return; + }; + let _ = pw_init_tx.send(true); + let stream = stream.clone(); + let _receiver = pw_play_rv.attach(mainloop.loop_(), move |play| { + let _ = stream.set_active(play); + }); + mainloop.run(); + drop(listener); + drop(context); + }) + .unwrap(); + if pw_init_rv + .recv_timeout(wait_timeout) + .ok() + .is_none_or(|re| !re) + { + return Err(crate::BuildStreamError::DeviceNotAvailable); + }; + Ok(Stream { + handle, + controller: pw_play_tx, + }) } } @@ -414,13 +543,26 @@ fn init_roundtrip() -> Option> { let Some(media_class) = props.get("media.class") else { return; }; - let direction = match media_class { - "Audio/Sink" => DeviceDirection::Input, - "Audio/Source" => DeviceDirection::Output, + let role = match media_class { + "Audio/Sink" => Role::Sink, + "Audio/Source" => Role::Source, _ => { return; } }; + let Some(group) = props.get("port.group") else { + return; + }; + let direction = match group { + "playback" => DeviceDirection::Input, + "capture" => DeviceDirection::Output, + _ => { + return; + } + }; + let Some(object_id) = props.get("object.id") else { + return; + }; let id = info.id(); let node_name = props.get("node.name").unwrap_or("unknown").to_owned(); let nick_name = props.get("node.nick").unwrap_or("unknown").to_owned(); @@ -436,14 +578,17 @@ fn init_roundtrip() -> Option> { .get("clock.quantum-limit") .and_then(|channels| channels.parse().ok()) .unwrap_or(0); + let device = Device { id, node_name, nick_name, description, direction, + role, channels, limit_quantum, + object_id: object_id.to_owned(), ..Default::default() }; devices.borrow_mut().push(device); diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index bfc9a0d67..328925f4d 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -1,6 +1,9 @@ -use std::time::Duration; +use std::{thread::JoinHandle, time::Duration}; -use crate::{traits::StreamTrait, InputCallbackInfo, SampleFormat, StreamConfig, StreamError}; +use crate::{ + traits::StreamTrait, InputCallbackInfo, OutputCallbackInfo, SampleFormat, StreamConfig, + StreamError, +}; use pipewire::{ self as pw, context::ContextRc, @@ -16,14 +19,21 @@ use pipewire::{ }; use crate::Data; -pub struct Stream; + +#[allow(unused)] +pub struct Stream { + pub(crate) handle: JoinHandle<()>, + pub(crate) controller: pw::channel::Sender, +} impl StreamTrait for Stream { fn play(&self) -> Result<(), crate::PlayStreamError> { - todo!() + let _ = self.controller.send(true); + Ok(()) } fn pause(&self) -> Result<(), crate::PauseStreamError> { - todo!() + let _ = self.controller.send(false); + Ok(()) } } @@ -71,23 +81,133 @@ impl From for pw::spa::param::audio::AudioFormat { } } } -struct UserData { + +pub struct UserData { data_callback: D, error_callback: E, - samples_format: SampleFormat, + sample_format: SampleFormat, format: pw::spa::param::audio::AudioInfoRaw, } -struct StreamData { - mainloop: MainLoopRc, - listener: StreamListener>, - stream: StreamRc, - context: ContextRc, +pub struct StreamData { + pub mainloop: MainLoopRc, + pub listener: StreamListener>, + pub stream: StreamRc, + pub context: ContextRc, } +pub fn connect_output( + config: &StreamConfig, + properties: pw::properties::PropertiesBox, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, +) -> Result, pw::Error> +where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + pw::init(); + let mainloop = pw::main_loop::MainLoopRc::new(None)?; + let context = pw::context::ContextRc::new(&mainloop, None)?; + let core = context.connect_rc(None)?; -fn connect_input( + let data = UserData { + data_callback, + error_callback, + sample_format, + format: Default::default(), + }; + + let stream = pw::stream::StreamRc::new(core, "cpal-capture", properties)?; + let listener = stream + .add_local_listener_with_user_data(data) + .param_changed(|_, user_data, id, param| { + let Some(param) = param else { + return; + }; + if id != pw::spa::param::ParamType::Format.as_raw() { + return; + } + + let (media_type, media_subtype) = match format_utils::parse_format(param) { + Ok(v) => v, + Err(_) => return, + }; + + // only accept raw audio + if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw { + return; + } + + // call a helper function to parse the format for us. + user_data + .format + .parse(param) + .expect("Failed to parse param changed to AudioInfoRaw"); + }) + .process(|stream, user_data| match stream.dequeue_buffer() { + None => (user_data.error_callback)(StreamError::BufferUnderrun), + Some(mut buffer) => { + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let data = &mut datas[0]; + let n_channels = user_data.format.channels(); + let n_samples = data.chunk().size() / user_data.sample_format.sample_size() as u32; + + let Some(samples) = data.data() else { + return; + }; + let data = samples.as_ptr() as *mut (); + let _data = + unsafe { Data::from_parts(data, n_samples as usize, user_data.sample_format) }; + //(user_data.data_callback)(&data,crate::InputCallbackInfo::new(Ins) ) + } + }) + .register()?; + let mut audio_info = pw::spa::param::audio::AudioInfoRaw::new(); + audio_info.set_format(sample_format.into()); + audio_info.set_rate(config.sample_rate); + audio_info.set_channels(config.channels as u32); + + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: audio_info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), + ) + .unwrap() + .0 + .into_inner(); + + let mut params = [Pod::from_bytes(&values).unwrap()]; + + /* Now connect this stream. We ask that our process function is + * called in a realtime thread. */ + stream.connect( + pw::spa::utils::Direction::Output, + None, + pw::stream::StreamFlags::AUTOCONNECT + | pw::stream::StreamFlags::MAP_BUFFERS + | pw::stream::StreamFlags::RT_PROCESS, + &mut params, + )?; + + Ok(StreamData { + mainloop, + listener, + stream, + context, + }) +} +pub fn connect_input( config: &StreamConfig, properties: pw::properties::PropertiesBox, - samples_format: SampleFormat, + sample_format: SampleFormat, data_callback: D, error_callback: E, _timeout: Option, @@ -104,7 +224,7 @@ where let data = UserData { data_callback, error_callback, - samples_format, + sample_format, format: Default::default(), }; @@ -144,20 +264,20 @@ where } let data = &mut datas[0]; let n_channels = user_data.format.channels(); - let n_samples = data.chunk().size() / user_data.samples_format.sample_size() as u32; + let n_samples = data.chunk().size() / user_data.sample_format.sample_size() as u32; let Some(samples) = data.data() else { return; }; let data = samples.as_ptr() as *mut (); let _data = - unsafe { Data::from_parts(data, n_samples as usize, user_data.samples_format) }; + unsafe { Data::from_parts(data, n_samples as usize, user_data.sample_format) }; //(user_data.data_callback)(&data,crate::InputCallbackInfo::new(Ins) ) } }) .register()?; let mut audio_info = pw::spa::param::audio::AudioInfoRaw::new(); - audio_info.set_format(samples_format.into()); + audio_info.set_format(sample_format.into()); audio_info.set_rate(config.sample_rate); audio_info.set_channels(config.channels as u32); From af2d9d0b6cd4ae92f3a4c5494ea34f0c0fd5e780 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 21:08:19 +0900 Subject: [PATCH 05/27] chore: seems working well --- _typos.toml | 5 ++ src/host/pipewire/device.rs | 37 +++++++----- src/host/pipewire/mod.rs | 2 + src/host/pipewire/stream.rs | 113 ++++++++++++++++++++++++++++++++---- src/platform/mod.rs | 2 +- 5 files changed, 130 insertions(+), 29 deletions(-) create mode 100644 _typos.toml diff --git a/_typos.toml b/_typos.toml new file mode 100644 index 000000000..35e0a4db7 --- /dev/null +++ b/_typos.toml @@ -0,0 +1,5 @@ +[files] +extend-exclude = ["**/*.cmake", "**/*.json", "assets_for_test/*"] + +[default.extend-words] +datas = "datas" diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index c7a6c63fa..65a21df4a 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -52,6 +52,7 @@ pub struct Device { max_quantum: u32, device_type: DeviceType, object_id: String, + device_id: String, role: Role, } @@ -112,11 +113,12 @@ impl Device { }, _ => unreachable!(), }; + dbg!(&self); if matches!(self.role, Role::Sink) { properties.insert(*pw::keys::STREAM_CAPTURE_SINK, "true"); } if matches!(self.device_type, DeviceType::Node) { - properties.insert(*pw::keys::TARGET_OBJECT, self.object_id.to_owned()); + properties.insert(*pw::keys::TARGET_OBJECT, self.device_id.to_owned()); } properties } @@ -154,7 +156,6 @@ impl DeviceTrait for Device { ) } - // TODO: sample_format fn supported_input_configs( &self, ) -> Result { @@ -169,7 +170,7 @@ impl DeviceTrait for Device { min: self.min_quantum, max: self.max_quantum, }, - sample_format: crate::SampleFormat::I32, + sample_format: crate::SampleFormat::F32, }] .into_iter()) } @@ -187,7 +188,7 @@ impl DeviceTrait for Device { min: self.min_quantum, max: self.max_quantum, }, - sample_format: crate::SampleFormat::I32, + sample_format: crate::SampleFormat::F32, }] .into_iter()) } @@ -196,7 +197,7 @@ impl DeviceTrait for Device { ) -> Result { Ok(crate::SupportedStreamConfig { channels: self.channels, - sample_format: crate::SampleFormat::I32, + sample_format: crate::SampleFormat::F32, sample_rate: self.rate, buffer_size: crate::SupportedBufferSize::Range { min: self.min_quantum, @@ -210,7 +211,7 @@ impl DeviceTrait for Device { ) -> Result { Ok(crate::SupportedStreamConfig { channels: self.channels, - sample_format: crate::SampleFormat::I32, + sample_format: crate::SampleFormat::F32, sample_rate: self.rate, buffer_size: crate::SupportedBufferSize::Range { min: self.min_quantum, @@ -303,23 +304,27 @@ impl DeviceTrait for Device { .name("pw_capture_music_out".to_owned()) .spawn(move || { let properties = device.pw_properties(); - let Ok(StreamData { + + let StreamData { mainloop, listener, stream, context, - }) = super::stream::connect_output( + } = match super::stream::connect_output( &config, properties, sample_format, data_callback, error_callback, timeout, - ) - else { - let _ = pw_init_tx.send(false); - return; + ) { + Ok(data) => data, + Err(_) => { + let _ = pw_init_tx.send(false); + return; + } }; + let _ = pw_init_tx.send(true); let stream = stream.clone(); let _receiver = pw_play_rv.attach(mainloop.loop_(), move |play| { @@ -563,6 +568,9 @@ fn init_roundtrip() -> Option> { let Some(object_id) = props.get("object.id") else { return; }; + let Some(device_id) = props.get("device.id") else { + return; + }; let id = info.id(); let node_name = props.get("node.name").unwrap_or("unknown").to_owned(); let nick_name = props.get("node.nick").unwrap_or("unknown").to_owned(); @@ -589,6 +597,7 @@ fn init_roundtrip() -> Option> { channels, limit_quantum, object_id: object_id.to_owned(), + device_id: device_id.to_owned(), ..Default::default() }; devices.borrow_mut().push(device); @@ -620,10 +629,6 @@ fn init_roundtrip() -> Option> { } pub fn init_devices() -> Option> { - pw::init(); let devices = init_roundtrip()?; - unsafe { - pw::deinit(); - } Some(devices) } diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index 02e493db2..65b978371 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -3,11 +3,13 @@ use device::{init_devices, Device, DeviceType, Devices}; use crate::traits::HostTrait; mod device; mod stream; +use pipewire as pw; #[derive(Debug)] pub struct Host(Vec); impl Host { pub fn new() -> Result { + pw::init(); let devices = init_devices().ok_or(crate::HostUnavailable)?; Ok(Host(devices)) } diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 328925f4d..8b16d18e9 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -1,8 +1,11 @@ -use std::{thread::JoinHandle, time::Duration}; +use std::{ + thread::JoinHandle, + time::{Duration, Instant}, +}; use crate::{ - traits::StreamTrait, InputCallbackInfo, OutputCallbackInfo, SampleFormat, StreamConfig, - StreamError, + traits::StreamTrait, BackendSpecificError, InputCallbackInfo, OutputCallbackInfo, SampleFormat, + StreamConfig, StreamError, }; use pipewire::{ self as pw, @@ -69,7 +72,7 @@ impl From for pw::spa::param::audio::AudioFormat { #[cfg(target_endian = "big")] SampleFormat::U32 => Self::U32BE, #[cfg(target_endian = "little")] - SampleFormat::F32 => Self::F64BE, + SampleFormat::F32 => Self::F32LE, #[cfg(target_endian = "big")] SampleFormat::F32 => Self::F32BE, #[cfg(target_endian = "little")] @@ -87,6 +90,52 @@ pub struct UserData { error_callback: E, sample_format: SampleFormat, format: pw::spa::param::audio::AudioInfoRaw, + created_instance: Instant, +} + +impl UserData +where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + fn publish_data_in(&mut self, frames: usize, data: &Data) -> Result<(), BackendSpecificError> { + let callback = stream_timestamp_fallback(self.created_instance)?; + let delay_duration = frames_to_duration(frames, self.format.rate()); + let capture = callback + .add(delay_duration) + .ok_or_else(|| BackendSpecificError { + description: "`playback` occurs beyond representation supported by `StreamInstant`" + .to_string(), + })?; + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = crate::InputCallbackInfo { timestamp }; + (self.data_callback)(data, &info); + Ok(()) + } +} +impl UserData +where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + fn publish_data_out( + &mut self, + frames: usize, + data: &mut Data, + ) -> Result<(), BackendSpecificError> { + let callback = stream_timestamp_fallback(self.created_instance)?; + let delay_duration = frames_to_duration(frames, self.format.rate()); + let playback = callback + .add(delay_duration) + .ok_or_else(|| BackendSpecificError { + description: "`playback` occurs beyond representation supported by `StreamInstant`" + .to_string(), + })?; + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = crate::OutputCallbackInfo { timestamp }; + (self.data_callback)(data, &info); + Ok(()) + } } pub struct StreamData { pub mainloop: MainLoopRc, @@ -94,6 +143,30 @@ pub struct StreamData { pub stream: StreamRc, pub context: ContextRc, } + +// Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable. +// +// This ensures positive values that are compatible with our `StreamInstant` representation. +#[inline] +fn stream_timestamp_fallback( + creation: std::time::Instant, +) -> Result { + let now = std::time::Instant::now(); + let duration = now.duration_since(creation); + crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError { + description: "stream duration has exceeded `StreamInstant` representation".to_string(), + }) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +#[inline] +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + pub fn connect_output( config: &StreamConfig, properties: pw::properties::PropertiesBox, @@ -111,14 +184,16 @@ where let context = pw::context::ContextRc::new(&mainloop, None)?; let core = context.connect_rc(None)?; + dbg!(&properties); let data = UserData { data_callback, error_callback, sample_format, format: Default::default(), + created_instance: Instant::now(), }; - let stream = pw::stream::StreamRc::new(core, "cpal-capture", properties)?; + let stream = pw::stream::StreamRc::new(core, "cpal-playback", properties)?; let listener = stream .add_local_listener_with_user_data(data) .param_changed(|_, user_data, id, param| { @@ -152,17 +227,27 @@ where if datas.is_empty() { return; } - let data = &mut datas[0]; + let buf_data = &mut datas[0]; let n_channels = user_data.format.channels(); - let n_samples = data.chunk().size() / user_data.sample_format.sample_size() as u32; - let Some(samples) = data.data() else { + let Some(samples) = buf_data.data() else { return; }; + let stride = user_data.sample_format.sample_size() * n_channels as usize; + let frames = samples.len() / stride; + + let n_samples = samples.len() / user_data.sample_format.sample_size(); + let data = samples.as_ptr() as *mut (); - let _data = + let mut data = unsafe { Data::from_parts(data, n_samples as usize, user_data.sample_format) }; - //(user_data.data_callback)(&data,crate::InputCallbackInfo::new(Ins) ) + if let Err(err) = user_data.publish_data_out(frames, &mut data) { + (user_data.error_callback)(StreamError::BackendSpecific { err }); + } + let chunk = buf_data.chunk_mut(); + *chunk.offset_mut() = 0; + *chunk.stride_mut() = stride as i32; + *chunk.size_mut() = frames as u32; } }) .register()?; @@ -226,6 +311,7 @@ where error_callback, sample_format, format: Default::default(), + created_instance: Instant::now(), }; let stream = pw::stream::StreamRc::new(core, "cpal-capture", properties)?; @@ -265,14 +351,17 @@ where let data = &mut datas[0]; let n_channels = user_data.format.channels(); let n_samples = data.chunk().size() / user_data.sample_format.sample_size() as u32; + let frames = n_samples / n_channels; let Some(samples) = data.data() else { return; }; let data = samples.as_ptr() as *mut (); - let _data = + let data = unsafe { Data::from_parts(data, n_samples as usize, user_data.sample_format) }; - //(user_data.data_callback)(&data,crate::InputCallbackInfo::new(Ins) ) + if let Err(err) = user_data.publish_data_in(frames as usize, &data) { + (user_data.error_callback)(StreamError::BackendSpecific { err }); + } } }) .register()?; diff --git a/src/platform/mod.rs b/src/platform/mod.rs index cb87255b8..d2a62aeb8 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -736,7 +736,7 @@ mod platform_impl { /// The default host for the current compilation target platform. pub fn default_host() -> Host { - AlsaHost::new() + PipeWireHost::new() .expect("the default host should always be available") .into() } From 8a5faeecc30fb9d47f86509aad37526dda8a1925 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 21:22:54 +0900 Subject: [PATCH 06/27] fix: feature wrong in platform/mod.rs --- src/platform/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/platform/mod.rs b/src/platform/mod.rs index d2a62aeb8..fd57960cc 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -723,7 +723,7 @@ mod platform_impl { target_os = "freebsd", target_os = "netbsd" ), - feature = "jack" + feature = "pipewire" ))) )] pub use crate::host::pipewire::Host as PipeWireHost; From 1eca9ce2b92b357721cb120370435978ed3d853d Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 21:24:05 +0900 Subject: [PATCH 07/27] chore: tidy up, clippy, fmt --- Cargo.toml | 2 +- src/host/mod.rs | 10 +++++----- src/host/pipewire/device.rs | 4 ++-- src/host/pipewire/stream.rs | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 324130a67..258056511 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ documentation = "https://docs.rs/cpal" license = "Apache-2.0" keywords = ["audio", "sound"] edition = "2021" -rust-version = "1.78" +rust-version = "1.82" [features] # ASIO backend for Windows diff --git a/src/host/mod.rs b/src/host/mod.rs index 608f95720..e5ef68f20 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -7,11 +7,6 @@ pub(crate) mod aaudio; target_os = "netbsd" ))] pub(crate) mod alsa; -#[cfg(all( - any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), - feature = "pipewire" -))] -pub(crate) mod pipewire; #[cfg(all(windows, feature = "asio"))] pub(crate) mod asio; #[cfg(all( @@ -36,6 +31,11 @@ pub(crate) mod emscripten; ) ))] pub(crate) mod jack; +#[cfg(all( + any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"), + feature = "pipewire" +))] +pub(crate) mod pipewire; #[cfg(windows)] pub(crate) mod wasapi; #[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))] diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 65a21df4a..ed53d0ab1 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -237,7 +237,7 @@ impl DeviceTrait for Device { let (pw_init_tx, pw_init_rv) = std::sync::mpsc::channel::(); let device = self.clone(); let config = config.clone(); - let wait_timeout = timeout.clone().unwrap_or(Duration::from_secs(2)); + let wait_timeout = timeout.unwrap_or(Duration::from_secs(2)); let handle = thread::Builder::new() .name("pw_capture_music_in".to_owned()) .spawn(move || { @@ -299,7 +299,7 @@ impl DeviceTrait for Device { let (pw_init_tx, pw_init_rv) = std::sync::mpsc::channel::(); let device = self.clone(); let config = config.clone(); - let wait_timeout = timeout.clone().unwrap_or(Duration::from_secs(2)); + let wait_timeout = timeout.unwrap_or(Duration::from_secs(2)); let handle = thread::Builder::new() .name("pw_capture_music_out".to_owned()) .spawn(move || { diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 8b16d18e9..e63b5e02d 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -240,7 +240,7 @@ where let data = samples.as_ptr() as *mut (); let mut data = - unsafe { Data::from_parts(data, n_samples as usize, user_data.sample_format) }; + unsafe { Data::from_parts(data, n_samples, user_data.sample_format) }; if let Err(err) = user_data.publish_data_out(frames, &mut data) { (user_data.error_callback)(StreamError::BackendSpecific { err }); } From 36325b6ed25196491a1dea65d7e4c297b738b9a9 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 21:40:06 +0900 Subject: [PATCH 08/27] fix: output error size should be frames * stride --- src/host/pipewire/mod.rs | 2 -- src/host/pipewire/stream.rs | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index 65b978371..02e493db2 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -3,13 +3,11 @@ use device::{init_devices, Device, DeviceType, Devices}; use crate::traits::HostTrait; mod device; mod stream; -use pipewire as pw; #[derive(Debug)] pub struct Host(Vec); impl Host { pub fn new() -> Result { - pw::init(); let devices = init_devices().ok_or(crate::HostUnavailable)?; Ok(Host(devices)) } diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index e63b5e02d..5e9cb54e2 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -184,7 +184,6 @@ where let context = pw::context::ContextRc::new(&mainloop, None)?; let core = context.connect_rc(None)?; - dbg!(&properties); let data = UserData { data_callback, error_callback, @@ -247,7 +246,7 @@ where let chunk = buf_data.chunk_mut(); *chunk.offset_mut() = 0; *chunk.stride_mut() = stride as i32; - *chunk.size_mut() = frames as u32; + *chunk.size_mut() = (frames * stride) as u32; } }) .register()?; From 4e1a62d1a18713427106da24cb0d87bb261192b8 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 22:58:49 +0900 Subject: [PATCH 09/27] feat: sink can do both side --- src/host/pipewire/device.rs | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index ed53d0ab1..ab4ab1552 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -99,8 +99,11 @@ impl Device { ..Default::default() } } - pub(crate) fn pw_properties(&self) -> pw::properties::PropertiesBox { - let mut properties = match self.direction { + pub(crate) fn pw_properties( + &self, + direction: DeviceDirection, + ) -> pw::properties::PropertiesBox { + let mut properties = match direction { DeviceDirection::Output => pw::properties::properties! { *pw::keys::MEDIA_TYPE => "Audio", *pw::keys::MEDIA_CATEGORY => "Playback", @@ -195,6 +198,9 @@ impl DeviceTrait for Device { fn default_input_config( &self, ) -> Result { + if !self.supports_input() { + return Err(crate::DefaultStreamConfigError::StreamTypeNotSupported); + } Ok(crate::SupportedStreamConfig { channels: self.channels, sample_format: crate::SampleFormat::F32, @@ -209,6 +215,9 @@ impl DeviceTrait for Device { fn default_output_config( &self, ) -> Result { + if !self.supports_output() { + return Err(crate::DefaultStreamConfigError::StreamTypeNotSupported); + } Ok(crate::SupportedStreamConfig { channels: self.channels, sample_format: crate::SampleFormat::F32, @@ -241,7 +250,7 @@ impl DeviceTrait for Device { let handle = thread::Builder::new() .name("pw_capture_music_in".to_owned()) .spawn(move || { - let properties = device.pw_properties(); + let properties = device.pw_properties(DeviceDirection::Input); let Ok(StreamData { mainloop, listener, @@ -303,7 +312,7 @@ impl DeviceTrait for Device { let handle = thread::Builder::new() .name("pw_capture_music_out".to_owned()) .spawn(move || { - let properties = device.pw_properties(); + let properties = device.pw_properties(DeviceDirection::Output); let StreamData { mainloop, @@ -558,9 +567,10 @@ fn init_roundtrip() -> Option> { let Some(group) = props.get("port.group") else { return; }; - let direction = match group { - "playback" => DeviceDirection::Input, - "capture" => DeviceDirection::Output, + let direction = match (group, role) { + ("playback", Role::Sink) => DeviceDirection::Duplex, + ("playback", Role::Source) => DeviceDirection::Input, + ("capture", _) => DeviceDirection::Input, _ => { return; } From 912411e68db3c214b8d6cdb8d4673533785df109 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 23:04:47 +0900 Subject: [PATCH 10/27] chore: complete supported configs --- src/host/pipewire/device.rs | 52 +++++++++++++++++++++---------------- src/host/pipewire/stream.rs | 12 +++++++++ 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index ab4ab1552..d03f05cfa 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -1,7 +1,7 @@ use std::time::Duration; use std::{cell::RefCell, rc::Rc}; -use crate::host::pipewire::stream::StreamData; +use crate::host::pipewire::stream::{StreamData, SUPPORTED_FORMATS}; use crate::{traits::DeviceTrait, DeviceDirection, SupportedStreamConfigRange}; use crate::iter::{SupportedInputConfigs, SupportedOutputConfigs}; @@ -165,17 +165,20 @@ impl DeviceTrait for Device { if !self.supports_input() { return Err(crate::SupportedStreamConfigsError::DeviceNotAvailable); } - Ok(vec![SupportedStreamConfigRange { - channels: self.channels, - min_sample_rate: self.rate, - max_sample_rate: self.rate, - buffer_size: crate::SupportedBufferSize::Range { - min: self.min_quantum, - max: self.max_quantum, - }, - sample_format: crate::SampleFormat::F32, - }] - .into_iter()) + Ok(SUPPORTED_FORMATS + .iter() + .map(|sample_format| SupportedStreamConfigRange { + channels: self.channels, + min_sample_rate: self.rate, + max_sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: *sample_format, + }) + .collect::>() + .into_iter()) } fn supported_output_configs( &self, @@ -183,17 +186,20 @@ impl DeviceTrait for Device { if !self.supports_output() { return Err(crate::SupportedStreamConfigsError::DeviceNotAvailable); } - Ok(vec![SupportedStreamConfigRange { - channels: self.channels, - min_sample_rate: self.rate, - max_sample_rate: self.rate, - buffer_size: crate::SupportedBufferSize::Range { - min: self.min_quantum, - max: self.max_quantum, - }, - sample_format: crate::SampleFormat::F32, - }] - .into_iter()) + Ok(SUPPORTED_FORMATS + .iter() + .map(|sample_format| SupportedStreamConfigRange { + channels: self.channels, + min_sample_rate: self.rate, + max_sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: *sample_format, + }) + .collect::>() + .into_iter()) } fn default_input_config( &self, diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 5e9cb54e2..18af55c5d 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -40,6 +40,18 @@ impl StreamTrait for Stream { } } +pub(crate) const SUPPORTED_FORMATS: &[SampleFormat] = &[ + SampleFormat::I8, + SampleFormat::U8, + SampleFormat::I16, + SampleFormat::U16, + SampleFormat::I24, + SampleFormat::U24, + SampleFormat::I32, + SampleFormat::U32, + SampleFormat::F64, +]; + impl From for pw::spa::param::audio::AudioFormat { fn from(value: SampleFormat) -> Self { match value { From 49dfbdbbd5f41b71d6278e9375152107585d5a20 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 23:31:18 +0900 Subject: [PATCH 11/27] feat: support show the device type --- src/host/pipewire/device.rs | 55 +++++++++++++++++++++++-------------- src/host/pipewire/mod.rs | 6 ++-- 2 files changed, 37 insertions(+), 24 deletions(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index d03f05cfa..889befcde 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -19,8 +19,9 @@ use super::stream::Stream; pub type Devices = std::vec::IntoIter; +/// This enum record whether it is created by human or just default device #[derive(Clone, Debug, Default, Copy)] -pub(crate) enum DeviceType { +pub(crate) enum ClassType { #[default] Node, DefaultSink, @@ -50,15 +51,16 @@ pub struct Device { quantum: u32, min_quantum: u32, max_quantum: u32, - device_type: DeviceType, + class_type: ClassType, object_id: String, device_id: String, role: Role, + icon_name: String, } impl Device { - pub(crate) fn device_type(&self) -> DeviceType { - self.device_type + pub(crate) fn class_type(&self) -> ClassType { + self.class_type } fn sink_default() -> Self { Self { @@ -66,9 +68,9 @@ impl Device { node_name: "sink_default".to_owned(), nick_name: "sink_default".to_owned(), description: "default_sink".to_owned(), - direction: DeviceDirection::Input, + direction: DeviceDirection::Duplex, channels: 2, - device_type: DeviceType::DefaultSink, + class_type: ClassType::DefaultSink, role: Role::Sink, ..Default::default() } @@ -81,7 +83,7 @@ impl Device { description: "default_input".to_owned(), direction: DeviceDirection::Input, channels: 2, - device_type: DeviceType::DefaultInput, + class_type: ClassType::DefaultInput, role: Role::Source, ..Default::default() } @@ -94,11 +96,20 @@ impl Device { description: "default_output".to_owned(), direction: DeviceDirection::Output, channels: 2, - device_type: DeviceType::DefaultOutput, + class_type: ClassType::DefaultOutput, role: Role::Source, ..Default::default() } } + + fn device_type(&self) -> crate::DeviceType { + match self.icon_name.as_str() { + "audio-headphones" => crate::DeviceType::Headphones, + "audio-input-microphone" => crate::DeviceType::Microphone, + _ => crate::DeviceType::Unknown, + } + } + pub(crate) fn pw_properties( &self, direction: DeviceDirection, @@ -116,11 +127,10 @@ impl Device { }, _ => unreachable!(), }; - dbg!(&self); if matches!(self.role, Role::Sink) { properties.insert(*pw::keys::STREAM_CAPTURE_SINK, "true"); } - if matches!(self.device_type, DeviceType::Node) { + if matches!(self.class_type, ClassType::Node) { properties.insert(*pw::keys::TARGET_OBJECT, self.device_id.to_owned()); } properties @@ -138,10 +148,10 @@ impl DeviceTrait for Device { )) } - // TODO: device type fn description(&self) -> Result { Ok(crate::DeviceDescriptionBuilder::new(&self.nick_name) .direction(self.direction()) + .device_type(self.device_type()) .build()) } @@ -163,7 +173,7 @@ impl DeviceTrait for Device { &self, ) -> Result { if !self.supports_input() { - return Err(crate::SupportedStreamConfigsError::DeviceNotAvailable); + return Ok(vec![].into_iter()); } Ok(SUPPORTED_FORMATS .iter() @@ -184,7 +194,7 @@ impl DeviceTrait for Device { &self, ) -> Result { if !self.supports_output() { - return Err(crate::SupportedStreamConfigsError::DeviceNotAvailable); + return Ok(vec![].into_iter()); } Ok(SUPPORTED_FORMATS .iter() @@ -320,24 +330,22 @@ impl DeviceTrait for Device { .spawn(move || { let properties = device.pw_properties(DeviceDirection::Output); - let StreamData { + let Ok(StreamData { mainloop, listener, stream, context, - } = match super::stream::connect_output( + }) = super::stream::connect_output( &config, properties, sample_format, data_callback, error_callback, timeout, - ) { - Ok(data) => data, - Err(_) => { - let _ = pw_init_tx.send(false); - return; - } + ) + else { + let _ = pw_init_tx.send(false); + return; }; let _ = pw_init_tx.send(true); @@ -602,6 +610,10 @@ fn init_roundtrip() -> Option> { .get("clock.quantum-limit") .and_then(|channels| channels.parse().ok()) .unwrap_or(0); + let icon_name = props + .get("device.icon_name") + .unwrap_or("default") + .to_owned(); let device = Device { id, @@ -612,6 +624,7 @@ fn init_roundtrip() -> Option> { role, channels, limit_quantum, + icon_name, object_id: object_id.to_owned(), device_id: device_id.to_owned(), ..Default::default() diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs index 02e493db2..af7c402a0 100644 --- a/src/host/pipewire/mod.rs +++ b/src/host/pipewire/mod.rs @@ -1,4 +1,4 @@ -use device::{init_devices, Device, DeviceType, Devices}; +use device::{init_devices, ClassType, Device, Devices}; use crate::traits::HostTrait; mod device; @@ -26,13 +26,13 @@ impl HostTrait for Host { fn default_input_device(&self) -> Option { self.0 .iter() - .find(|device| matches!(device.device_type(), DeviceType::DefaultSink)) + .find(|device| matches!(device.class_type(), ClassType::DefaultSink)) .cloned() } fn default_output_device(&self) -> Option { self.0 .iter() - .find(|device| matches!(device.device_type(), DeviceType::DefaultOutput)) + .find(|device| matches!(device.class_type(), ClassType::DefaultOutput)) .cloned() } } From 82ef7000d4c89438f48df7dceb295baa6dbe99ff Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 23:32:57 +0900 Subject: [PATCH 12/27] chore: modify the _typos.toml --- _typos.toml | 3 --- 1 file changed, 3 deletions(-) diff --git a/_typos.toml b/_typos.toml index 35e0a4db7..02d3ec106 100644 --- a/_typos.toml +++ b/_typos.toml @@ -1,5 +1,2 @@ -[files] -extend-exclude = ["**/*.cmake", "**/*.json", "assets_for_test/*"] - [default.extend-words] datas = "datas" From 1a3fa408b9b50ec88b7892443a697ba075b6ef69 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 23:40:45 +0900 Subject: [PATCH 13/27] chore: reset default backend to alsa --- examples/beep.rs | 37 ++++++++++++++++++++++++++++++++++++- examples/record_wav.rs | 30 +++++++++++++++++++++++++++--- src/platform/mod.rs | 2 +- 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/examples/beep.rs b/examples/beep.rs index f5e32c1f5..3b9b88a83 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -37,6 +37,19 @@ struct Opt { #[arg(short, long)] #[allow(dead_code)] jack: bool, + /// Use the pipewire host + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + #[arg(short, long)] + #[allow(dead_code)] + pipewire: bool, } fn main() -> anyhow::Result<()> { @@ -64,6 +77,28 @@ fn main() -> anyhow::Result<()> { } else { cpal::default_host() }; + // Conditionally compile with jack if the feature is specified. + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + // Manually check for flags. Can be passed through cargo with -- e.g. + // cargo run --release --example beep --features jack -- --jack + let host = if opt.pipewire { + cpal::host_from_id(cpal::available_hosts() + .into_iter() + .find(|id| *id == cpal::HostId::PipeWire) + .expect( + "make sure --features pipewire is specified. only works on OSes where jack is available", + )).expect("jack host unavailable") + } else { + cpal::default_host() + }; #[cfg(any( not(any( @@ -72,7 +107,7 @@ fn main() -> anyhow::Result<()> { target_os = "freebsd", target_os = "netbsd" )), - not(feature = "jack") + not(any(feature = "jack", feature = "pipewire")) ))] let host = cpal::default_host(); diff --git a/examples/record_wav.rs b/examples/record_wav.rs index 1fea0ee89..7b2ace2cd 100644 --- a/examples/record_wav.rs +++ b/examples/record_wav.rs @@ -33,6 +33,19 @@ struct Opt { #[arg(short, long)] #[allow(dead_code)] jack: bool, + /// Use the pipewire host + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] + #[arg(short, long)] + #[allow(dead_code)] + pipewire: bool, } fn main() -> Result<(), anyhow::Error> { @@ -49,7 +62,7 @@ fn main() -> Result<(), anyhow::Error> { feature = "jack" ))] // Manually check for flags. Can be passed through cargo with -- e.g. - // cargo run --release --example beep --features jack -- --jack + // cargo run --release --example record_wav --features jack -- --jack let host = if opt.jack { cpal::host_from_id(cpal::available_hosts() .into_iter() @@ -60,7 +73,18 @@ fn main() -> Result<(), anyhow::Error> { } else { cpal::default_host() }; - + // Manually check for flags. Can be passed through cargo with -- e.g. + // cargo run --release --example record_wav --features pipewire -- -- pipewire + let host = if opt.pipewire { + cpal::host_from_id(cpal::available_hosts() + .into_iter() + .find(|id| *id == cpal::HostId::PipeWire) + .expect( + "make sure --features pipewire is specified. only works on OSes where jack is available", + )).expect("jack host unavailable") + } else { + cpal::default_host() + }; #[cfg(any( not(any( target_os = "linux", @@ -68,7 +92,7 @@ fn main() -> Result<(), anyhow::Error> { target_os = "freebsd", target_os = "netbsd" )), - not(feature = "jack") + not(any(feature = "jack", feature = "pipewire")) ))] let host = cpal::default_host(); diff --git a/src/platform/mod.rs b/src/platform/mod.rs index fd57960cc..0c46eea93 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -736,7 +736,7 @@ mod platform_impl { /// The default host for the current compilation target platform. pub fn default_host() -> Host { - PipeWireHost::new() + AlsaHost::new() .expect("the default host should always be available") .into() } From fd59a2980da88b480628f53ebadaabfc1aa2c071 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Tue, 6 Jan 2026 23:42:37 +0900 Subject: [PATCH 14/27] chore: add pipewire dependence --- .github/workflows/platforms.yml | 2 +- .github/workflows/quality.yml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml index e4e923077..98d65760d 100644 --- a/.github/workflows/platforms.yml +++ b/.github/workflows/platforms.yml @@ -32,7 +32,7 @@ env: MSRV_WASM: "1.82" MSRV_WINDOWS: "1.82" - PACKAGES_LINUX: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + PACKAGES_LINUX: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev ANDROID_COMPILE_SDK: "30" ANDROID_BUILD_TOOLS: "30.0.3" diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index cd470bfd0..ba55ac99c 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -92,7 +92,7 @@ jobs: if: runner.os == 'Linux' uses: awalsh128/cache-apt-pkgs-action@latest with: - packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev - name: Setup ASIO SDK if: runner.os == 'Windows' @@ -128,7 +128,7 @@ jobs: - name: Cache Linux audio packages uses: awalsh128/cache-apt-pkgs-action@latest with: - packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev - name: Install Rust toolchain uses: dtolnay/rust-toolchain@nightly From 674225bdf61437feb65cd2d80a6d65d8acb141c4 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Wed, 7 Jan 2026 10:33:01 +0900 Subject: [PATCH 15/27] fix: target object should be the node name --- src/host/pipewire/device.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 889befcde..1411c2850 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -131,7 +131,7 @@ impl Device { properties.insert(*pw::keys::STREAM_CAPTURE_SINK, "true"); } if matches!(self.class_type, ClassType::Node) { - properties.insert(*pw::keys::TARGET_OBJECT, self.device_id.to_owned()); + properties.insert(*pw::keys::TARGET_OBJECT, self.node_name().to_owned()); } properties } From a6cc7d1c7f2d5faacc1351bd5f15122643c1fc66 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Wed, 7 Jan 2026 10:37:58 +0900 Subject: [PATCH 16/27] chore: use object_serial instead --- src/host/pipewire/device.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 1411c2850..9d57aa70d 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -56,6 +56,7 @@ pub struct Device { device_id: String, role: Role, icon_name: String, + object_serial: u32, } impl Device { @@ -131,7 +132,7 @@ impl Device { properties.insert(*pw::keys::STREAM_CAPTURE_SINK, "true"); } if matches!(self.class_type, ClassType::Node) { - properties.insert(*pw::keys::TARGET_OBJECT, self.node_name().to_owned()); + properties.insert(*pw::keys::TARGET_OBJECT, self.object_serial.to_string()); } properties } @@ -595,6 +596,12 @@ fn init_roundtrip() -> Option> { let Some(device_id) = props.get("device.id") else { return; }; + let Some(object_serial) = props + .get("object.serial") + .and_then(|serial| serial.parse().ok()) + else { + return; + }; let id = info.id(); let node_name = props.get("node.name").unwrap_or("unknown").to_owned(); let nick_name = props.get("node.nick").unwrap_or("unknown").to_owned(); @@ -627,6 +634,7 @@ fn init_roundtrip() -> Option> { icon_name, object_id: object_id.to_owned(), device_id: device_id.to_owned(), + object_serial, ..Default::default() }; devices.borrow_mut().push(device); From ca41b12851783ffddc0bc00b8ef2097395570e97 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Wed, 7 Jan 2026 10:45:17 +0900 Subject: [PATCH 17/27] chore: capture should be output --- src/host/pipewire/device.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 9d57aa70d..0e73a7fe9 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -585,7 +585,7 @@ fn init_roundtrip() -> Option> { let direction = match (group, role) { ("playback", Role::Sink) => DeviceDirection::Duplex, ("playback", Role::Source) => DeviceDirection::Input, - ("capture", _) => DeviceDirection::Input, + ("capture", _) => DeviceDirection::Output, _ => { return; } From 8695b5d6f390eb3903cf8b97adda5f1db8a01c95 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Thu, 8 Jan 2026 18:47:28 +0900 Subject: [PATCH 18/27] chore: remove unused timeout --- src/host/pipewire/device.rs | 2 -- src/host/pipewire/stream.rs | 7 +------ 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 0e73a7fe9..3b795cbb0 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -279,7 +279,6 @@ impl DeviceTrait for Device { sample_format, data_callback, error_callback, - timeout, ) else { let _ = pw_init_tx.send(false); @@ -342,7 +341,6 @@ impl DeviceTrait for Device { sample_format, data_callback, error_callback, - timeout, ) else { let _ = pw_init_tx.send(false); diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index 18af55c5d..bcccefabd 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -1,7 +1,4 @@ -use std::{ - thread::JoinHandle, - time::{Duration, Instant}, -}; +use std::{thread::JoinHandle, time::Instant}; use crate::{ traits::StreamTrait, BackendSpecificError, InputCallbackInfo, OutputCallbackInfo, SampleFormat, @@ -185,7 +182,6 @@ pub fn connect_output( sample_format: SampleFormat, data_callback: D, error_callback: E, - _timeout: Option, ) -> Result, pw::Error> where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, @@ -306,7 +302,6 @@ pub fn connect_input( sample_format: SampleFormat, data_callback: D, error_callback: E, - _timeout: Option, ) -> Result, pw::Error> where D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, From 6cea2ad15b9f354037ae7e6430f231450bd887f7 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Sat, 10 Jan 2026 07:22:13 +0900 Subject: [PATCH 19/27] fix: test pipewire with rust 1.85 --- .github/workflows/platforms.yml | 17 +++++++++++------ Cargo.toml | 2 +- examples/record_wav.rs | 11 +++++++++++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml index 98d65760d..6fd13a0d1 100644 --- a/.github/workflows/platforms.yml +++ b/.github/workflows/platforms.yml @@ -24,11 +24,11 @@ on: env: # MSRV varies by backend due to platform-specific dependencies - MSRV_AAUDIO: "1.82" - MSRV_ALSA: "1.82" - MSRV_COREAUDIO: "1.80" + MSRV_AAUDIO: "1.85" + MSRV_ALSA: "1.85" + MSRV_COREAUDIO: "1.82" MSRV_JACK: "1.82" - MSRV_WASIP1: "1.78" + MSRV_WASIP1: "1.82" MSRV_WASM: "1.82" MSRV_WINDOWS: "1.82" @@ -65,13 +65,13 @@ jobs: platform-msrv: ${{ env.MSRV_ALSA }} jack-msrv: ${{ env.MSRV_JACK }} - - name: Install Rust MSRV (${{ env.MSRV_ALSA }}) + - name: Install Rust MSRV (${{ env.MSRV_PIPEWIRE }}) uses: dtolnay/rust-toolchain@master with: toolchain: ${{ env.MSRV_ALSA }} - name: Install Rust MSRV (${{ steps.msrv.outputs.all-features }}) - if: steps.msrv.outputs.all-features != env.MSRV_ALSA + if: steps.msrv.outputs.all-features != env.MSRV_PIPEWIRE uses: dtolnay/rust-toolchain@master with: toolchain: ${{ steps.msrv.outputs.all-features }} @@ -107,6 +107,11 @@ jobs: steps: - uses: actions/checkout@v5 + - name: Cache Linux audio packages + uses: awalsh128/cache-apt-pkgs-action@latest + with: + packages: ${{ env.PACKAGES_LINUX }} + - name: Determine MSRV for all-features id: msrv uses: ./.github/actions/determine-msrv diff --git a/Cargo.toml b/Cargo.toml index 258056511..9b7f5bcce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ audioworklet = [ # Platform: All platforms custom = [] -default = ["pipewire"] +default = [] [dependencies] dasp_sample = "0.11" diff --git a/examples/record_wav.rs b/examples/record_wav.rs index 7b2ace2cd..9a1ef2374 100644 --- a/examples/record_wav.rs +++ b/examples/record_wav.rs @@ -73,6 +73,17 @@ fn main() -> Result<(), anyhow::Error> { } else { cpal::default_host() }; + + // Conditionally compile with jack if the feature is specified. + #[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))] // Manually check for flags. Can be passed through cargo with -- e.g. // cargo run --release --example record_wav --features pipewire -- -- pipewire let host = if opt.pipewire { From a49e3bf8bf5354c12bceaf8c4b153f86a3a5edf9 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Sat, 10 Jan 2026 08:19:14 +0900 Subject: [PATCH 20/27] chore: keep rust version --- .github/workflows/platforms.yml | 4 ++-- Cargo.toml | 2 +- src/host/pipewire/device.rs | 12 ++---------- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml index 6fd13a0d1..80f9afa3f 100644 --- a/.github/workflows/platforms.yml +++ b/.github/workflows/platforms.yml @@ -26,9 +26,9 @@ env: # MSRV varies by backend due to platform-specific dependencies MSRV_AAUDIO: "1.85" MSRV_ALSA: "1.85" - MSRV_COREAUDIO: "1.82" + MSRV_COREAUDIO: "1.80" MSRV_JACK: "1.82" - MSRV_WASIP1: "1.82" + MSRV_WASIP1: "1.78" MSRV_WASM: "1.82" MSRV_WINDOWS: "1.82" diff --git a/Cargo.toml b/Cargo.toml index 9b7f5bcce..df3e19eda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ documentation = "https://docs.rs/cpal" license = "Apache-2.0" keywords = ["audio", "sound"] edition = "2021" -rust-version = "1.82" +rust-version = "1.78" [features] # ASIO backend for Windows diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index 3b795cbb0..c7a89124a 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -294,11 +294,7 @@ impl DeviceTrait for Device { drop(context); }) .unwrap(); - if pw_init_rv - .recv_timeout(wait_timeout) - .ok() - .is_none_or(|re| !re) - { + if pw_init_rv.recv_timeout(wait_timeout).unwrap_or(false) { return Err(crate::BuildStreamError::DeviceNotAvailable); }; Ok(Stream { @@ -357,11 +353,7 @@ impl DeviceTrait for Device { drop(context); }) .unwrap(); - if pw_init_rv - .recv_timeout(wait_timeout) - .ok() - .is_none_or(|re| !re) - { + if pw_init_rv.recv_timeout(wait_timeout).unwrap_or(false) { return Err(crate::BuildStreamError::DeviceNotAvailable); }; Ok(Stream { From f457c69d389d665747c4e8c53f1883ba48410a85 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Sun, 11 Jan 2026 01:01:51 +0900 Subject: [PATCH 21/27] fix: ci --- .github/workflows/platforms.yml | 5 ----- Cross.toml | 5 ++--- Dockerfile | 3 ++- 3 files changed, 4 insertions(+), 9 deletions(-) diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml index 80f9afa3f..28bcae90c 100644 --- a/.github/workflows/platforms.yml +++ b/.github/workflows/platforms.yml @@ -107,11 +107,6 @@ jobs: steps: - uses: actions/checkout@v5 - - name: Cache Linux audio packages - uses: awalsh128/cache-apt-pkgs-action@latest - with: - packages: ${{ env.PACKAGES_LINUX }} - - name: Determine MSRV for all-features id: msrv uses: ./.github/actions/determine-msrv diff --git a/Cross.toml b/Cross.toml index 6d0ad81d1..57b252d5c 100644 --- a/Cross.toml +++ b/Cross.toml @@ -1,7 +1,6 @@ [target.armv7-unknown-linux-gnueabihf] dockerfile = "Dockerfile" +build-args = { CROSS_BASE_IMAGE = "ubuntu:24.04" } [target.armv7-unknown-linux-gnueabihf.env] -passthrough = [ - "RUSTFLAGS", -] +passthrough = ["RUSTFLAGS"] diff --git a/Dockerfile b/Dockerfile index 8e56a2efd..5ecc260d8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,4 +7,5 @@ ENV PKG_CONFIG_PATH=/usr/lib/arm-linux-gnueabihf/pkgconfig/ RUN dpkg --add-architecture armhf && \ apt-get update && \ apt-get install libasound2-dev:armhf -y && \ - apt-get install libjack-jackd2-dev:armhf libjack-jackd2-0:armhf -y \ + apt-get install libjack-jackd2-dev:armhf libjack-jackd2-0:armhf -y && \ + apt-get install libpipewire-0.3-dev:armhf -y \ From 424d78f089c182d068e313e2d24cf51f6645c5b1 Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Sun, 11 Jan 2026 18:49:24 +0900 Subject: [PATCH 22/27] fix: ci problem because cross-rs is based on ubuntu20.04, so it does not contains pipewire --- .github/workflows/platforms.yml | 8 ++++---- Cross.toml | 1 - Dockerfile | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml index 28bcae90c..8623f422b 100644 --- a/.github/workflows/platforms.yml +++ b/.github/workflows/platforms.yml @@ -94,10 +94,10 @@ jobs: run: cargo +${{ env.MSRV_ALSA }} check --examples --no-default-features --workspace --verbose - name: Run tests (all features) - run: cargo +${{ steps.msrv.outputs.all-features }} test --all-features --workspace --verbose + run: cargo +${{ steps.msrv.outputs.all-features }} test --features=jack --workspace --verbose - name: Check examples (all features) - run: cargo +${{ steps.msrv.outputs.all-features }} check --examples --all-features --workspace --verbose + run: cargo +${{ steps.msrv.outputs.all-features }} check --examples --features=jack --workspace --verbose # Linux ARMv7 (cross-compilation) linux-armv7: @@ -150,10 +150,10 @@ jobs: run: cross +${{ env.MSRV_ALSA }} test --no-default-features --workspace --verbose --target ${{ env.TARGET }} - name: Run tests (all features) - run: cross +${{ steps.msrv.outputs.all-features }} test --all-features --workspace --verbose --target ${{ env.TARGET }} + run: cross +${{ steps.msrv.outputs.all-features }} test --features=jack --workspace --verbose --target ${{ env.TARGET }} - name: Check examples (all features) - run: cross +${{ steps.msrv.outputs.all-features }} test --all-features --workspace --verbose --target ${{ env.TARGET }} + run: cross +${{ steps.msrv.outputs.all-features }} test --features=jack --workspace --verbose --target ${{ env.TARGET }} # Windows (x86_64 and i686) windows: diff --git a/Cross.toml b/Cross.toml index 57b252d5c..09b92e9ea 100644 --- a/Cross.toml +++ b/Cross.toml @@ -1,6 +1,5 @@ [target.armv7-unknown-linux-gnueabihf] dockerfile = "Dockerfile" -build-args = { CROSS_BASE_IMAGE = "ubuntu:24.04" } [target.armv7-unknown-linux-gnueabihf.env] passthrough = ["RUSTFLAGS"] diff --git a/Dockerfile b/Dockerfile index 5ecc260d8..5405f1417 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,5 +7,5 @@ ENV PKG_CONFIG_PATH=/usr/lib/arm-linux-gnueabihf/pkgconfig/ RUN dpkg --add-architecture armhf && \ apt-get update && \ apt-get install libasound2-dev:armhf -y && \ - apt-get install libjack-jackd2-dev:armhf libjack-jackd2-0:armhf -y && \ - apt-get install libpipewire-0.3-dev:armhf -y \ + apt-get install libjack-jackd2-dev:armhf libjack-jackd2-0:armhf -y +# TODO: now the cross-rs is based on ubuntu:20.04, so it does not contain pipewire-0.3-dev From 58815875553b871373f1de4c7ee1ea9965e5acbb Mon Sep 17 00:00:00 2001 From: ShootingStarDragons Date: Sun, 11 Jan 2026 18:56:03 +0900 Subject: [PATCH 23/27] fix: remove the unexisted variable --- .github/workflows/platforms.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml index 8623f422b..eeed8a4e1 100644 --- a/.github/workflows/platforms.yml +++ b/.github/workflows/platforms.yml @@ -65,13 +65,13 @@ jobs: platform-msrv: ${{ env.MSRV_ALSA }} jack-msrv: ${{ env.MSRV_JACK }} - - name: Install Rust MSRV (${{ env.MSRV_PIPEWIRE }}) + - name: Install Rust MSRV (${{ env.MSRV_ALSA }}) uses: dtolnay/rust-toolchain@master with: toolchain: ${{ env.MSRV_ALSA }} - name: Install Rust MSRV (${{ steps.msrv.outputs.all-features }}) - if: steps.msrv.outputs.all-features != env.MSRV_PIPEWIRE + if: steps.msrv.outputs.all-features != env.MSRV_ALSA uses: dtolnay/rust-toolchain@master with: toolchain: ${{ steps.msrv.outputs.all-features }} From 81c7e8eaee25ce60d779695df94437db27108c7e Mon Sep 17 00:00:00 2001 From: loxoron218 Date: Tue, 20 Jan 2026 23:05:42 +0100 Subject: [PATCH 24/27] feat(pipewire): add support for I64, U64, and F32 sample formats --- src/host/pipewire/stream.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs index bcccefabd..b99aeede5 100644 --- a/src/host/pipewire/stream.rs +++ b/src/host/pipewire/stream.rs @@ -46,6 +46,9 @@ pub(crate) const SUPPORTED_FORMATS: &[SampleFormat] = &[ SampleFormat::U24, SampleFormat::I32, SampleFormat::U32, + SampleFormat::I64, + SampleFormat::U64, + SampleFormat::F32, SampleFormat::F64, ]; From cbba3fe5dd559e640f35d11f657f4d0ea5011120 Mon Sep 17 00:00:00 2001 From: loxoron218 Date: Wed, 21 Jan 2026 01:54:32 +0100 Subject: [PATCH 25/27] feat(pipewire): support multiple sample rates in device configuration --- src/host/pipewire/device.rs | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index c7a89124a..df80c20e6 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -176,17 +176,26 @@ impl DeviceTrait for Device { if !self.supports_input() { return Ok(vec![].into_iter()); } - Ok(SUPPORTED_FORMATS + let rates = if self.allow_rates.is_empty() { + vec![self.rate] + } else { + self.allow_rates.clone() + }; + Ok(rates .iter() - .map(|sample_format| SupportedStreamConfigRange { + .flat_map(|&rate| { + SUPPORTED_FORMATS + .iter() + .map(move |sample_format| SupportedStreamConfigRange { channels: self.channels, - min_sample_rate: self.rate, - max_sample_rate: self.rate, + min_sample_rate: rate, + max_sample_rate: rate, buffer_size: crate::SupportedBufferSize::Range { min: self.min_quantum, max: self.max_quantum, }, sample_format: *sample_format, + }) }) .collect::>() .into_iter()) @@ -197,17 +206,26 @@ impl DeviceTrait for Device { if !self.supports_output() { return Ok(vec![].into_iter()); } - Ok(SUPPORTED_FORMATS + let rates = if self.allow_rates.is_empty() { + vec![self.rate] + } else { + self.allow_rates.clone() + }; + Ok(rates .iter() - .map(|sample_format| SupportedStreamConfigRange { + .flat_map(|&rate| { + SUPPORTED_FORMATS + .iter() + .map(move |sample_format| SupportedStreamConfigRange { channels: self.channels, - min_sample_rate: self.rate, - max_sample_rate: self.rate, + min_sample_rate: rate, + max_sample_rate: rate, buffer_size: crate::SupportedBufferSize::Range { min: self.min_quantum, max: self.max_quantum, }, sample_format: *sample_format, + }) }) .collect::>() .into_iter()) From 7c0be369a500c9e9c70b7de2033190e4189f71d3 Mon Sep 17 00:00:00 2001 From: loxoron218 Date: Wed, 21 Jan 2026 12:25:09 +0100 Subject: [PATCH 26/27] fix(pipewire): improve error handling for stream initialization timeout Replace boolean check with match expression to properly handle both timeout expiration and channel receive errors, returning a more accurate StreamConfigNotSupported error instead of DeviceNotAvailable. --- src/host/pipewire/device.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index df80c20e6..efac1c1a4 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -312,13 +312,13 @@ impl DeviceTrait for Device { drop(context); }) .unwrap(); - if pw_init_rv.recv_timeout(wait_timeout).unwrap_or(false) { - return Err(crate::BuildStreamError::DeviceNotAvailable); - }; - Ok(Stream { + match pw_init_rv.recv_timeout(wait_timeout) { + Ok(true) => Ok(Stream { handle, controller: pw_play_tx, - }) + }), + Ok(false) | Err(_) => Err(crate::BuildStreamError::StreamConfigNotSupported), + } } fn build_output_stream_raw( @@ -371,15 +371,15 @@ impl DeviceTrait for Device { drop(context); }) .unwrap(); - if pw_init_rv.recv_timeout(wait_timeout).unwrap_or(false) { - return Err(crate::BuildStreamError::DeviceNotAvailable); - }; - Ok(Stream { + match pw_init_rv.recv_timeout(wait_timeout) { + Ok(true) => Ok(Stream { handle, controller: pw_play_tx, - }) - } + }), + Ok(false) | Err(_) => Err(crate::BuildStreamError::StreamConfigNotSupported), } + } + } impl Device { pub fn channels(&self) -> u16 { From d6329aaec1a27fa4431cad725fff68266373675c Mon Sep 17 00:00:00 2001 From: loxoron218 Date: Wed, 21 Jan 2026 12:25:45 +0100 Subject: [PATCH 27/27] fix(pipewire): normalize channel list parsing and improve code formatting --- src/host/pipewire/device.rs | 42 ++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs index efac1c1a4..6e5fe229c 100644 --- a/src/host/pipewire/device.rs +++ b/src/host/pipewire/device.rs @@ -187,14 +187,14 @@ impl DeviceTrait for Device { SUPPORTED_FORMATS .iter() .map(move |sample_format| SupportedStreamConfigRange { - channels: self.channels, + channels: self.channels, min_sample_rate: rate, max_sample_rate: rate, - buffer_size: crate::SupportedBufferSize::Range { - min: self.min_quantum, - max: self.max_quantum, - }, - sample_format: *sample_format, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: *sample_format, }) }) .collect::>() @@ -217,14 +217,14 @@ impl DeviceTrait for Device { SUPPORTED_FORMATS .iter() .map(move |sample_format| SupportedStreamConfigRange { - channels: self.channels, + channels: self.channels, min_sample_rate: rate, max_sample_rate: rate, - buffer_size: crate::SupportedBufferSize::Range { - min: self.min_quantum, - max: self.max_quantum, - }, - sample_format: *sample_format, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: *sample_format, }) }) .collect::>() @@ -314,8 +314,8 @@ impl DeviceTrait for Device { .unwrap(); match pw_init_rv.recv_timeout(wait_timeout) { Ok(true) => Ok(Stream { - handle, - controller: pw_play_tx, + handle, + controller: pw_play_tx, }), Ok(false) | Err(_) => Err(crate::BuildStreamError::StreamConfigNotSupported), } @@ -373,13 +373,13 @@ impl DeviceTrait for Device { .unwrap(); match pw_init_rv.recv_timeout(wait_timeout) { Ok(true) => Ok(Stream { - handle, - controller: pw_play_tx, + handle, + controller: pw_play_tx, }), Ok(false) | Err(_) => Err(crate::BuildStreamError::StreamConfigNotSupported), -} - } } + } +} impl Device { pub fn channels(&self) -> u16 { @@ -518,7 +518,11 @@ fn init_roundtrip() -> Option> { return 0; }; let list = list.trim(); - let list: Vec<&str> = list.split(' ').collect(); + let list_normalized = list.replace(',', " "); + let list: Vec<&str> = list_normalized + .split(' ') + .filter(|s| !s.is_empty()) + .collect(); let mut allow_rates = vec![]; for rate in list { let Ok(rate) = rate.parse() else {