diff --git a/CHANGELOG.md b/CHANGELOG.md index 90a4e5e37..0a4bc1120 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,12 +9,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- `DeviceTrait::build_duplex_stream` and `build_duplex_stream_raw` for synchronized input/output. +- `duplex` module with `DuplexStreamConfig` and `DuplexCallbackInfo` types. +- **CoreAudio**: Duplex stream support with hardware-synchronized input/output. +- Example `duplex_feedback` demonstrating duplex stream usage. - `DeviceBusy` error variant for retriable device access errors (EBUSY, EAGAIN). - **ALSA**: `Debug` implementations for `Host`, `Device`, `Stream`, and internal types. - **ALSA**: Example demonstrating ALSA error suppression during enumeration. ### Changed +- **POTENTIALLY BREAKING**: `DeviceTrait` now includes `build_duplex_stream()` and `build_duplex_stream_raw()` methods. The default implementation returns `StreamConfigNotSupported`, so external implementations are compatible without changes. - Overall MSRV increased to 1.78. - **ALSA**: Update `alsa` dependency from 0.10 to 0.11. - **ALSA**: MSRV increased from 1.77 to 1.82 (required by alsa-sys 0.4.0). diff --git a/Cargo.toml b/Cargo.toml index fb47eb19a..56e635e3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -180,6 +180,9 @@ name = "record_wav" [[example]] name = "synth_tones" +[[example]] +name = "duplex_feedback" + [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] diff --git a/README.md b/README.md index 47d92c694..6355b6d96 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ This library currently supports the following: - Enumerate known supported input and output stream formats for a device. - Get the current default input and output stream formats for a device. - Build and run input and output PCM streams on a chosen device with a given stream format. +- Build and run duplex (simultaneous input/output) streams with hardware clock synchronization (macOS only, more platforms coming soon). Currently, supported hosts include: @@ -209,6 +210,7 @@ CPAL comes with several examples demonstrating various features: - `beep` - Generate a simple sine wave tone - `enumerate` - List all available audio devices and their capabilities - `feedback` - Pass input audio directly to output (microphone loopback) +- `duplex_feedback` - Hardware-synchronized duplex stream loopback (macOS only) - `record_wav` - Record audio from the default input device to a WAV file - `synth_tones` - Generate multiple tones simultaneously diff --git a/examples/duplex_feedback.rs b/examples/duplex_feedback.rs new file mode 100644 index 000000000..4ea873146 --- /dev/null +++ b/examples/duplex_feedback.rs @@ -0,0 +1,97 @@ +//! Feeds back the input stream directly into the output stream using a duplex stream. +//! +//! Unlike the `feedback.rs` example which uses separate input/output streams with a ring buffer, +//! duplex streams provide hardware-synchronized input/output without additional buffering. +//! +//! Note: Currently only supported on macOS (CoreAudio). Windows (WASAPI) and Linux (ALSA) +//! implementations are planned. + +use clap::Parser; +use cpal::duplex::DuplexStreamConfig; +use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::BufferSize; + +#[derive(Parser, Debug)] +#[command(version, about = "CPAL duplex feedback example", long_about = None)] +struct Opt { + /// The audio device to use (must support duplex operation) + #[arg(short, long, value_name = "DEVICE")] + device: Option, + + /// Number of input channels + #[arg(long, value_name = "CHANNELS", default_value_t = 2)] + input_channels: u16, + + /// Number of output channels + #[arg(long, value_name = "CHANNELS", default_value_t = 2)] + output_channels: u16, + + /// Sample rate in Hz + #[arg(short, long, value_name = "RATE", default_value_t = 48000)] + sample_rate: u32, + + /// Buffer size in frames + #[arg(short, long, value_name = "FRAMES", default_value_t = 512)] + buffer_size: u32, +} + +#[cfg(target_os = "macos")] +fn main() -> anyhow::Result<()> { + let opt = Opt::parse(); + let host = cpal::default_host(); + + // Find the device by device ID or use default + let device = if let Some(device_id_str) = opt.device { + let device_id = device_id_str.parse().expect("failed to parse device id"); + host.device_by_id(&device_id) + .expect(&format!("failed to find device with id: {}", device_id_str)) + } else { + host.default_output_device() + .expect("no default output device") + }; + + println!("Using device: \"{}\"", device.description()?.name()); + + // Create duplex stream configuration. + let config = DuplexStreamConfig::new( + opt.input_channels, + opt.output_channels, + opt.sample_rate, + BufferSize::Fixed(opt.buffer_size), + ); + + println!("Building duplex stream with config: {config:?}"); + + let stream = device.build_duplex_stream::( + &config, + move |input, output, _info| { + output.fill(0.0); + let copy_len = input.len().min(output.len()); + output[..copy_len].copy_from_slice(&input[..copy_len]); + }, + |err| eprintln!("Stream error: {err}"), + None, + )?; + + println!("Successfully built duplex stream."); + println!( + "Input: {} channels, Output: {} channels, Sample rate: {} Hz, Buffer size: {} frames", + opt.input_channels, opt.output_channels, opt.sample_rate, opt.buffer_size + ); + + println!("Starting duplex stream..."); + stream.play()?; + + println!("Playing for 10 seconds... (speak into your microphone)"); + std::thread::sleep(std::time::Duration::from_secs(10)); + + drop(stream); + println!("Done!"); + Ok(()) +} + +#[cfg(not(target_os = "macos"))] +fn main() { + eprintln!("Duplex streams are currently only supported on macOS."); + eprintln!("Windows (WASAPI) and Linux (ALSA) support is planned."); +} diff --git a/src/duplex.rs b/src/duplex.rs new file mode 100644 index 000000000..5bb149122 --- /dev/null +++ b/src/duplex.rs @@ -0,0 +1,236 @@ +//! Duplex audio stream support with synchronized input/output. +//! +//! This module provides types for building duplex (simultaneous input/output) audio streams +//! with hardware clock synchronization. +//! +//! # Overview +//! +//! Unlike separate input and output streams which may have independent clocks, a duplex stream +//! uses a single device context for both input and output, ensuring they share the same +//! hardware clock. This is essential for applications like: +//! +//! - DAWs (Digital Audio Workstations) +//! - Real-time audio effects processing +//! - Audio measurement and analysis +//! - Any application requiring sample-accurate I/O synchronization +//! +//! # Example +//! +//! ```no_run +//! use cpal::duplex::DuplexStreamConfig; +//! use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +//! use cpal::BufferSize; +//! +//! let host = cpal::default_host(); +//! let device = host.default_output_device().expect("no device"); +//! +//! let config = DuplexStreamConfig::symmetric(2, 48000, BufferSize::Fixed(512)); +//! +//! let stream = device.build_duplex_stream::( +//! &config, +//! |input, output, info| { +//! // Passthrough: copy input to output +//! output[..input.len()].copy_from_slice(input); +//! }, +//! |err| eprintln!("Stream error: {}", err), +//! None, +//! ).expect("failed to build duplex stream"); +//! ``` + +use crate::{SampleRate, StreamInstant}; + +/// Information passed to duplex callbacks. +/// +/// This contains timing information for the current audio buffer, combining +/// both input and output timing similar to [`InputCallbackInfo`](crate::InputCallbackInfo) +/// and [`OutputCallbackInfo`](crate::OutputCallbackInfo). +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub struct DuplexCallbackInfo { + /// The instant the stream's data callback was invoked. + pub callback: StreamInstant, + + /// The instant that input data was captured from the device. + /// + /// This is calculated by subtracting the input device latency from the callback time, + /// representing when the input samples were actually captured by the hardware (e.g., by an ADC). + pub capture: StreamInstant, + + /// The predicted instant that output data will be delivered to the device for playback. + /// + /// This is calculated by adding the output device latency to the callback time, + /// representing when the output samples will actually be played by the hardware (e.g., by a DAC). + pub playback: StreamInstant, +} + +impl DuplexCallbackInfo { + /// Create a new DuplexCallbackInfo. + pub fn new(callback: StreamInstant, capture: StreamInstant, playback: StreamInstant) -> Self { + Self { + callback, + capture, + playback, + } + } +} + +/// Configuration for a duplex audio stream. +/// +/// Unlike separate input/output streams, duplex streams require matching +/// configuration for both directions since they share a single device context. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct DuplexStreamConfig { + /// Number of input channels. + pub input_channels: u16, + + /// Number of output channels. + pub output_channels: u16, + + /// Sample rate in Hz. + pub sample_rate: SampleRate, + + /// Requested buffer size in frames. + pub buffer_size: crate::BufferSize, +} + +impl DuplexStreamConfig { + /// Create a new duplex stream configuration. + /// + /// # Panics + /// + /// Panics if: + /// - `input_channels` or `output_channels` is zero + /// - `sample_rate` is zero + /// - `buffer_size` is `BufferSize::Fixed(0)` + pub fn new( + input_channels: u16, + output_channels: u16, + sample_rate: SampleRate, + buffer_size: crate::BufferSize, + ) -> Self { + assert!(input_channels > 0, "input_channels must be greater than 0"); + assert!( + output_channels > 0, + "output_channels must be greater than 0" + ); + assert!(sample_rate > 0, "sample_rate must be greater than 0"); + assert!( + !matches!(buffer_size, crate::BufferSize::Fixed(0)), + "buffer_size cannot be Fixed(0)" + ); + + Self { + input_channels, + output_channels, + sample_rate, + buffer_size, + } + } + + /// Create a symmetric configuration (same channel count for input and output). + /// + /// # Panics + /// + /// Panics if `channels` is zero or if `sample_rate` is zero. + pub fn symmetric( + channels: u16, + sample_rate: SampleRate, + buffer_size: crate::BufferSize, + ) -> Self { + Self::new(channels, channels, sample_rate, buffer_size) + } + + /// Convert to a basic StreamConfig using output channel count. + /// + /// Useful for compatibility with existing cpal APIs. + pub fn to_stream_config(&self) -> crate::StreamConfig { + crate::StreamConfig { + channels: self.output_channels, + sample_rate: self.sample_rate, + buffer_size: self.buffer_size, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_duplex_callback_info() { + let callback = StreamInstant::new(1, 0); + let capture = StreamInstant::new(0, 500_000_000); // 500ms before callback + let playback = StreamInstant::new(1, 500_000_000); // 500ms after callback + + let info = DuplexCallbackInfo::new(callback, capture, playback); + + assert_eq!(info.callback, callback); + assert_eq!(info.capture, capture); + assert_eq!(info.playback, playback); + } + + #[test] + fn test_duplex_stream_config() { + let config = DuplexStreamConfig::symmetric(2, 48000, crate::BufferSize::Fixed(512)); + assert_eq!(config.input_channels, 2); + assert_eq!(config.output_channels, 2); + assert_eq!(config.sample_rate, 48000); + + let stream_config = config.to_stream_config(); + assert_eq!(stream_config.channels, 2); + assert_eq!(stream_config.sample_rate, 48000); + } + + #[test] + fn test_duplex_stream_config_asymmetric() { + let config = DuplexStreamConfig::new(1, 8, 96000, crate::BufferSize::Default); + assert_eq!(config.input_channels, 1); + assert_eq!(config.output_channels, 8); + assert_eq!(config.sample_rate, 96000); + } + + #[test] + fn test_duplex_stream_config_to_stream_config() { + let config = DuplexStreamConfig::new(1, 2, 48000, crate::BufferSize::Fixed(256)); + let stream_config = config.to_stream_config(); + + // to_stream_config uses output_channels + assert_eq!(stream_config.channels, 2); + assert_eq!(stream_config.sample_rate, 48000); + assert_eq!(stream_config.buffer_size, crate::BufferSize::Fixed(256)); + } + + #[test] + #[should_panic(expected = "input_channels must be greater than 0")] + fn test_duplex_stream_config_zero_input_channels() { + DuplexStreamConfig::new(0, 2, 48000, crate::BufferSize::Default); + } + + #[test] + #[should_panic(expected = "output_channels must be greater than 0")] + fn test_duplex_stream_config_zero_output_channels() { + DuplexStreamConfig::new(2, 0, 48000, crate::BufferSize::Default); + } + + #[test] + #[should_panic(expected = "sample_rate must be greater than 0")] + fn test_duplex_stream_config_zero_sample_rate() { + DuplexStreamConfig::new(2, 2, 0, crate::BufferSize::Default); + } + + #[test] + #[should_panic(expected = "buffer_size cannot be Fixed(0)")] + fn test_duplex_stream_config_zero_buffer_size() { + DuplexStreamConfig::new(2, 2, 48000, crate::BufferSize::Fixed(0)); + } + + #[test] + fn test_duplex_stream_config_clone_and_eq() { + let config1 = DuplexStreamConfig::new(2, 4, 48000, crate::BufferSize::Fixed(512)); + let config2 = config1.clone(); + + assert_eq!(config1, config2); + + let config3 = DuplexStreamConfig::new(2, 4, 44100, crate::BufferSize::Fixed(512)); + assert_ne!(config1, config3); + } +} diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 1302299f3..0d4cf810a 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -1,6 +1,7 @@ use super::OSStatus; use super::Stream; use super::{asbd_from_config, check_os_status, frames_to_duration, host_time_to_stream_instant}; +use crate::duplex::DuplexCallbackInfo; use crate::host::coreaudio::macos::loopback::LoopbackDevice; use crate::host::coreaudio::macos::StreamInner; use crate::traits::DeviceTrait; @@ -14,7 +15,8 @@ use coreaudio::audio_unit::render_callback::{self, data}; use coreaudio::audio_unit::{AudioUnit, Element, Scope}; use objc2_audio_toolbox::{ kAudioOutputUnitProperty_CurrentDevice, kAudioOutputUnitProperty_EnableIO, - kAudioUnitProperty_StreamFormat, + kAudioUnitProperty_SetRenderCallback, kAudioUnitProperty_StreamFormat, AURenderCallbackStruct, + AudioUnitRender, AudioUnitRenderActionFlags, }; use objc2_core_audio::kAudioDevicePropertyDeviceUID; use objc2_core_audio::kAudioObjectPropertyElementMain; @@ -29,7 +31,7 @@ use objc2_core_audio::{ AudioObjectPropertyScope, AudioObjectSetPropertyData, }; use objc2_core_audio_types::{ - AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioValueRange, + AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioTimeStamp, AudioValueRange, }; use objc2_core_foundation::CFString; use objc2_core_foundation::Type; @@ -335,6 +337,21 @@ impl DeviceTrait for Device { timeout, ) } + + fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + Device::build_duplex_stream_raw(self, config, sample_format, data_callback, error_callback) + } } #[derive(Clone, Eq, Hash, PartialEq)] @@ -716,6 +733,14 @@ impl Device { .map(|mut configs| configs.next().is_some()) .unwrap_or(false) } + + /// Check if this device supports output (playback). + fn supports_output(&self) -> bool { + // Check if the device has output channels by trying to get its output configuration + self.supported_output_configs() + .map(|mut configs| configs.next().is_some()) + .unwrap_or(false) + } } impl fmt::Debug for Device { @@ -825,6 +850,7 @@ impl Device { audio_unit, device_id: self.audio_device_id, _loopback_device: loopback_aggregate, + duplex_callback_ptr: None, }, error_callback_for_stream, )?; @@ -928,6 +954,7 @@ impl Device { audio_unit, device_id: self.audio_device_id, _loopback_device: None, + duplex_callback_ptr: None, }, error_callback_for_stream, )?; @@ -945,6 +972,311 @@ impl Device { Ok(stream) } + + /// Build a duplex stream with synchronized input and output. + /// + /// This creates a single HAL AudioUnit with both input and output enabled, + /// ensuring they share the same hardware clock. + fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + // Validate that device supports duplex + if !self.supports_input() || !self.supports_output() { + return Err(BuildStreamError::StreamConfigNotSupported); + } + + // Potentially change the device sample rate to match the config. + set_sample_rate(self.audio_device_id, config.sample_rate)?; + + // Create HAL AudioUnit - always use HalOutput for duplex + let mut audio_unit = AudioUnit::new(coreaudio::audio_unit::IOType::HalOutput)?; + + // Enable BOTH input and output on the AudioUnit + let enable: u32 = 1; + + // Enable input on Element 1 + audio_unit.set_property( + kAudioOutputUnitProperty_EnableIO, + Scope::Input, + Element::Input, + Some(&enable), + )?; + + // Enable output on Element 0 (usually enabled by default, but be explicit) + audio_unit.set_property( + kAudioOutputUnitProperty_EnableIO, + Scope::Output, + Element::Output, + Some(&enable), + )?; + + // Set device for the unit (applies to both input and output) + audio_unit.set_property( + kAudioOutputUnitProperty_CurrentDevice, + Scope::Global, + Element::Output, + Some(&self.audio_device_id), + )?; + + // Create StreamConfig for input side + let input_stream_config = StreamConfig { + channels: config.input_channels as ChannelCount, + sample_rate: config.sample_rate, + buffer_size: config.buffer_size, + }; + + // Create StreamConfig for output side + let output_stream_config = StreamConfig { + channels: config.output_channels as ChannelCount, + sample_rate: config.sample_rate, + buffer_size: config.buffer_size, + }; + + // Configure input format (Scope::Output on Element::Input) + let input_asbd = asbd_from_config(&input_stream_config, sample_format); + audio_unit.set_property( + kAudioUnitProperty_StreamFormat, + Scope::Output, + Element::Input, + Some(&input_asbd), + )?; + + // Configure output format (Scope::Input on Element::Output) + let output_asbd = asbd_from_config(&output_stream_config, sample_format); + audio_unit.set_property( + kAudioUnitProperty_StreamFormat, + Scope::Input, + Element::Output, + Some(&output_asbd), + )?; + + // Configure buffer size if requested + if let BufferSize::Fixed(buffer_size) = &config.buffer_size { + audio_unit.set_property( + kAudioDevicePropertyBufferFrameSize, + Scope::Global, + Element::Output, + Some(buffer_size), + )?; + } + + // Get actual buffer size for pre-allocating input buffer + let buffer_size: u32 = audio_unit + .get_property( + kAudioDevicePropertyBufferFrameSize, + Scope::Global, + Element::Output, + ) + .unwrap_or(512); + + // Get callback vars for latency calculation (matching input/output pattern) + let sample_rate = config.sample_rate; + let device_buffer_frames = get_device_buffer_frame_size(&audio_unit).ok(); + + // Get the raw AudioUnit pointer for use in the callback + let raw_audio_unit = *audio_unit.as_ref(); + + // Configuration for callback + let input_channels = config.input_channels as usize; + let sample_bytes = sample_format.sample_size(); + + // Pre-allocate input buffer for the configured buffer size (in bytes) + let input_buffer_samples = buffer_size as usize * input_channels; + let input_buffer_bytes = input_buffer_samples * sample_bytes; + let mut input_buffer: Vec = vec![0u8; input_buffer_bytes]; + + // Wrap error callback in Arc for sharing between callback and disconnect handler + let error_callback = Arc::new(Mutex::new(error_callback)); + let error_callback_for_callback = error_callback.clone(); + + // Move data callback into closure + let mut data_callback = data_callback; + + // Create the duplex callback closure + // This closure owns all captured state - no Mutex needed for data_callback or input_buffer + let duplex_proc: Box = Box::new( + move |io_action_flags: NonNull, + in_time_stamp: NonNull, + _in_bus_number: u32, + in_number_frames: u32, + io_data: *mut AudioBufferList| + -> i32 { + let num_frames = in_number_frames as usize; + let input_samples = num_frames * input_channels; + let input_bytes = input_samples * sample_bytes; + + // SAFETY: in_time_stamp is valid per CoreAudio contract + let timestamp = unsafe { in_time_stamp.as_ref() }; + + // Create StreamInstant for callback_instant + let callback_instant = match host_time_to_stream_instant(timestamp.mHostTime) { + Err(err) => { + invoke_error_callback(&error_callback_for_callback, err.into()); + return 0; + } + Ok(cb) => cb, + }; + + // Calculate latency-adjusted timestamps (matching input/output pattern) + let buffer_frames = num_frames; + // Use device buffer size for latency calculation if available + let latency_frames = device_buffer_frames.unwrap_or( + // Fallback to callback buffer size if device buffer size is unknown + buffer_frames, + ); + let delay = frames_to_duration(latency_frames, sample_rate); + + // Capture time: when input was actually captured (in the past) + let capture = callback_instant + .sub(delay) + .expect("`capture` occurs before origin of `StreamInstant`"); + + // Playback time: when output will actually play (in the future) + let playback = callback_instant + .add(delay) + .expect("`playback` occurs beyond representation supported by `StreamInstant`"); + + // Pull input from Element 1 using AudioUnitRender + // We use the pre-allocated input_buffer + unsafe { + // Set up AudioBufferList pointing to our input buffer + let mut input_buffer_list = AudioBufferList { + mNumberBuffers: 1, + mBuffers: [AudioBuffer { + mNumberChannels: input_channels as u32, + mDataByteSize: input_bytes as u32, + mData: input_buffer.as_mut_ptr() as *mut std::ffi::c_void, + }], + }; + + let status = AudioUnitRender( + raw_audio_unit, + io_action_flags.as_ptr(), + in_time_stamp, + 1, // Element 1 = input + in_number_frames, + NonNull::new_unchecked(&mut input_buffer_list), + ); + + if status != 0 { + // Report error but continue with silence for graceful degradation + invoke_error_callback( + &error_callback_for_callback, + StreamError::BackendSpecific { + err: BackendSpecificError { + description: format!( + "AudioUnitRender failed for input: OSStatus {}", + status + ), + }, + }, + ); + input_buffer[..input_bytes].fill(0); + } + } + + // Get output buffer from CoreAudio + if io_data.is_null() { + return 0; + } + + // Create Data wrappers for input and output + let input_data = unsafe { + Data::from_parts( + input_buffer.as_mut_ptr() as *mut (), + input_samples, + sample_format, + ) + }; + + let mut output_data = unsafe { + let buffer_list = &mut *io_data; + if buffer_list.mNumberBuffers == 0 { + return 0; + } + let buffer = &mut buffer_list.mBuffers[0]; + if buffer.mData.is_null() { + return 0; + } + let output_samples = buffer.mDataByteSize as usize / sample_bytes; + Data::from_parts(buffer.mData as *mut (), output_samples, sample_format) + }; + + // Create callback info with latency-adjusted times + let callback_info = DuplexCallbackInfo::new(callback_instant, capture, playback); + + // Call user callback with input and output Data + data_callback(&input_data, &mut output_data, &callback_info); + + 0 // noErr + }, + ); + + // Box the wrapper and get raw pointer for CoreAudio + let wrapper = Box::new(DuplexProcWrapper { + callback: duplex_proc, + }); + let wrapper_ptr = Box::into_raw(wrapper); + + // Set up the render callback + let render_callback = AURenderCallbackStruct { + inputProc: Some(duplex_input_proc), + inputProcRefCon: wrapper_ptr as *mut std::ffi::c_void, + }; + + audio_unit.set_property( + kAudioUnitProperty_SetRenderCallback, + Scope::Global, + Element::Output, + Some(&render_callback), + )?; + + // Create the stream inner, storing the callback pointer for cleanup + let inner = StreamInner { + playing: true, + audio_unit, + device_id: self.audio_device_id, + _loopback_device: None, + duplex_callback_ptr: Some(wrapper_ptr), + }; + + // Create error callback for stream - either dummy or real based on device type + // For duplex, check both input and output default device status + let error_callback_for_stream: super::ErrorCallback = + if is_default_input_device(self) || is_default_output_device(self) { + Box::new(|_: StreamError| {}) + } else { + let error_callback_clone = error_callback.clone(); + Box::new(move |err: StreamError| { + invoke_error_callback(&error_callback_clone, err); + }) + }; + + // Create the duplex stream + let stream = Stream::new(inner, error_callback_for_stream)?; + + // Start the audio unit + stream + .inner + .lock() + .map_err(|_| BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Failed to lock duplex stream".to_string(), + }, + })? + .audio_unit + .start()?; + + Ok(stream) + } } /// Configure stream format and buffer size for CoreAudio stream. @@ -1017,3 +1349,64 @@ fn get_device_buffer_frame_size(audio_unit: &AudioUnit) -> Result, + NonNull, + u32, // bus_number + u32, // num_frames + *mut AudioBufferList, +) -> i32; + +/// Wrapper for the boxed duplex callback closure. +/// +/// This struct is allocated on the heap and its pointer is passed to CoreAudio +/// as the refcon. The extern "C" callback function casts the refcon back to +/// this type and calls the closure. +pub(crate) struct DuplexProcWrapper { + callback: Box, +} + +// SAFETY: DuplexProcWrapper is Send because: +// 1. The boxed closure captures only Send types (the DuplexCallback trait requires Send) +// 2. The raw pointer stored in StreamInner is only accessed: +// - During Drop, after stopping the audio unit (callback no longer running) +// 3. CoreAudio guarantees single-threaded callback invocation +unsafe impl Send for DuplexProcWrapper {} + +/// CoreAudio render callback for duplex audio. +/// +/// This is a thin wrapper that casts the refcon back to our DuplexProcWrapper +/// and calls the inner closure. The closure owns all the callback state via +/// move semantics, so no Mutex is needed. +/// +/// # Safety +/// This is an unsafe extern "C-unwind" callback called by CoreAudio. The refcon +/// must be a valid pointer to a DuplexProcWrapper. +extern "C-unwind" fn duplex_input_proc( + in_ref_con: NonNull, + io_action_flags: NonNull, + in_time_stamp: NonNull, + in_bus_number: u32, + in_number_frames: u32, + io_data: *mut AudioBufferList, +) -> i32 { + let wrapper = unsafe { in_ref_con.cast::().as_mut() }; + (wrapper.callback)( + io_action_flags, + in_time_stamp, + in_bus_number, + in_number_frames, + io_data, + ) +} diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index a7a025166..b88c4a879 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -179,15 +179,29 @@ struct StreamInner { #[allow(dead_code)] device_id: AudioDeviceID, /// Manage the lifetime of the aggregate device used - /// for loopback recording + /// for loopback recording (used by input streams only) _loopback_device: Option, + /// Pointer to the duplex callback wrapper, needed for cleanup. + /// This is only used by duplex streams and is None for regular input/output streams. + duplex_callback_ptr: Option<*mut device::DuplexProcWrapper>, } +// SAFETY: StreamInner is Send because: +// 1. AudioUnit is Send (handles thread safety internally) +// 2. AudioDeviceID is a simple integer type +// 3. LoopbackDevice is Send (contains only Send types) +// 4. The raw pointer duplex_callback_ptr is only accessed: +// - During Drop, after stopping the audio unit (callback no longer running) +// - The pointer was created from a Box that is Send +// - CoreAudio guarantees single-threaded callback invocation +// 5. The pointer is never dereferenced while the audio unit is running +unsafe impl Send for StreamInner {} + impl StreamInner { fn play(&mut self) -> Result<(), PlayStreamError> { if !self.playing { if let Err(e) = self.audio_unit.start() { - let description = format!("{e}"); + let description = e.to_string(); let err = BackendSpecificError { description }; return Err(err.into()); } @@ -199,7 +213,7 @@ impl StreamInner { fn pause(&mut self) -> Result<(), PauseStreamError> { if self.playing { if let Err(e) = self.audio_unit.stop() { - let description = format!("{e}"); + let description = e.to_string(); let err = BackendSpecificError { description }; return Err(err.into()); } @@ -209,6 +223,25 @@ impl StreamInner { } } +impl Drop for StreamInner { + fn drop(&mut self) { + // Stop the audio unit first to ensure callback is no longer being called + let _ = self.audio_unit.stop(); + + // Clean up duplex callback if present + if let Some(ptr) = self.duplex_callback_ptr { + if !ptr.is_null() { + unsafe { + let _ = Box::from_raw(ptr); + } + } + } + + // AudioUnit's own Drop will handle uninitialize and dispose + // _loopback_device's Drop will handle aggregate device cleanup + } +} + pub struct Stream { inner: Arc>, // Manages the device disconnection listener separately to allow Stream to be Send. @@ -365,4 +398,214 @@ mod test { *sample = Sample::EQUILIBRIUM; } } + + #[test] + fn test_duplex_stream() { + use crate::duplex::DuplexStreamConfig; + use crate::BufferSize; + use std::sync::atomic::{AtomicU32, Ordering}; + use std::sync::Arc; + + // Skip in CI due to audio device permissions + if std::env::var("CI").is_ok() { + println!("Skipping test_duplex_stream in CI environment due to permissions"); + return; + } + + let host = default_host(); + let device = host.default_output_device().expect("no output device"); + + // Check if device supports both input and output + let has_input = device + .supported_input_configs() + .map(|mut configs| configs.next().is_some()) + .unwrap_or(false); + let has_output = device + .supported_output_configs() + .map(|mut configs| configs.next().is_some()) + .unwrap_or(false); + + if !has_input || !has_output { + println!("Skipping test_duplex_stream: device doesn't support both input and output"); + return; + } + + let callback_count = Arc::new(AtomicU32::new(0)); + let callback_count_clone = callback_count.clone(); + + // Get supported sample rates from output config + let output_config = device + .supported_output_configs() + .unwrap() + .next() + .unwrap() + .with_max_sample_rate(); + + let config = DuplexStreamConfig { + input_channels: 2, + output_channels: 2, + sample_rate: output_config.sample_rate(), + buffer_size: BufferSize::Default, + }; + + println!("Building duplex stream with config: {:?}", config); + + let stream = device.build_duplex_stream::( + &config, + move |input, output, _info| { + callback_count_clone.fetch_add(1, Ordering::Relaxed); + // Simple passthrough: copy input to output + let copy_len = input.len().min(output.len()); + output[..copy_len].copy_from_slice(&input[..copy_len]); + // Zero any remaining output + for sample in output[copy_len..].iter_mut() { + *sample = 0.0; + } + }, + |err| println!("Error: {err}"), + None, + ); + + match stream { + Ok(stream) => { + stream.play().unwrap(); + std::thread::sleep(std::time::Duration::from_millis(500)); + stream.pause().unwrap(); + + let count = callback_count.load(Ordering::Relaxed); + println!("Duplex callback was called {} times", count); + assert!( + count > 0, + "Duplex callback should have been called at least once" + ); + } + Err(e) => { + // This is acceptable if the device doesn't truly support duplex + println!("Could not create duplex stream: {:?}", e); + } + } + } + + /// Test that verifies duplex synchronization by checking timestamp continuity. + #[test] + fn test_duplex_synchronization_verification() { + use crate::duplex::DuplexStreamConfig; + use crate::BufferSize; + use std::sync::atomic::{AtomicU64, Ordering}; + use std::sync::{Arc, Mutex}; + + // Skip in CI due to audio device permissions + if std::env::var("CI").is_ok() { + println!("Skipping duplex sync test in CI environment"); + return; + } + + let host = default_host(); + let device = host.default_output_device().expect("no output device"); + + // Check device capabilities + let has_input = device + .supported_input_configs() + .map(|mut c| c.next().is_some()) + .unwrap_or(false); + let has_output = device + .supported_output_configs() + .map(|mut c| c.next().is_some()) + .unwrap_or(false); + + if !has_input || !has_output { + println!("Skipping: device doesn't support both input and output"); + return; + } + + /// Verification state collected during callbacks + #[derive(Debug, Default)] + struct SyncVerificationState { + callback_count: u64, + total_frames: u64, + } + + let state = Arc::new(Mutex::new(SyncVerificationState::default())); + let state_clone = state.clone(); + + // Get device config + let output_config = device + .supported_output_configs() + .unwrap() + .next() + .unwrap() + .with_max_sample_rate(); + + let sample_rate = output_config.sample_rate(); + let input_channels = 2u16; + let output_channels = 2u16; + let buffer_size = 512u32; + + let config = DuplexStreamConfig { + input_channels, + output_channels, + sample_rate, + buffer_size: BufferSize::Fixed(buffer_size), + }; + + println!("=== Duplex Synchronization Verification Test ==="); + println!("Config: {:?}", config); + + let error_count = Arc::new(AtomicU64::new(0)); + let error_count_cb = error_count.clone(); + + let stream = match device.build_duplex_stream::( + &config, + move |input, output, _info| { + let mut state = state_clone.lock().unwrap(); + state.callback_count += 1; + + // Calculate frames from output buffer size + let frames = output.len() / output_channels as usize; + state.total_frames += frames as u64; + + // Simple passthrough + let copy_len = input.len().min(output.len()); + output[..copy_len].copy_from_slice(&input[..copy_len]); + for sample in output[copy_len..].iter_mut() { + *sample = 0.0; + } + }, + move |err| { + println!("Stream error: {err}"); + error_count_cb.fetch_add(1, Ordering::Relaxed); + }, + None, + ) { + Ok(s) => s, + Err(e) => { + println!("Could not create duplex stream: {:?}", e); + return; + } + }; + + // Run for 1 second + println!("Running duplex stream for 1 second..."); + stream.play().unwrap(); + std::thread::sleep(std::time::Duration::from_secs(1)); + stream.pause().unwrap(); + + // Collect results + let state = state.lock().unwrap(); + let stream_errors = error_count.load(Ordering::Relaxed); + + println!("\n=== Verification Results ==="); + println!("Callbacks: {}", state.callback_count); + println!("Total frames: {}", state.total_frames); + println!("Stream errors: {}", stream_errors); + + // Assertions + assert!( + state.callback_count > 0, + "Callback should have been called at least once" + ); + assert_eq!(stream_errors, 0, "No stream errors should occur"); + + println!("\n=== All synchronization checks PASSED ==="); + } } diff --git a/src/lib.rs b/src/lib.rs index 2cc9d582d..0094a0eff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -192,6 +192,7 @@ use std::convert::TryInto; use std::time::Duration; pub mod device_description; +pub mod duplex; mod error; mod host; pub mod platform; diff --git a/src/platform/mod.rs b/src/platform/mod.rs index 0f62026d7..0d0d799dd 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -521,6 +521,29 @@ macro_rules! impl_platform_host { )* } } + + fn build_duplex_stream_raw( + &self, + config: &crate::duplex::DuplexStreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&crate::Data, &mut crate::Data, &crate::duplex::DuplexCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + match self.0 { + $( + $(#[cfg($feat)])? + DeviceInner::$HostVariant(ref d) => d + .build_duplex_stream_raw(config, sample_format, data_callback, error_callback, timeout) + .map(StreamInner::$HostVariant) + .map(Stream::from), + )* + } + } } impl crate::traits::HostTrait for Host { diff --git a/src/traits.rs b/src/traits.rs index 2c3bccc28..4003bf647 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -8,6 +8,7 @@ use std::time::Duration; use crate::{ + duplex::{DuplexCallbackInfo, DuplexStreamConfig}, BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, DeviceId, DeviceIdError, DeviceNameError, DevicesError, InputCallbackInfo, InputDevices, OutputCallbackInfo, OutputDevices, PauseStreamError, PlayStreamError, SampleFormat, SizedSample, StreamConfig, @@ -94,10 +95,12 @@ pub trait DeviceTrait { type SupportedInputConfigs: Iterator; /// The iterator type yielding supported output stream formats. type SupportedOutputConfigs: Iterator; - /// The stream type created by [`build_input_stream_raw`] and [`build_output_stream_raw`]. + /// The stream type created by [`build_input_stream_raw`], [`build_output_stream_raw`], + /// and [`build_duplex_stream_raw`]. /// /// [`build_input_stream_raw`]: Self::build_input_stream_raw /// [`build_output_stream_raw`]: Self::build_output_stream_raw + /// [`build_duplex_stream_raw`]: Self::build_duplex_stream_raw type Stream: StreamTrait; /// The human-readable name of the device. @@ -139,6 +142,15 @@ pub trait DeviceTrait { .is_ok_and(|mut iter| iter.next().is_some()) } + /// True if the device supports duplex (simultaneous input and output), otherwise false. + /// + /// Duplex operation requires the device to support both input and output with a shared + /// hardware clock. This is typically true for audio interfaces but may not be available + /// on all devices (e.g., output-only speakers or input-only microphones). + fn supports_duplex(&self) -> bool { + self.supports_input() && self.supports_output() + } + /// An iterator yielding formats that are supported by the backend. /// /// Can return an error if the device is no longer valid (e.g. it has been disconnected). @@ -286,6 +298,116 @@ pub trait DeviceTrait { where D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, E: FnMut(StreamError) + Send + 'static; + + /// Create a duplex stream with synchronized input and output. + /// + /// A duplex stream uses a single audio unit with both input and output enabled, + /// ensuring they share the same hardware clock. This is essential for applications + /// requiring sample-accurate synchronization between input and output, such as: + /// + /// - DAWs (Digital Audio Workstations) + /// - Real-time audio effects processing + /// - Audio measurement and analysis + /// + /// # Parameters + /// + /// * `config` - The duplex stream configuration specifying channels, sample rate, and buffer size. + /// * `data_callback` - Called periodically with synchronized input and output buffers. + /// - `input`: Interleaved samples from the input device in format `T` + /// - `output`: Mutable buffer to fill with interleaved samples for output in format `T` + /// - `info`: Timing information including hardware timestamp + /// * `error_callback` - Called when a stream error occurs (e.g., device disconnected). + /// * `timeout` - Optional timeout for backend operations. `None` indicates blocking behavior, + /// `Some(duration)` sets a maximum wait time. Not all backends support timeouts. + /// + /// # Errors + /// + /// Returns an error if: + /// - The device doesn't support duplex operation ([`supports_duplex`](Self::supports_duplex) returns false) + /// - The requested configuration is not supported + /// - The device is no longer available + /// + /// # Example + /// + /// ```no_run + /// use cpal::duplex::DuplexStreamConfig; + /// use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; + /// use cpal::BufferSize; + /// + /// let host = cpal::default_host(); + /// let device = host.default_output_device().expect("no device"); + /// + /// let config = DuplexStreamConfig::symmetric(2, 48000, BufferSize::Fixed(512)); + /// + /// let stream = device.build_duplex_stream::( + /// &config, + /// |input, output, info| { + /// // Passthrough: copy input to output + /// output[..input.len()].copy_from_slice(input); + /// }, + /// |err| eprintln!("Stream error: {}", err), + /// None, // No timeout + /// ); + /// ``` + fn build_duplex_stream( + &self, + config: &DuplexStreamConfig, + mut data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + T: SizedSample, + D: FnMut(&[T], &mut [T], &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + self.build_duplex_stream_raw( + config, + T::FORMAT, + move |input, output, info| { + data_callback( + input + .as_slice() + .expect("host supplied incorrect sample type"), + output + .as_slice_mut() + .expect("host supplied incorrect sample type"), + info, + ) + }, + error_callback, + timeout, + ) + } + + /// Create a dynamically typed duplex stream. + /// + /// This method allows working with sample data as raw bytes, useful when the sample + /// format is determined at runtime. For compile-time known formats, prefer + /// [`build_duplex_stream`](Self::build_duplex_stream). + /// + /// # Parameters + /// + /// * `config` - The duplex stream configuration specifying channels, sample rate, and buffer size. + /// * `sample_format` - The sample format of the audio data. + /// * `data_callback` - Called periodically with synchronized input and output buffers as [`Data`]. + /// * `error_callback` - Called when a stream error occurs (e.g., device disconnected). + /// * `timeout` - Optional timeout for backend operations. `None` indicates blocking behavior, + /// `Some(duration)` sets a maximum wait time. Not all backends support timeouts. + fn build_duplex_stream_raw( + &self, + _config: &DuplexStreamConfig, + _sample_format: SampleFormat, + _data_callback: D, + _error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &mut Data, &DuplexCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + Err(BuildStreamError::StreamConfigNotSupported) + } } /// A stream created from [`Device`](DeviceTrait), with methods to control playback.