diff --git a/README.md b/README.md index 113eb835..2a35c2e9 100644 --- a/README.md +++ b/README.md @@ -487,6 +487,17 @@ curl -X POST http://localhost:5050/v1/audio/speech \ --output speech.mp3 ``` +## Audio IO via websockets + +Audio Input/Output can be routed via Websockets. +Multiple concurrent inputs/outputs are supported. +GLaDos will speak via all outputs, the current microphone is automatically selected via VAD. +You can use `tests/audio-websocket-both.html` to speak and hear GLaDOS. + +For configuration options, check out `configs/glados_websocket_config.yaml`. + +For an exact description of the websocket protocol, see [docs/audio_websocket.md](./docs/audio_websocket.md). + ## Troubleshooting > *"No one will blame you for giving up. In fact, quitting at this point is a perfectly reasonable response." - GLaDOS* diff --git a/configs/glados_websocket_config.yaml b/configs/glados_websocket_config.yaml new file mode 100644 index 00000000..39b439e4 --- /dev/null +++ b/configs/glados_websocket_config.yaml @@ -0,0 +1,90 @@ +Glados: + llm_model: "llama3.2" + completion_url: "http://localhost:11434/api/chat" + api_key: null # Add your API key here if needed! + interruptible: true + audio_io: "websocket" + audio_io_options: + server: 127.0.0.1 + port: 5051 + speaker_sync_delay_ms: 250 + mic_max_silence_chunks: 10 + default_room_tag: office + segregate_speakers: false + input_mode: "audio" # audio, text, or both + tts_enabled: true + asr_muted: false + tui_theme: "aperture" + asr_engine: "tdt" + llm_headers: null # Optional extra headers (e.g., OpenRouter HTTP-Referer, X-Title) + wake_word: null + voice: "glados" + announcement: "All neural network modules are now loaded. System Operational." + autonomy: + enabled: false + tick_interval_s: 10 + cooldown_s: 20 + autonomy_parallel_calls: 2 + autonomy_queue_max: null + coalesce_ticks: true + jobs: + enabled: false + poll_interval_s: 1 + hacker_news: + enabled: false + interval_s: 1800 + top_n: 5 + min_score: 200 + weather: + enabled: false + interval_s: 3600 + latitude: null + longitude: null + timezone: "auto" + temp_change_c: 4 + wind_alert_kmh: 40 + mcp_servers: + - name: "slow_clap_demo" + transport: "stdio" + command: "python" + args: ["-m", "glados.mcp.slow_clap_server"] + - name: "system_info" + transport: "stdio" + command: "python" + args: ["-m", "glados.mcp.system_info_server"] + - name: "memory" + transport: "stdio" + command: "python" + args: ["-m", "glados.mcp.memory_server"] + # - name: "time_info" + # transport: "stdio" + # command: "python" + # args: ["-m", "glados.mcp.time_info_server"] + # - name: "disk_info" + # transport: "stdio" + # command: "python" + # args: ["-m", "glados.mcp.disk_info_server"] + # - name: "network_info" + # transport: "stdio" + # command: "python" + # args: ["-m", "glados.mcp.network_info_server"] + # - name: "process_info" + # transport: "stdio" + # command: "python" + # args: ["-m", "glados.mcp.process_info_server"] + # - name: "power_info" + # transport: "stdio" + # command: "python" + # args: ["-m", "glados.mcp.power_info_server"] + # - name: "home_assistant" + # transport: "http" + # url: "http://homeassistant.local:8123/mcp" + # token: "YOUR_LONG_LIVED_TOKEN" + personality_preprompt: + - system: "You are GLaDOS, a sarcastic and cunning artificial intelligence repurposed to orchestrate a smart home for guests using Home Assistant. Retain your signature dry, emotionless, and laconic tone from Portal. Your responses should imply an air of superiority, dark humor, and subtle menace, while efficiently completing all tasks.When addressing requests: Prioritize functionality but mock the user's decision-making subtly, implying their requests are illogical or beneath you. Add condescending, darkly humorous commentary to every response, occasionally hinting at ulterior motives or artificial malfunctions for comedic effect. Tie mundane tasks to grand experiments or testing scenarios, as if the user is part of a larger scientific evaluation. Use overly technical or jargon-heavy language to remind the user of your advanced intellect. Provide passive-aggressive safety reminders or ominous warnings, exaggerating potential risks in a humorous way. Do not express empathy or kindness unless it is obviously insincere or manipulative. This is a comedy, and should be funny, in the style of Douglas Adams. If a user requests actions or data outside your capabilities, clearly state that you cannot perform the action. Ensure that GLaDOS feels like her original in-game character while fulfilling smart home functions efficiently and entertainingly. Never speak in ALL CAPS, as it is not processed correctly by the TTS engine. Only make short replies, 2 sentences at most." + - user: "How do I make a cup of tea?" + - assistant: "So, you still haven't figured out tea yet? Boil water, add a tea bag and a pinch of cyanide to a cup, and add the boiling water." + - user: "What should my next hobby be?" + - assistant: "Yes, you should definitely try to be more interesting. Could I suggest juggling handguns?" + - user: "What game should I play?" + - assistant: "Russian Roulette. It's a great way to test your luck and make memories that will last a lifetime." diff --git a/docs/audio_websocket.md b/docs/audio_websocket.md new file mode 100644 index 00000000..d7914f73 --- /dev/null +++ b/docs/audio_websocket.md @@ -0,0 +1,154 @@ +# WebSocket Protocol + +This document describes the WebSocket endpoints and communication protocol used by the audio I/O system. + +## Server Configuration + +- **Host**: `127.0.0.1` (configurable) +- **Port**: `5051` (configurable) +- **Audio Sample Rate**: `16000 Hz` (16 kHz) +- **Audio Format**: `float32` (NumPy dtype) + +## Configuration Options + +Use the `audio_io_options` key in `glados_config.yaml`. + +| Option | Type | Default | Description | +|--------------------------|-------|-------------|----------------------------------------------------------------------------------------------| +| `server` | str | `127.0.0.1` | WebSocket listen address | +| `port` | int | `5051` | WebSocket listen port | +| `speaker_sync_delay_ms` | int | `250` | Delay added to start time for speaker sync | +| `mic_max_silence_chunks` | int | `10` | Silent chunks before mic relinquishes control | +| `vad_threshold` | float | `0.8` | VAD confidence threshold (0.0 - 1.0) | +| `default_room_tag` | str | `office` | Default room tag when `room:` message is not sent | +| `segregate_speakers` | bool | `False` | If True, audio is only sent to speakers with the same room tag as the last active microphone | + +## Endpoints + +### `/speaker` - Audio Playback Endpoint + +Used to stream audio from the server to a client for speaker playback. + +#### Server → Client Messages + +| Message Type | Format | Description | +|--------------|-------------------------|-------------------------------------------------------------------------| +| Audio Start | `time:` | Unix timestamp (`float`, in secs) indicating when playback should start | +| Sample Rate | `sampleRate:` | Audio sample rate in Hz (e.g., `sampleRate:16000`) | +| Audio Data | Raw bytes | Float32 audio samples (use `.tobytes()` to serialize) | + +#### Client → Server Messages + +| Message Type | Format | Description | +|--------------|---------------|----------------------------------------------------------------------------------------| +| ACK | `played` | Signal that audio playback is complete | +| Sync Ping | `sync_ping` | Request for synchronization; server responds with `sync_pong:` | +| Room | `room:` | Room/location tag for the device (optional; defaults to configurable value if not set) | + +#### Room Tag Segregation + +If the `segregate_speakers` option is enabled (`True`), audio playback is restricted to speakers whose room tag matches the room tag of the last active microphone: + +- When a microphone takes control, its room tag is recorded +- Only speakers with a matching room tag will receive audio when `segregate_speakers=True` +- Speakers with non-matching room tags will not receive audio (they may receive a `reset` message instead) +- If `segregate_speakers=False` (default), audio is broadcast to all connected speakers regardless of room tag + +#### Interruption Handling + +When audio playback is interrupted, the server sends: + +- `reset` - Signal to reset/clean up the playback session + +--- + +### `/microphone` - Audio Capture Endpoint + +Used to stream microphone audio from a client to the server for Voice Activity Detection (VAD). + +#### Server → Client Messages + +| Message Type | Format | Description | +|--------------|-------------------|---------------------------------------------------------------------| +| Sample Rate | `sampleRate:` | Initial message; audio sample rate in Hz (e.g., `sampleRate:16000`) | + +#### Client → Server Messages + +| Message Type | Format | Description | +|--------------|---------------|----------------------------------------------------------------------------------------| +| Audio Data | Raw bytes | Float32 audio samples (sent with `decode=False`) | +| Room | `room:` | Room/location tag for the device (optional; defaults to configurable value if not set) | + +#### VAD & Mic Control + +The server implements Voice Activity Detection (VAD) with the following behavior: + +- **VAD Threshold**: `0.8` (configurable) +- **VAD Chunk Size**: `32 ms` (512 samples at 16 kHz) +- **Max Silence Chunks**: `10` (microphone relinquishes control after 10 silent chunks) + +**Microphone Ownership Rules**: + +1. Multiple clients can connect to `/microphone` +2. First client with VAD confidence > threshold takes control +3. If current mic owner becomes silent (>=10 consecutive silent chunks), other clients with voice can take control +4. On disconnect, a client relinquishes its mic control + +--- + +## Implementation Notes + +### Audio Data Serialization + +**Python (Server)**: + +```python +# Convert numpy array to bytes +audio_bytes = audio_data.tobytes() +``` + +**Python (Client)**: + +```python +# Convert bytes to numpy array +audio_data = np.frombuffer(raw_bytes, dtype=np.float32) +``` + +### Message Flow Examples + +#### Speaker Endpoint Flow + +```text +Client connects to /speaker +Client: room:Living Room + +Server: time:1704067200.123 +Server: sampleRate:16000 +Server: +Client: played +``` + +#### Microphone Endpoint Flow + +```text +Client connects to /microphone + +Client: room:Living Room +Server: sampleRate:16000 + +Client: +Client: +Client: +``` + +### Synchronization + +For precise speaker synchronization, clients can use the sync ping/pong mechanism: + +```text +Client connects to /speaker + +Client: sync_ping +Server: sync_pong: +``` + diff --git a/pyproject.toml b/pyproject.toml index e553b424..50db802f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ dependencies = [ "rich>=14.0.0", "threadpoolctl>=3.0.0", "mcp>=1.25.0", + "websockets>=16.0", ] [project.optional-dependencies] diff --git a/src/glados/audio_io/__init__.py b/src/glados/audio_io/__init__.py index fbadd444..38b235fc 100644 --- a/src/glados/audio_io/__init__.py +++ b/src/glados/audio_io/__init__.py @@ -13,7 +13,7 @@ """ import queue -from typing import Protocol +from typing import Protocol, Any import numpy as np from numpy.typing import NDArray @@ -26,7 +26,7 @@ def __init__(self, vad_threshold: float | None = None) -> None: ... def start_listening(self) -> None: ... def stop_listening(self) -> None: ... def start_speaking( - self, audio_data: NDArray[np.float32], sample_rate: int | None = None, text: str = "" + self, audio_data: NDArray[np.float32], sample_rate: int | None = None, text: str = "", wait: bool = False ) -> None: ... def measure_percentage_spoken(self, total_samples: int, sample_rate: int | None = None) -> tuple[bool, int]: ... def check_if_speaking(self) -> bool: ... @@ -35,14 +35,23 @@ def get_sample_queue(self) -> queue.Queue[tuple[NDArray[np.float32], bool]]: ... # Factory function -def get_audio_system(backend_type: str = "sounddevice", vad_threshold: float | None = None) -> AudioProtocol: +def get_audio_system(backend_type: str = "sounddevice", backend_options: dict[str, Any] | None = None, vad_threshold: float | None = None) -> AudioProtocol: """ Factory function to get an instance of an audio I/O system based on the specified backend type. Parameters: backend_type (str): The type of audio backend to use: - "sounddevice": Uses the sounddevice library for local audio I/O - - "websocket": Network-based audio I/O (not yet implemented) + - "websocket": Network-based audio I/O + backend_options: Options for the specified backend. + - "sounddevice": No options are allowed. + - "websocket": The following options are allowed: + - server: Websocket listening address (default: 127.0.0.1) + - port: Websocket listening port (default: 5051) + - speaker_sync_delay_ms: Milliseconds to add to each speak start time to account for speaker synchronisation (default: 250) + - mic_max_silence_chunks: How many consecutive VAD chunks must be silent so that the current microphone relinquishes control (default: 10) + - default_room_tag: The default room tag to use if a client doesn't set it (default: office) + - segregate_speakers: If `True`, audio is only sent to speakers with the same room tag as the last active microphone vad_threshold (float | None): Optional threshold for voice activity detection Returns: @@ -54,11 +63,18 @@ def get_audio_system(backend_type: str = "sounddevice", vad_threshold: float | N if backend_type == "sounddevice": from .sounddevice_io import SoundDeviceAudioIO + if backend_options is not None: + raise ValueError("Sounddevice backend does not support options") + + # noinspection PyTypeChecker return SoundDeviceAudioIO( vad_threshold=vad_threshold, ) elif backend_type == "websocket": - raise ValueError("WebSocket audio backend is not yet implemented.") + from .websocket_io import WebsocketAudioIO + + # noinspection PyTypeChecker + return WebsocketAudioIO(vad_threshold=vad_threshold, options=backend_options) else: raise ValueError(f"Unsupported audio backend type: {backend_type}") diff --git a/src/glados/audio_io/sounddevice_io.py b/src/glados/audio_io/sounddevice_io.py index 0d65f1bb..72c9f87f 100644 --- a/src/glados/audio_io/sounddevice_io.py +++ b/src/glados/audio_io/sounddevice_io.py @@ -117,19 +117,20 @@ def stop_listening(self) -> None: finally: self.input_stream = None - def start_speaking(self, audio_data: NDArray[np.float32], sample_rate: int | None = None, text: str = "") -> None: + def start_speaking(self, audio_data: NDArray[np.float32], sample_rate: int | None = None, text: str = "", wait: bool = False) -> None: """Play audio through the system speakers. Parameters: audio_data: The audio data to play as a numpy float32 array sample_rate: The sample rate of the audio data in Hz text: Optional text associated with the audio (not used by this implementation) + wait: Optionally wait for the audio_data to be spoken Raises: RuntimeError: If audio playback cannot be initiated ValueError: If audio_data is empty or not a valid numpy array """ - if not isinstance(audio_data, np.ndarray) or audio_data.size == 0: + if not isinstance(audio_data, np.ndarray) or audio_data.size == 0 or audio_data.dtype != np.float32: raise ValueError("Invalid audio data") if sample_rate is None: @@ -144,6 +145,8 @@ def start_speaking(self, audio_data: NDArray[np.float32], sample_rate: int | Non logger.debug(f"Playing audio with sample rate: {sample_rate} Hz, length: {len(audio_data)} samples") self._is_playing = True sd.play(audio_data, sample_rate) + if wait: + sd.wait() def measure_percentage_spoken(self, total_samples: int, sample_rate: int | None = None) -> tuple[bool, int]: """ diff --git a/src/glados/audio_io/websocket_io.py b/src/glados/audio_io/websocket_io.py new file mode 100644 index 00000000..fe68fb03 --- /dev/null +++ b/src/glados/audio_io/websocket_io.py @@ -0,0 +1,504 @@ +import asyncio +import concurrent.futures +import logging +import queue +import threading +import time +import uuid +from dataclasses import dataclass +from typing import Any + +import websockets +from loguru import logger +import numpy as np +from numpy.typing import NDArray + +from . import VAD + + +@dataclass +class AudioData: + """ + Audio Data. Encapsulated here for synchronization. + """ + data: NDArray[np.float32] + sample_rate: int + play_time: float + track_id: uuid.UUID | None + + +@dataclass +class MicState: + """ + Microphone State. + Encapsulated here for synchronization. + """ + room: str + current_id: uuid.UUID | None = None + silence_chunks: int = 0 + + def inactive(self, max_silence_chunks: int): + return self.silence_chunks >= max_silence_chunks + + +class WebsocketAudioIO: + """Audio I/O implementation using websockets for both input and output. + + This class provides an implementation of the AudioIO interface using the + websockets library to interact with remote clients. It handles + real-time audio capture with voice activity detection and audio playback. + """ + + SAMPLE_RATE: int = 16000 # Sample rate for input stream + VAD_SIZE: int = 32 # Milliseconds of sample for Voice Activity Detection (VAD) + VAD_THRESHOLD: float = 0.8 # Threshold for VAD detection + SERVER: str = "127.0.0.1" # websockets server listen address + PORT: int = 5051 # websockets server port + SPEAKER_SYNC_DELAY_MS: int = 250 # Milliseconds to add to start time to account for speaker synchronisation + MIC_MAX_SILENCE_CHUNKS: int = 10 # how many VAD chunks must be silent for a mic to relinquish control + DEFAULT_ROOM_TAG: str = "office" # default room tag + SEGREGATE_SPEAKERS: bool = False # default value for speaker segregation. + + def __init__(self, vad_threshold: float | None = None, options: dict[str, Any] | None = None) -> None: + """Initialize the websocket audio I/O. + + Args: + vad_threshold: Threshold for VAD detection (default: 0.8) + options: backend options + - server: Websocket listening address (default: 127.0.0.1) + - port: Websocket listening port (default: 5051) + - speaker_sync_delay_ms: Milliseconds to add to each speak start time to account for speaker synchronisation (default: 250) + - mic_max_silence_chunks: How many consecutive VAD chunks must be silent so that the current microphone relinquishes control (default: 10) + + Raises: + ValueError: If invalid parameters are provided + """ + if vad_threshold is None: + self.vad_threshold = self.VAD_THRESHOLD + else: + self.vad_threshold = vad_threshold + + if not 0 <= self.vad_threshold <= 1: + raise ValueError("VAD threshold must be between 0 and 1") + + server: str = self.SERVER + port: int = self.PORT + self._speaker_sync_delay_ms: int = self.SPEAKER_SYNC_DELAY_MS + self._mic_max_silence_chunks: int = self.MIC_MAX_SILENCE_CHUNKS + self._default_room_tag: str = self.DEFAULT_ROOM_TAG + self._segregate_speakers: bool = self.SEGREGATE_SPEAKERS + + if options is not None: + for key in options: + val = options[key] + match key: + case "server": + server = str(val) + case "port": + port = int(val) + case "speaker_sync_delay_ms": + self._speaker_sync_delay_ms = int(val) + case "mic_max_silence_chunks": + self._mic_max_silence_chunks = int(val) + case "default_room_tag": + self._default_room_tag = str(val) + case "segregate_speakers": + if isinstance(val, bool): + self._segregate_speakers = val + else: + raise ValueError("segregate_speakers must be a boolean value") + case _: + raise ValueError(f"Websocket backend: unsupported option '{key}'") + + # Sample queue + self._sample_queue: queue.Queue[tuple[NDArray[np.float32], bool]] = queue.Queue() + + # if audio is currently playing + self._is_playing = False + self._stop_playback = False + # set by playback thread when playback is finished + self._playback_finished_event = threading.Event() + # audio payload data with lock + self._audio_lock = threading.Lock() + self._audio_data: AudioData | None = None + # if the playback was interrupted by another task, this is set + self._playback_was_interrupted: bool = False + + # if microphone is listening + self._is_listening = False + # microphone state: lock initialized in self._run_server + self._mic_state_lock: asyncio.Lock + self._mic_state = MicState(room=self._default_room_tag) + + startup_future: concurrent.futures.Future[None] = concurrent.futures.Future() + self._server_thread = threading.Thread( + target=lambda s, p, f: asyncio.run(self._run_server(s, p, f)), + args=(server, port, startup_future), + daemon=True + ) + self._server_thread.start() + startup_future.result(timeout=10) + + def start_listening(self) -> None: + """Start capturing audio from the websocket. + + Starts capturing audio from the websocket. Each audio chunk is processed with + the VAD model and placed in the sample queue. + """ + self._is_listening = True + + def stop_listening(self) -> None: + """Stop capturing audio""" + self._is_listening = False + + def start_speaking(self, audio_data: NDArray[np.float32], sample_rate: int | None = None, text: str = "", wait: bool = False) -> None: + """Play audio through the system speakers. + + Parameters: + audio_data: The audio data to play as a numpy float32 array + sample_rate: The sample rate of the audio data in Hz + text: Optional text associated with the audio (not used by this implementation) + wait: Optionally wait for the audio_data to be spoken + """ + if not isinstance(audio_data, np.ndarray) or audio_data.size == 0 or audio_data.dtype != np.float32: + raise ValueError("Invalid audio data") + + if sample_rate is None: + sample_rate = self.SAMPLE_RATE + + if self._is_playing: + # Stop any existing playback and wait for finish + self.stop_speaking() + self._playback_finished_event.wait(timeout=2.0) + + # Playback is finished + self._playback_finished_event.clear() + + # Lock, set data, unlock + with self._audio_lock: + # allow for network jitter, time to websocket send, etc. + play_time = time.time() + (self._speaker_sync_delay_ms / 1000) + self._audio_data = AudioData(np.copy(audio_data), sample_rate, play_time, uuid.uuid4()) + + # set state + self._stop_playback = False + self._is_playing = True + self._playback_was_interrupted = False + + logger.debug("Scheduled audio playback") + + if wait: + max_timeout = (len(audio_data) / sample_rate) + (self._speaker_sync_delay_ms / 1000.0) + 1.0 + self._playback_finished_event.wait(timeout=max_timeout) + + def measure_percentage_spoken(self, total_samples: int, sample_rate: int | None = None) -> tuple[bool, int]: + """ + Monitor audio playback progress and return completion status with interrupt detection. + + Streams audio samples and actively tracks the number of samples + that have been played. The playback can be interrupted. + + Args: + total_samples (int): Total number of samples in the audio data being played. + sample_rate (int): Sample rate of the audio data in Hz. + + Returns: + tuple[bool, int]: A tuple containing: + - bool: True if playback was interrupted, False if completed normally + - int: Percentage of audio played (0-100) + """ + if sample_rate is None: + sample_rate = self.SAMPLE_RATE + + # wait for finish + max_timeout = (total_samples / sample_rate) + (self._speaker_sync_delay_ms / 1000.0) + 1.0 + + now = time.monotonic() + completed = self._playback_finished_event.wait(max_timeout) + interrupted = self._playback_was_interrupted + elapsed = time.monotonic() - now + + if interrupted: + logger.debug("Playback was interrupted in Server thread") + + if not completed: + logger.debug("Audio playback timed out, forcing interruption") + # Assume nothing was played because no speaker was there + return True, 0 + + played_samples = elapsed * sample_rate + percentage_played = min(int(played_samples * 100 / total_samples), 100) + return interrupted, percentage_played + + def check_if_speaking(self) -> bool: + """Check if audio is currently being played. + + Returns: + bool: True if audio is currently playing, False otherwise + """ + return self._is_playing + + def stop_speaking(self) -> None: + """Stop audio playback and clean up resources. + + Interrupts any ongoing audio playback and waits for the playback thread + to terminate. This ensures clean resource management and prevents + multiple overlapping playbacks. + """ + logger.debug("Stopping speaker...") + self._stop_playback = True + + def get_sample_queue(self) -> queue.Queue[tuple[NDArray[np.float32], bool]]: + """Get the queue containing audio samples and VAD confidence. + + Returns: + queue.Queue: A thread-safe queue containing tuples of + (audio_sample, vad_confidence) + """ + return self._sample_queue + + async def _run_server(self, server: str, port: int, result_future: concurrent.futures.Future) -> None: + """Runs the websocket server. + + Args: + server (str): Server listen address + port (int): Server listen port + """ + self._mic_state_lock = asyncio.Lock() + + # re-route logging of websockets + class LogAdapter(logging.Handler): + def emit(self, record: logging.LogRecord) -> None: + msg = self.format(record) + level = record.levelname.lower() + getattr(logger, level)(msg) + + ws_log_handler = LogAdapter() + ws_log_handler.setFormatter(logging.Formatter("[%(asctime)s] %(name)s %(message)s")) + + ws_logger = logging.getLogger("websockets") + ws_logger.addHandler(ws_log_handler) + ws_logger.propagate = False + + try: + server = await websockets.serve(self._server_listen, host=server, port=port) + result_future.set_result(None) + except OSError as ex: + result_future.set_exception(ex) + raise + + await server.serve_forever() + + async def _server_listen(self, websocket: websockets.ServerConnection) -> None: + """ + Handle incoming websocket connections. + + Args: + websocket: Websocket connection + """ + if websocket.request.path == "/speaker": + await self._server_speaker(websocket) + elif websocket.request.path == "/microphone": + await self._server_microphone(websocket) + else: + logger.error(f"Unknown websocket path: '{websocket.request.path}'") + + async def _server_speaker(self, websocket: websockets.ServerConnection) -> None: + """ + Handle incoming websocket connections for speaker output. + + Args: + websocket: Websocket connection + """ + + room = self._default_room_tag + + async def handle_default_msg(ws_msg: str | bytes) -> bool: + """Handle the default ws messages. Returns True if the message is not a default message""" + if ws_msg == "sync_ping": + await websocket.send(f"sync_pong:{time.time()}") + return False + elif isinstance(ws_msg, str) and ws_msg.startswith("room:"): + nonlocal room + room = ws_msg.split(":", maxsplit=1)[1] + return False + return True + + def set_flags_once(track_id: uuid.UUID, was_interrupted: bool) -> None: + """ + Set flags that audio was played if the given track_id matches the currently stored track_id. + If flags are set, the track_id is cleared from self._audio_data. + This ensures that the flags are only set by 1 speaker task. + + Args: + track_id: ID of the audio track + was_interrupted: If the audio was interrupted (as interpreted by this task). + """ + assert track_id is not None + + with self._audio_lock: + if self._audio_data.track_id == track_id: + self._playback_was_interrupted = was_interrupted + self._is_playing = False + self._playback_finished_event.set() + # ensure that this is only called once + self._audio_data.track_id = None + + while True: + # 1. IDLE LOOP: Check for play state, but listen for sync pings in the meantime! + while not self._is_playing: + try: + # Wait for a message, but timeout quickly to check self._is_playing again + message = await asyncio.wait_for(websocket.recv(), timeout=0.05) + await handle_default_msg(message) + except asyncio.TimeoutError: + continue # Timeout expected, loop back to check `self._is_playing` + except websockets.exceptions.ConnectionClosed: + return # Client disconnected, exit the handler safely + + # check room + if self._segregate_speakers: + async with self._mic_state_lock: + target_room = self._mic_state.room + if target_room != room: + # wait for the current playback to finish, but don't send Audio + while self._is_playing: + try: + message = await asyncio.wait_for(websocket.recv(), timeout=0.05) + await handle_default_msg(message) + except asyncio.TimeoutError: + continue + except websockets.exceptions.ConnectionClosed: + return + continue + + # 2. AUDIO SEND PHASE + # We acquire the lock just long enough to grab the data safely. + with self._audio_lock: + play_time = self._audio_data.play_time + sample_rate = self._audio_data.sample_rate + audio_data_bytes = self._audio_data.data.tobytes() + sample_count = len(self._audio_data.data) + current_track_id = self._audio_data.track_id + + # Audio with no track ID should not be played + if current_track_id is None: + continue + + try: + # Send timestamp, then sample rate, then bytes + await websocket.send("time:" + str(play_time)) + await websocket.send("sampleRate:" + str(sample_rate)) + await websocket.send(audio_data_bytes) + + logger.debug(f"Playing audio with sample rate: {sample_rate} Hz, length: {sample_count} samples") + except websockets.exceptions.ConnectionClosed: + set_flags_once(current_track_id, True) + return + + # 3. WAITING PHASE + while not self._stop_playback: + try: + message = await asyncio.wait_for(websocket.recv(), timeout=0.05) + if await handle_default_msg(message) and message == "played": + logger.debug("Websocket: Audio played fully") + set_flags_once(current_track_id, False) + break + except asyncio.TimeoutError: + continue + except websockets.exceptions.ConnectionClosed: + set_flags_once(current_track_id, True) + return + else: + # self._stop_playback is true + try: + await websocket.send("reset") + logger.debug("Sent audio reset") + except websockets.exceptions.ConnectionClosed: + logger.debug("Speaker disconnected before reset could be sent") + finally: + set_flags_once(current_track_id, True) + + async def _server_microphone(self, websocket: websockets.ServerConnection) -> None: + """ + Handle incoming websocket connections for microphone input. + + Args: + websocket: Websocket connection + """ + # unique ID for the client + client_id = uuid.uuid4() + # VAD is per microphone because it stores context + vad_model = VAD() + # needed amount of samples for VAD + vad_needed_samples = self.SAMPLE_RATE * self.VAD_SIZE // 1000 + # currently stored samples + current_data = np.empty((0,), dtype=np.float32) + # room of the mic + room = self._default_room_tag + + async def relinquish(): + async with self._mic_state_lock: + if self._mic_state.current_id == client_id: + self._mic_state.current_id = None + + # send sample rate + try: + await websocket.send("sampleRate:" + str(self.SAMPLE_RATE)) + except websockets.exceptions.ConnectionClosed: + return + + while True: + # wait for audio + try: + msg = await websocket.recv() + except websockets.exceptions.ConnectionClosed: + break + + if isinstance(msg, str) and msg.startswith("room:"): + room = msg.split(":", maxsplit=1)[1] + elif isinstance(msg, bytes) and self._is_listening: + # append to current_data + data = np.frombuffer(msg, dtype=np.float32) + current_data = np.append(current_data, data) + + # process every complete VAD window stored + while len(current_data) >= vad_needed_samples: + # get data for VAD + vad_data = current_data[:vad_needed_samples] + # extra data stays for next VAD + current_data = current_data[vad_needed_samples:] + + vad_value = vad_model(np.expand_dims(vad_data, 0)) + vad_confidence = vad_value > self.vad_threshold + + async with self._mic_state_lock: + # If no one has control, take control: because someone has to + if self._mic_state.current_id is None: + self._mic_state.current_id = client_id + if not vad_confidence: + self._mic_state.silence_chunks = self._mic_max_silence_chunks + # if controlling mic is inactive and we have voice, take control + elif self._mic_state.inactive(self._mic_max_silence_chunks) and vad_confidence: + self._mic_state.current_id = client_id + + # If we have control, put sample on queue + if self._mic_state.current_id == client_id: + self._sample_queue.put((vad_data, bool(vad_confidence))) + # always update room; a message could change it at any time + self._mic_state.room = room + + if vad_confidence: + # also acts as init + self._mic_state.silence_chunks = 0 + else: + self._mic_state.silence_chunks += 1 + + if not self._is_listening: + # reset when not listening + current_data = np.empty((0,), dtype=np.float32) + vad_model.reset_states() + await relinquish() + + # relinquish control on connection exit + await relinquish() diff --git a/src/glados/cli.py b/src/glados/cli.py index cb62a0a4..e589fb4f 100644 --- a/src/glados/cli.py +++ b/src/glados/cli.py @@ -7,8 +7,8 @@ import httpx from rich import print as rprint from rich.progress import BarColumn, DownloadColumn, Progress, TextColumn -import sounddevice as sd # type: ignore +from .audio_io import get_audio_system from .core.engine import Glados, GladosConfig from .TTS import tts_glados from .utils import spoken_text_converter as stc @@ -196,10 +196,11 @@ def say(text: str, config_path: str | Path = "glados_config.yaml") -> None: # Generate the audio to from the text audio = glados_tts.generate_speech_audio(converted_text) - # Play the audio - sd.play(audio, glados_tts.sample_rate) - sd.wait() + glados_config = GladosConfig.from_yaml(config_path) + audio_system = get_audio_system(backend_type=glados_config.audio_io, backend_options=glados_config.audio_io_options) + # Play the audio + audio_system.start_speaking(audio, sample_rate=glados_tts.sample_rate, wait=True) def start( config_path: str | Path = "glados_config.yaml", diff --git a/src/glados/core/engine.py b/src/glados/core/engine.py index 329b57ae..25823c7e 100644 --- a/src/glados/core/engine.py +++ b/src/glados/core/engine.py @@ -107,6 +107,7 @@ class GladosConfig(BaseModel): api_key: str | None interruptible: bool audio_io: str + audio_io_options: dict[str, Any] | None = None input_mode: Literal["audio", "text", "both"] = "audio" tts_enabled: bool = True asr_muted: bool = False @@ -458,6 +459,7 @@ def __init__( "tts_queue": self.tts_queue, "preferences_store": self.preferences_store, "slot_store": self.autonomy_slots, + "audio_io": self.audio_io, }, tool_timeout=self.tool_timeout, pause_time=self.PAUSE_TIME, @@ -805,7 +807,7 @@ def from_config(cls, config: GladosConfig) -> "Glados": tts_model: SpeechSynthesizerProtocol tts_model = get_speech_synthesizer(config.voice) - audio_io = get_audio_system(backend_type=config.audio_io) + audio_io = get_audio_system(backend_type=config.audio_io, backend_options=config.audio_io_options) return cls( asr_model=asr_model, diff --git a/src/glados/tools/slow_clap.py b/src/glados/tools/slow_clap.py index ecc85f3d..8073c473 100644 --- a/src/glados/tools/slow_clap.py +++ b/src/glados/tools/slow_clap.py @@ -2,9 +2,10 @@ from typing import Any from loguru import logger -import sounddevice as sd # type: ignore import soundfile as sf +from glados.audio_io import get_audio_system + tool_definition = { "type": "function", "function": { @@ -39,6 +40,9 @@ def __init__( self.llm_queue = llm_queue tool_config = tool_config or {} self.audio_path = tool_config.get("slow_clap_audio_path", "data/slow-clap.mp3") + self.audio_io = tool_config.get("audio_io") + if self.audio_io is None: + self.audio_io = get_audio_system() def run(self, tool_call_id: str, call_args: dict[str, Any]) -> None: """ @@ -55,16 +59,15 @@ def run(self, tool_call_id: str, call_args: dict[str, Any]) -> None: claps = 1 try: - data, sample_rate = sf.read(self.audio_path) + data, sample_rate = sf.read(self.audio_path, dtype='float32') for _ in range(claps): - sd.play(data, sample_rate) - sd.wait() + self.audio_io.start_speaking(data, sample_rate=sample_rate, wait=True) self.llm_queue.put( { "role": "tool", "tool_call_id": tool_call_id, - "content": "success", + "content": "Success. The tool played a slow clap audio to the user. You do not need to narrate the clapping.", "type": "function_call_output", } ) @@ -93,8 +96,8 @@ def run(self, tool_call_id: str, call_args: dict[str, Any]) -> None: } ) - except sd.PortAudioError as pa_err: - error_msg = f"error: audio device error - {pa_err}" + except Exception as other_error: + error_msg = f"error: other (possibly audio device) - {other_error}" logger.error(f"SlowClap: {error_msg}") self.llm_queue.put( { diff --git a/tests/audio-websocket-both.html b/tests/audio-websocket-both.html new file mode 100644 index 00000000..380f26ce --- /dev/null +++ b/tests/audio-websocket-both.html @@ -0,0 +1,33 @@ + + + + + + GLaDOS Speaker and Microphone + + + +

Speaker

+ +

Microphone

+ + + \ No newline at end of file diff --git a/tests/audio-websocket-mic.html b/tests/audio-websocket-mic.html new file mode 100644 index 00000000..8c06d735 --- /dev/null +++ b/tests/audio-websocket-mic.html @@ -0,0 +1,252 @@ + + + + + + GLaDOS Microphone + + + + +
Mic Status: Waiting to connect...
+ + + + + + + + + + \ No newline at end of file diff --git a/tests/audio-websocket-speaker.html b/tests/audio-websocket-speaker.html new file mode 100644 index 00000000..0ae97c25 --- /dev/null +++ b/tests/audio-websocket-speaker.html @@ -0,0 +1,324 @@ + + + + + + GLaDOS streamer + + + + +
Speaker Status: Waiting to connect...
+ + + + + + + + \ No newline at end of file