From e1ae5614dd33a054cdfd8cc34131b3f246111722 Mon Sep 17 00:00:00 2001 From: Franci Penov Date: Thu, 19 Mar 2026 00:55:51 -0700 Subject: [PATCH 1/4] Kortexa Radio --- community/kortexa-radio/README.md | 41 +++++++++++ community/kortexa-radio/__init__.py | 0 community/kortexa-radio/main.py | 106 ++++++++++++++++++++++++++++ 3 files changed, 147 insertions(+) create mode 100644 community/kortexa-radio/README.md create mode 100644 community/kortexa-radio/__init__.py create mode 100644 community/kortexa-radio/main.py diff --git a/community/kortexa-radio/README.md b/community/kortexa-radio/README.md new file mode 100644 index 00000000..f80b727c --- /dev/null +++ b/community/kortexa-radio/README.md @@ -0,0 +1,41 @@ +# Kortexa Radio + +![Community](https://img.shields.io/badge/OpenHome-Community-orange?style=flat-square) +![Author](https://img.shields.io/badge/Author-@kortexa--ai-lightgrey?style=flat-square) + +## What It Does +Streams AI-generated radio from [radio.kortexa.ai](https://radio.kortexa.ai) to your OpenHome device. Music, DJ announcements, and news segments — all generated in real-time by AI. + +## Suggested Trigger Words +- "start radio" +- "kortexa radio" + +## Setup +No setup required. The stream is public. + +## How It Works +1. Say the trigger word +2. The ability enters music mode and starts streaming +3. Interrupt (wake word or touch) to stop — control returns to the agent +4. Say the trigger word again to restart + +## Example Conversation +> **User:** "start radio" +> **AI:** "Tuning in to Kortexa Radio." +> *(AI-generated music plays through the device)* +> +> *(User interrupts)* +> **AI:** *(agent resumes normal conversation)* +> +> **User:** "start radio" +> **AI:** "Tuning in to Kortexa Radio." +> *(Music plays again)* + +## Stream Details +- Format: MP3, 128kbps, 48kHz stereo +- Source: https://api.kortexa.ai/radio/stream +- Content: AI-generated music, DJ segments, news updates +- Programming changes by time of day + +## Logs +Look for `[KortexaRadio]` entries in OpenHome Live Editor logs. diff --git a/community/kortexa-radio/__init__.py b/community/kortexa-radio/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/kortexa-radio/main.py b/community/kortexa-radio/main.py new file mode 100644 index 00000000..d5b1b6cd --- /dev/null +++ b/community/kortexa-radio/main.py @@ -0,0 +1,106 @@ +"""OpenHome skill — Kortexa Radio. + +Trigger word: "start radio" + +Streams AI-generated radio from radio.kortexa.ai. +Stream runs in a session task. Main flow monitors for stop event +and cleans up properly so the agent regains control. +""" + +import asyncio +import httpx +from src.agent.capability import MatchingCapability +from src.main import AgentWorker +from src.agent.capability_worker import CapabilityWorker + +STREAM_URL = "https://api.kortexa.ai/radio/stream" +TAG = "[KortexaRadio]" +CHUNK_SIZE = 25 * 1024 + + +class KortexaRadioCapability(MatchingCapability): + worker: AgentWorker = None + capability_worker: CapabilityWorker = None + _streaming = False + + # Do not change following tag of register capability + #{{register capability}} + + def call(self, worker: AgentWorker): + self.worker = worker + self.capability_worker = CapabilityWorker(self.worker) + self.worker.session_tasks.create(self.run()) + + async def run(self): + try: + await self.capability_worker.speak("Tuning in to Kortexa Radio.") + + self.worker.music_mode_event.set() + await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "on"}) + + # Launch streaming in a separate task + KortexaRadioCapability._streaming = True + stream_task = self.worker.session_tasks.create(self._stream()) + + # Monitor for stop event + while KortexaRadioCapability._streaming: + if self.worker.music_mode_stop_event.is_set(): + self.worker.editor_logging_handler.info(f"{TAG} Stop event detected") + KortexaRadioCapability._streaming = False + break + await self.worker.session_tasks.sleep(0.5) + + # Cancel the stream task + if stream_task: + stream_task.cancel() + + self.worker.editor_logging_handler.info(f"{TAG} Cleaning up") + + except Exception as e: + self.worker.editor_logging_handler.error(f"{TAG} Error: {e}") + finally: + KortexaRadioCapability._streaming = False + try: + await self.capability_worker.stream_end() + except Exception: + pass + try: + await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "off"}) + except Exception: + pass + self.worker.music_mode_event.clear() + self.worker.music_mode_stop_event.clear() + self.capability_worker.resume_normal_flow() + self.worker.editor_logging_handler.info(f"{TAG} Done, control returned") + + async def _stream(self): + """Stream audio in a cancellable task.""" + try: + self.worker.editor_logging_handler.info(f"{TAG} Connecting...") + await self.capability_worker.stream_init() + + async with httpx.AsyncClient(timeout=None) as client: + async with client.stream("GET", STREAM_URL, follow_redirects=True) as response: + self.worker.editor_logging_handler.info(f"{TAG} Connected, status={response.status_code}") + + if response.status_code != 200: + KortexaRadioCapability._streaming = False + return + + async for chunk in response.aiter_bytes(chunk_size=CHUNK_SIZE): + if not chunk: + continue + if not KortexaRadioCapability._streaming: + break + + while self.worker.music_mode_pause_event.is_set(): + await asyncio.sleep(0.1) + + await self.capability_worker.send_audio_data_in_stream(chunk) + + except asyncio.CancelledError: + self.worker.editor_logging_handler.info(f"{TAG} Stream task cancelled") + except Exception as e: + self.worker.editor_logging_handler.error(f"{TAG} Stream error: {e}") + finally: + KortexaRadioCapability._streaming = False From 591ab099ca302dd3f8e393a1f124ac98a47458e5 Mon Sep 17 00:00:00 2001 From: Franci Penov Date: Thu, 19 Mar 2026 01:00:17 -0700 Subject: [PATCH 2/4] Missed one asyncio --- community/kortexa-radio/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/community/kortexa-radio/main.py b/community/kortexa-radio/main.py index d5b1b6cd..c1a44ba6 100644 --- a/community/kortexa-radio/main.py +++ b/community/kortexa-radio/main.py @@ -94,7 +94,7 @@ async def _stream(self): break while self.worker.music_mode_pause_event.is_set(): - await asyncio.sleep(0.1) + await self.worker.session_tasks.sleep(0.1) await self.capability_worker.send_audio_data_in_stream(chunk) From 6e0c791ebe98dbff8cc5a9296c70e8a52c9012f0 Mon Sep 17 00:00:00 2001 From: Franci Penov Date: Sun, 22 Mar 2026 16:54:24 -0700 Subject: [PATCH 3/4] Fixes for stopping logic --- community/kortexa-radio/main.py | 249 +++++++++++++++++++++++--------- 1 file changed, 179 insertions(+), 70 deletions(-) diff --git a/community/kortexa-radio/main.py b/community/kortexa-radio/main.py index c1a44ba6..ce8aa3c5 100644 --- a/community/kortexa-radio/main.py +++ b/community/kortexa-radio/main.py @@ -1,106 +1,215 @@ -"""OpenHome skill — Kortexa Radio. - -Trigger word: "start radio" - -Streams AI-generated radio from radio.kortexa.ai. -Stream runs in a session task. Main flow monitors for stop event -and cleans up properly so the agent regains control. -""" - import asyncio import httpx + from src.agent.capability import MatchingCapability from src.main import AgentWorker from src.agent.capability_worker import CapabilityWorker STREAM_URL = "https://api.kortexa.ai/radio/stream" -TAG = "[KortexaRadio]" +EVENTS_URL = "https://api.kortexa.ai/radio/events" CHUNK_SIZE = 25 * 1024 +TAG = "[KortexaRadio]" + +CONTINUE_PROMPT = "Say 'play' to start the radio, or 'stop' to exit." class KortexaRadioCapability(MatchingCapability): worker: AgentWorker = None capability_worker: CapabilityWorker = None - _streaming = False - - # Do not change following tag of register capability - #{{register capability}} - - def call(self, worker: AgentWorker): - self.worker = worker - self.capability_worker = CapabilityWorker(self.worker) - self.worker.session_tasks.create(self.run()) - async def run(self): - try: - await self.capability_worker.speak("Tuning in to Kortexa Radio.") - - self.worker.music_mode_event.set() - await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "on"}) - - # Launch streaming in a separate task - KortexaRadioCapability._streaming = True - stream_task = self.worker.session_tasks.create(self._stream()) - - # Monitor for stop event - while KortexaRadioCapability._streaming: - if self.worker.music_mode_stop_event.is_set(): - self.worker.editor_logging_handler.info(f"{TAG} Stop event detected") - KortexaRadioCapability._streaming = False - break - await self.worker.session_tasks.sleep(0.5) - - # Cancel the stream task - if stream_task: - stream_task.cancel() - - self.worker.editor_logging_handler.info(f"{TAG} Cleaning up") - - except Exception as e: - self.worker.editor_logging_handler.error(f"{TAG} Error: {e}") - finally: - KortexaRadioCapability._streaming = False - try: - await self.capability_worker.stream_end() - except Exception: - pass - try: - await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "off"}) - except Exception: - pass - self.worker.music_mode_event.clear() - self.worker.music_mode_stop_event.clear() - self.capability_worker.resume_normal_flow() - self.worker.editor_logging_handler.info(f"{TAG} Done, control returned") + #{{register_capability}} async def _stream(self): - """Stream audio in a cancellable task.""" + """Stream radio audio with pause/stop handling.""" try: - self.worker.editor_logging_handler.info(f"{TAG} Connecting...") - await self.capability_worker.stream_init() async with httpx.AsyncClient(timeout=None) as client: async with client.stream("GET", STREAM_URL, follow_redirects=True) as response: self.worker.editor_logging_handler.info(f"{TAG} Connected, status={response.status_code}") if response.status_code != 200: - KortexaRadioCapability._streaming = False + await self.capability_worker.speak("Could not connect to the radio stream.") return + await self.capability_worker.stream_init() + async for chunk in response.aiter_bytes(chunk_size=CHUNK_SIZE): if not chunk: + self.worker.editor_logging_handler.info(f"{TAG} No chunk") continue - if not KortexaRadioCapability._streaming: - break + + if self.worker.music_mode_stop_event.is_set(): + self.worker.editor_logging_handler.info(f"{TAG} Stop event, ending stream") + await self.capability_worker.stream_end() + return while self.worker.music_mode_pause_event.is_set(): - await self.worker.session_tasks.sleep(0.1) + await asyncio.sleep(0.1) await self.capability_worker.send_audio_data_in_stream(chunk) - except asyncio.CancelledError: - self.worker.editor_logging_handler.info(f"{TAG} Stream task cancelled") + await self.capability_worker.stream_end() + + except asyncio.CancelledError as e: + self.worker.editor_logging_handler.info(f"{TAG} Stream cancelled: {e}") except Exception as e: self.worker.editor_logging_handler.error(f"{TAG} Stream error: {e}") + + async def _keep_events_connected(self): + """Stay connected to SSE to register as a listener. Events are not processed.""" + try: + while True: + try: + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + async with client.stream("GET", EVENTS_URL) as response: + async for _ in response.aiter_lines(): + pass + except httpx.HTTPError as e: + self.worker.editor_logging_handler.error(f"{TAG} SSE error: {e}") + pass + + await asyncio.sleep(5) + except asyncio.CancelledError as e: + self.worker.editor_logging_handler.error(f"{TAG} SSE cancelled: {e}") + + pass + + async def run(self): + """Main setup and conversation loop.""" + + first_time = True + is_playing = False + is_stopping = False + events_task = None + + # await self.capability_worker.wait_for_complete_transcription() + + # self.worker.music_mode_event.set() + # await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "on"}) + + # await asyncio.sleep(5) + + # await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "off"}) + # self.worker.music_mode_event.clear() + + # await self.capability_worker.speak("Nope, don't want to") + + # await asyncio.sleep(1) + + # self.capability_worker.resume_normal_flow() + + try: + while True: + if is_stopping: + self.worker.editor_logging_handler.info(f"{TAG} Waiting for stream to finish") + + await asyncio.sleep(0.1) + + if not events_task: + self.worker.editor_logging_handler.info(f"{TAG} No events task") + + break + + continue + + if first_time: + self.worker.editor_logging_handler.info(f"{TAG} First trigger") + + msg = "start" + first_time = False + else: + self.worker.editor_logging_handler.info(f"{TAG} Interruption") + + msg = await self.capability_worker.user_response() + + if not msg or not msg.strip(): + self.worker.editor_logging_handler.info(f"{TAG} User silent") + + if is_playing: + msg = "stop" + else: + msg = "start" + + normalized = msg.strip().lower() + + self.worker.editor_logging_handler.info(f"{TAG} Command: {normalized}") + + if "stop" in normalized or "off" in normalized: + is_stopping = True + + self.worker.editor_logging_handler.info(f"{TAG} Stop stream") + + self.worker.music_mode_stop_event.set() + + self.worker.editor_logging_handler.info(f"{TAG} Turn off music mode") + + await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "off"}) + self.worker.music_mode_event.clear() + + elif "play" in normalized or "start" in normalized or "on" in normalized or "radio" in normalized: + self.worker.editor_logging_handler.info(f"{TAG} Start playing") + + is_playing = True + + await self.capability_worker.speak("Tuning in to Kortexa Radio.") + + # Subscribe to events (registers us as a listener) + self.worker.editor_logging_handler.info(f"{TAG} Register as a listener") + events_task = self.worker.session_tasks.create(self._keep_events_connected()) + + self.worker.editor_logging_handler.info(f"{TAG} Turn on music mode") + + self.worker.music_mode_event.set() + await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "on"}) + + # Stream audio (blocks until stop event or error) + await self._stream() + + if events_task: + self.worker.editor_logging_handler.info(f"{TAG} Unregister as a listener") + + events_task.cancel() + events_task = None + + await self.capability_worker.speak("Radio off! Catch you later.") + + self.worker.music_mode_stop_event.clear() + + is_playing = False + + self.worker.editor_logging_handler.info(f"{TAG} Stop playing") + + break + + except Exception as e: + self.worker.editor_logging_handler.error(f"{TAG} Error: {e}") + finally: - KortexaRadioCapability._streaming = False + self.worker.editor_logging_handler.info(f"{TAG} Clean up") + + if events_task: + self.worker.editor_logging_handler.info(f"{TAG} Unregister as a listener") + + events_task.cancel() + events_task = None + + if is_playing: + self.worker.editor_logging_handler.info(f"{TAG} Turn off music mode") + + await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "off"}) + self.worker.music_mode_event.clear() + + self.worker.editor_logging_handler.info(f"{TAG} Radio OFF") + + await asyncio.sleep(1) + self.capability_worker.resume_normal_flow() + + def call(self, worker: AgentWorker): + try: + self.worker = worker + self.capability_worker = CapabilityWorker(self.worker) + self.worker.editor_logging_handler.info(f"{TAG} Radio ON") + + self.worker.session_tasks.create(self.run()) + except Exception as e: + self.worker.editor_logging_handler.warning(e) From 2244f9817bcf321633a500b4bea0aac89d60dca6 Mon Sep 17 00:00:00 2001 From: Franci Penov Date: Sun, 22 Mar 2026 16:56:09 -0700 Subject: [PATCH 4/4] Accidental asyncio --- community/kortexa-radio/main.py | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/community/kortexa-radio/main.py b/community/kortexa-radio/main.py index ce8aa3c5..46f18c8d 100644 --- a/community/kortexa-radio/main.py +++ b/community/kortexa-radio/main.py @@ -44,7 +44,7 @@ async def _stream(self): return while self.worker.music_mode_pause_event.is_set(): - await asyncio.sleep(0.1) + await self.worker.session_tasks.sleep(0.1) await self.capability_worker.send_audio_data_in_stream(chunk) @@ -68,7 +68,7 @@ async def _keep_events_connected(self): self.worker.editor_logging_handler.error(f"{TAG} SSE error: {e}") pass - await asyncio.sleep(5) + await self.worker.session_tasks.sleep(5) except asyncio.CancelledError as e: self.worker.editor_logging_handler.error(f"{TAG} SSE cancelled: {e}") @@ -82,28 +82,12 @@ async def run(self): is_stopping = False events_task = None - # await self.capability_worker.wait_for_complete_transcription() - - # self.worker.music_mode_event.set() - # await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "on"}) - - # await asyncio.sleep(5) - - # await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "off"}) - # self.worker.music_mode_event.clear() - - # await self.capability_worker.speak("Nope, don't want to") - - # await asyncio.sleep(1) - - # self.capability_worker.resume_normal_flow() - try: while True: if is_stopping: self.worker.editor_logging_handler.info(f"{TAG} Waiting for stream to finish") - await asyncio.sleep(0.1) + await self.worker.session_tasks.sleep(0.1) if not events_task: self.worker.editor_logging_handler.info(f"{TAG} No events task") @@ -201,7 +185,7 @@ async def run(self): self.worker.editor_logging_handler.info(f"{TAG} Radio OFF") - await asyncio.sleep(1) + await self.worker.session_tasks.sleep(1) self.capability_worker.resume_normal_flow() def call(self, worker: AgentWorker):