diff --git a/config/server.php b/config/server.php index 98ce764d..34a422fe 100644 --- a/config/server.php +++ b/config/server.php @@ -64,4 +64,16 @@ // default and keeps the segment count (and per-request overhead) sane. 'segment_seconds' => 6, ], + + // WebSocket server settings for SyncPlay realtime communication. + // The WS worker runs as count=1 (one authoritative SyncPlayManager for all + // connections) on a dedicated port separate from the HTTP workers. + 'websocket' => [ + 'host' => '0.0.0.0', + 'port' => 8097, + // Interval for cleaning up stale connections (seconds). + 'stale_connection_timeout' => 300, + // Interval for cleaning up stale SyncPlay groups (seconds). + 'stale_group_timeout' => 3600, + ], ]; diff --git a/src/Server/WebSocket/WebSocketServer.php b/src/Server/WebSocket/WebSocketServer.php index f9516cae..62376551 100644 --- a/src/Server/WebSocket/WebSocketServer.php +++ b/src/Server/WebSocket/WebSocketServer.php @@ -8,6 +8,7 @@ use Workerman\Connection\TcpConnection; use Phlix\Common\Logger\LoggerFactory; use Phlix\Common\Logger\LogChannels; +use Phlix\Session\SyncPlay\SyncPlayManager; /** * WebSocket server implementation for real-time communication. @@ -33,6 +34,9 @@ class WebSocketServer /** @var ConnectionPool Manages active WebSocket connections */ private ConnectionPool $connections; + /** @var SyncPlayManager|null SyncPlay manager for group state */ + private ?SyncPlayManager $syncPlayManager = null; + /** @var array Server configuration */ private array $config; @@ -40,6 +44,7 @@ class WebSocketServer * Creates a new WebSocket server instance. * * @param array $config Server configuration with 'host' and 'port' keys + * @param MessageHandler|null $handler Optional message handler (for SP1 singletons) * * @example * ```php @@ -50,11 +55,11 @@ class WebSocketServer * $server->run(); * ``` */ - public function __construct(array $config) + public function __construct(array $config, ?MessageHandler $handler = null) { $this->config = $config; $this->connections = ConnectionPool::getInstance(); - $this->handler = new MessageHandler($this->connections); + $this->handler = $handler ?? new MessageHandler($this->connections); $host = $config['host'] ?? '0.0.0.0'; $port = $config['port'] ?? 8097; @@ -86,11 +91,31 @@ public function onStart(): void 'port' => is_numeric($port) ? (int)$port : 8097, ]); + $staleConnectionTimeoutRaw = $this->config['stale_connection_timeout'] ?? 300; + $staleConnectionTimeout = is_numeric($staleConnectionTimeoutRaw) ? (int) $staleConnectionTimeoutRaw : 300; + + $staleGroupTimeoutRaw = $this->config['stale_group_timeout'] ?? 3600; + $staleGroupTimeout = is_numeric($staleGroupTimeoutRaw) ? (int) $staleGroupTimeoutRaw : 3600; + // Start cleanup timer for stale connections (every 60 seconds) if (function_exists('Workerman\Timer')) { - \Workerman\Timer::add(60, function (): void { - $this->connections->cleanupStaleConnections(300); + \Workerman\Timer::add(60, function () use ($staleConnectionTimeout): void { + $this->connections->cleanupStaleConnections($staleConnectionTimeout); }); + + // Start cleanup timer for stale SyncPlay groups (every 5 minutes) + $syncPlayManager = $this->syncPlayManager; + if ($syncPlayManager !== null) { + \Workerman\Timer::add(300, function () use ($syncPlayManager, $staleGroupTimeout): void { + $removed = $syncPlayManager->cleanupStaleGroups($staleGroupTimeout); + if ($removed > 0) { + $logger = LoggerFactory::get(LogChannels::WEBSOCKET); + $logger->info('Cleaned up stale SyncPlay groups', [ + 'removed' => $removed, + ]); + } + }); + } } } @@ -218,6 +243,20 @@ public function getHandler(): MessageHandler return $this->handler; } + /** + * Sets the SyncPlay manager for group state management. + * + * This must be called before onStart() to enable the stale groups + * cleanup timer. + * + * @param \Phlix\Session\SyncPlay\SyncPlayManager $manager The SyncPlay manager + * @return void + */ + public function setSyncPlayManager(\Phlix\Session\SyncPlay\SyncPlayManager $manager): void + { + $this->syncPlayManager = $manager; + } + /** * Starts the WebSocket server. * diff --git a/start.php b/start.php index af4a9994..c87a472d 100755 --- a/start.php +++ b/start.php @@ -168,6 +168,71 @@ // REST + SSR surface. // ----------------------------------------------------------------------------- +// ----------------------------------------------------------------------------- +// 4a. WebSocket worker for SyncPlay realtime communication (SP1). +// +// SyncPlay requires exactly ONE authoritative SyncPlayManager shared across all +// WS connections, so this worker runs as count=1 on port 8097 (separate from the +// HTTP workers on 8096). The manager is constructed once in onWorkerStart and +// its state persists for the lifetime of this worker process. +// +// Architecture: +// - WebSocketServer accepts an injected MessageHandler so the same handler +// instance is used for both SyncPlay message routing and general WS events. +// - SyncPlayManager::initialize() registers the per-type callbacks that route +// incoming SyncPlay messages to the appropriate handler methods. +// - ConnectionPool and MessageHandler are singletons shared within this worker. +// ----------------------------------------------------------------------------- + +try { + $wsWorker = new Worker('websocket://0.0.0.0:8097'); + $wsWorker->count = 1; + $wsWorker->name = 'phlix-server-ws'; + $wsWorker->onWorkerStart = static function (Worker $w) use ($config, $applyCuratedCoroutineHooks): void { + $applyCuratedCoroutineHooks(); + + // Build the container inside the fork so each worker owns its own state. + $container = ContainerFactory::create($config); + + // Create the shared MessageHandler and ConnectionPool singletons. + $connections = \Phlix\Server\WebSocket\ConnectionPool::getInstance(); + $messageHandler = new \Phlix\Server\WebSocket\MessageHandler($connections); + + // Construct ONE authoritative SyncPlayManager and initialize it with the + // message handler so SyncPlay message types are routed to their handlers. + /** @var \Phlix\Common\Logger\StructuredLogger $logger */ + $logger = $container->get('logger.websocket'); + $syncPlayManager = new \Phlix\Session\SyncPlay\SyncPlayManager($logger); + $syncPlayManager->initialize($messageHandler); + + // Build and configure the WebSocket server with the shared manager. + $wsConfigRaw = $config['websocket'] ?? null; + $wsConfig = is_array($wsConfigRaw) ? $wsConfigRaw : []; + $wsConfig['host'] = $wsConfig['host'] ?? '0.0.0.0'; + $wsConfig['port'] = $wsConfig['port'] ?? 8097; + $wsConfig['stale_connection_timeout'] = $wsConfig['stale_connection_timeout'] ?? 300; + $wsConfig['stale_group_timeout'] = $wsConfig['stale_group_timeout'] ?? 3600; + + /** @var array $wsConfig */ + $wsServer = new \Phlix\Server\WebSocket\WebSocketServer($wsConfig, $messageHandler); + $wsServer->setSyncPlayManager($syncPlayManager); + + // Trigger onStart to log the startup message and arm cleanup timers. + // The actual Workerman worker callbacks (onConnect, onMessage, onClose) + // are already bound in the WebSocketServer constructor. + $wsServer->onStart(); + + /** @var \Phlix\Common\Logger\StructuredLogger $wsLogger */ + $wsLogger = $container->get('logger.websocket'); + $wsLogger->info('SyncPlay manager initialized', [ + 'message' => 'One SyncPlayManager instance handles all WS connections', + ]); + }; +} catch (\Throwable $e) { + // The WS worker is best-effort; never block the HTTP server. + trigger_error('Failed to set up WebSocket worker: ' . $e->getMessage(), E_USER_WARNING); +} + // ----------------------------------------------------------------------------- // 4b. Managed worker processes (1.1b). // diff --git a/tests/Unit/Server/WebSocket/WebSocketServerTest.php b/tests/Unit/Server/WebSocket/WebSocketServerTest.php new file mode 100644 index 00000000..eeaecc30 --- /dev/null +++ b/tests/Unit/Server/WebSocket/WebSocketServerTest.php @@ -0,0 +1,223 @@ +clear(); + } + + /** + * @covers \Phlix\Server\WebSocket\WebSocketServer::__construct + * @covers \Phlix\Server\WebSocket\WebSocketServer::getHandler + */ + public function testCanConstructWithConfig(): void + { + $config = [ + 'host' => '0.0.0.0', + 'port' => 8097, + ]; + + $server = new WebSocketServer($config); + + $this->assertInstanceOf(WebSocketServer::class, $server); + $this->assertInstanceOf(MessageHandler::class, $server->getHandler()); + } + + /** + * @covers \Phlix\Server\WebSocket\WebSocketServer::__construct + * @covers \Phlix\Server\WebSocket\WebSocketServer::getHandler + */ + public function testCanConstructWithInjectedMessageHandler(): void + { + $config = [ + 'host' => '0.0.0.0', + 'port' => 8097, + ]; + + $pool = ConnectionPool::getInstance(); + $customHandler = new MessageHandler($pool); + + $server = new WebSocketServer($config, $customHandler); + + $this->assertSame($customHandler, $server->getHandler()); + } + + /** + * @covers \Phlix\Server\WebSocket\WebSocketServer::setSyncPlayManager + */ + public function testCanSetSyncPlayManager(): void + { + $config = [ + 'host' => '0.0.0.0', + 'port' => 8097, + ]; + + $server = new WebSocketServer($config); + $syncPlayManager = new SyncPlayManager(); + + // setSyncPlayManager should not throw + $server->setSyncPlayManager($syncPlayManager); + + $this->assertTrue(true); // If we get here, no exception was thrown + } + + /** + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::initialize + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::getStats + */ + public function testSyncPlayManagerCanBeInitializedWithMessageHandler(): void + { + $pool = ConnectionPool::getInstance(); + $handler = new MessageHandler($pool); + $syncPlayManager = new SyncPlayManager(); + + // Initialize should not throw + $syncPlayManager->initialize($handler); + + // getStats should return valid stats after initialization + $stats = $syncPlayManager->getStats(); + $this->assertArrayHasKey('total_groups', $stats); + $this->assertArrayHasKey('total_members', $stats); + $this->assertArrayHasKey('time_sync_status', $stats); + $this->assertEquals(0, $stats['total_groups']); + $this->assertEquals(0, $stats['total_members']); + } + + /** + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::createGroup + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::getGroupState + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::cleanupStaleGroups + */ + public function testSyncPlayManagerGroupOperations(): void + { + $pool = ConnectionPool::getInstance(); + $handler = new MessageHandler($pool); + $syncPlayManager = new SyncPlayManager(); + $syncPlayManager->initialize($handler); + + // Create a group + $result = $syncPlayManager->createGroup('Test Group', null, 'member_1', 'Host User'); + $this->assertTrue($result['success']); + $this->assertArrayHasKey('group', $result); + + $groupId = $result['group']['group_id']; + + // Get group state + $state = $syncPlayManager->getGroupState($groupId); + $this->assertNotNull($state); + $this->assertEquals('Test Group', $state['group_name']); + + // Cleanup stale groups (none should be stale) + $removed = $syncPlayManager->cleanupStaleGroups(3600); + $this->assertEquals(0, $removed); + } + + /** + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::joinGroup + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::leaveGroup + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::listGroups + */ + public function testSyncPlayManagerJoinAndLeaveGroup(): void + { + $pool = ConnectionPool::getInstance(); + $handler = new MessageHandler($pool); + $syncPlayManager = new SyncPlayManager(); + $syncPlayManager->initialize($handler); + + // Create a group + $result = $syncPlayManager->createGroup('Join Test Group', null, 'host_1', 'Host'); + $this->assertTrue($result['success']); + $groupId = $result['group']['group_id']; + + // Join the group + $joinResult = $syncPlayManager->joinGroup($groupId, 'member_2', 'Guest User'); + $this->assertTrue($joinResult['success']); + + // List groups + $groups = $syncPlayManager->listGroups(); + $this->assertCount(1, $groups); + $this->assertEquals('Join Test Group', $groups[0]['name']); + $this->assertEquals(2, $groups[0]['member_count']); + + // Leave the group + $leaveResult = $syncPlayManager->leaveGroup('member_2'); + $this->assertTrue($leaveResult['success']); + + // List groups again - should still have 1 group with 1 member + $groups = $syncPlayManager->listGroups(); + $this->assertCount(1, $groups); + $this->assertEquals(1, $groups[0]['member_count']); + } + + /** + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::getTimeSync + */ + public function testSyncPlayManagerGetTimeSync(): void + { + $syncPlayManager = new SyncPlayManager(); + $timeSync = $syncPlayManager->getTimeSync(); + + $this->assertInstanceOf(\Phlix\Session\SyncPlay\TimeSync::class, $timeSync); + } + + /** + * @covers \Phlix\Session\SyncPlay\SyncPlayManager::getMemberGroup + */ + public function testSyncPlayManagerGetMemberGroup(): void + { + $pool = ConnectionPool::getInstance(); + $handler = new MessageHandler($pool); + $syncPlayManager = new SyncPlayManager(); + $syncPlayManager->initialize($handler); + + // Member not in any group + $this->assertNull($syncPlayManager->getMemberGroup('unknown_member')); + + // Create group and add member + $result = $syncPlayManager->createGroup('Member Test Group', null, 'member_x', 'Member X'); + $this->assertTrue($result['success']); + $groupId = $result['group']['group_id']; + + $this->assertEquals($groupId, $syncPlayManager->getMemberGroup('member_x')); + } + + /** + * @covers \Phlix\Server\WebSocket\WebSocketServer::onStart + */ + public function testOnStartDoesNotThrow(): void + { + $config = [ + 'host' => '0.0.0.0', + 'port' => 8097, + 'stale_connection_timeout' => 300, + 'stale_group_timeout' => 3600, + ]; + + $server = new WebSocketServer($config); + $syncPlayManager = new SyncPlayManager(); + $server->setSyncPlayManager($syncPlayManager); + + // onStart should not throw even without Workerman\Timer + // (the function_exists check will cause early return) + $server->onStart(); + + $this->assertTrue(true); // If we get here, no exception was thrown + } +}