Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,16 @@
// default and keeps the segment count (and per-request overhead) sane.
'segment_seconds' => 6,
],

// WebSocket server settings for SyncPlay realtime communication.
// The WS worker runs as count=1 (one authoritative SyncPlayManager for all
// connections) on a dedicated port separate from the HTTP workers.
'websocket' => [
'host' => '0.0.0.0',
'port' => 8097,
// Interval for cleaning up stale connections (seconds).
'stale_connection_timeout' => 300,
// Interval for cleaning up stale SyncPlay groups (seconds).
'stale_group_timeout' => 3600,
],
];
47 changes: 43 additions & 4 deletions src/Server/WebSocket/WebSocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Workerman\Connection\TcpConnection;
use Phlix\Common\Logger\LoggerFactory;
use Phlix\Common\Logger\LogChannels;
use Phlix\Session\SyncPlay\SyncPlayManager;

/**
* WebSocket server implementation for real-time communication.
Expand All @@ -33,13 +34,17 @@ class WebSocketServer
/** @var ConnectionPool Manages active WebSocket connections */
private ConnectionPool $connections;

/** @var SyncPlayManager|null SyncPlay manager for group state */
private ?SyncPlayManager $syncPlayManager = null;

/** @var array<string, mixed> Server configuration */
private array $config;

/**
* Creates a new WebSocket server instance.
*
* @param array<string, mixed> $config Server configuration with 'host' and 'port' keys
* @param MessageHandler|null $handler Optional message handler (for SP1 singletons)
*
* @example
* ```php
Expand All @@ -50,11 +55,11 @@ class WebSocketServer
* $server->run();
* ```
*/
public function __construct(array $config)
public function __construct(array $config, ?MessageHandler $handler = null)
{
$this->config = $config;
$this->connections = ConnectionPool::getInstance();
$this->handler = new MessageHandler($this->connections);
$this->handler = $handler ?? new MessageHandler($this->connections);

$host = $config['host'] ?? '0.0.0.0';
$port = $config['port'] ?? 8097;
Expand Down Expand Up @@ -86,11 +91,31 @@ public function onStart(): void
'port' => is_numeric($port) ? (int)$port : 8097,
]);

$staleConnectionTimeoutRaw = $this->config['stale_connection_timeout'] ?? 300;
$staleConnectionTimeout = is_numeric($staleConnectionTimeoutRaw) ? (int) $staleConnectionTimeoutRaw : 300;

$staleGroupTimeoutRaw = $this->config['stale_group_timeout'] ?? 3600;
$staleGroupTimeout = is_numeric($staleGroupTimeoutRaw) ? (int) $staleGroupTimeoutRaw : 3600;

// Start cleanup timer for stale connections (every 60 seconds)
if (function_exists('Workerman\Timer')) {
\Workerman\Timer::add(60, function (): void {
$this->connections->cleanupStaleConnections(300);
\Workerman\Timer::add(60, function () use ($staleConnectionTimeout): void {
$this->connections->cleanupStaleConnections($staleConnectionTimeout);
});

// Start cleanup timer for stale SyncPlay groups (every 5 minutes)
$syncPlayManager = $this->syncPlayManager;
if ($syncPlayManager !== null) {
\Workerman\Timer::add(300, function () use ($syncPlayManager, $staleGroupTimeout): void {
$removed = $syncPlayManager->cleanupStaleGroups($staleGroupTimeout);
if ($removed > 0) {
$logger = LoggerFactory::get(LogChannels::WEBSOCKET);
$logger->info('Cleaned up stale SyncPlay groups', [
'removed' => $removed,
]);
}
});
}
}
}

Expand Down Expand Up @@ -218,6 +243,20 @@ public function getHandler(): MessageHandler
return $this->handler;
}

/**
* Sets the SyncPlay manager for group state management.
*
* This must be called before onStart() to enable the stale groups
* cleanup timer.
*
* @param \Phlix\Session\SyncPlay\SyncPlayManager $manager The SyncPlay manager
* @return void
*/
public function setSyncPlayManager(\Phlix\Session\SyncPlay\SyncPlayManager $manager): void
{
$this->syncPlayManager = $manager;
}

/**
* Starts the WebSocket server.
*
Expand Down
65 changes: 65 additions & 0 deletions start.php
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,71 @@
// REST + SSR surface.
// -----------------------------------------------------------------------------

// -----------------------------------------------------------------------------
// 4a. WebSocket worker for SyncPlay realtime communication (SP1).
//
// SyncPlay requires exactly ONE authoritative SyncPlayManager shared across all
// WS connections, so this worker runs as count=1 on port 8097 (separate from the
// HTTP workers on 8096). The manager is constructed once in onWorkerStart and
// its state persists for the lifetime of this worker process.
//
// Architecture:
// - WebSocketServer accepts an injected MessageHandler so the same handler
// instance is used for both SyncPlay message routing and general WS events.
// - SyncPlayManager::initialize() registers the per-type callbacks that route
// incoming SyncPlay messages to the appropriate handler methods.
// - ConnectionPool and MessageHandler are singletons shared within this worker.
// -----------------------------------------------------------------------------

try {
$wsWorker = new Worker('websocket://0.0.0.0:8097');

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use the configured WebSocket listen address

The new config/server.php block exposes websocket.host and websocket.port, but the actual Workerman listener is still hard-coded to 0.0.0.0:8097. In any deployment that overrides the config, the code below may log/construct the inner server with the configured values while the socket that clients actually reach still binds to all interfaces on 8097, so the configured WebSocket endpoint is ignored. Build this listen URI from $config['websocket'] before creating $wsWorker.

Useful? React with 👍 / 👎.

$wsWorker->count = 1;
$wsWorker->name = 'phlix-server-ws';
$wsWorker->onWorkerStart = static function (Worker $w) use ($config, $applyCuratedCoroutineHooks): void {
$applyCuratedCoroutineHooks();

// Build the container inside the fork so each worker owns its own state.
$container = ContainerFactory::create($config);

// Create the shared MessageHandler and ConnectionPool singletons.
$connections = \Phlix\Server\WebSocket\ConnectionPool::getInstance();
$messageHandler = new \Phlix\Server\WebSocket\MessageHandler($connections);

// Construct ONE authoritative SyncPlayManager and initialize it with the
// message handler so SyncPlay message types are routed to their handlers.
/** @var \Phlix\Common\Logger\StructuredLogger $logger */
$logger = $container->get('logger.websocket');
$syncPlayManager = new \Phlix\Session\SyncPlay\SyncPlayManager($logger);
$syncPlayManager->initialize($messageHandler);

// Build and configure the WebSocket server with the shared manager.
$wsConfigRaw = $config['websocket'] ?? null;
$wsConfig = is_array($wsConfigRaw) ? $wsConfigRaw : [];
$wsConfig['host'] = $wsConfig['host'] ?? '0.0.0.0';
$wsConfig['port'] = $wsConfig['port'] ?? 8097;
$wsConfig['stale_connection_timeout'] = $wsConfig['stale_connection_timeout'] ?? 300;
$wsConfig['stale_group_timeout'] = $wsConfig['stale_group_timeout'] ?? 3600;

/** @var array<string, mixed> $wsConfig */
$wsServer = new \Phlix\Server\WebSocket\WebSocketServer($wsConfig, $messageHandler);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Attach the callbacks to the listening WebSocket worker

When the daemon is started with php start.php start, the worker created at new Worker('websocket://0.0.0.0:8097') is the one Workerman forks and uses to accept client connections. This line constructs a separate WebSocketServer inside onWorkerStart, which creates its own private Worker after Worker::runAll() has already selected the workers, so its onConnect/onMessage callbacks are not installed on the listening $w. As a result, clients connecting to port 8097 will not receive the connected welcome or have messages routed to MessageHandler/SyncPlayManager; bind the callbacks to $w or create the WebSocketServer worker before runAll() instead.

Useful? React with 👍 / 👎.

$wsServer->setSyncPlayManager($syncPlayManager);

// Trigger onStart to log the startup message and arm cleanup timers.
// The actual Workerman worker callbacks (onConnect, onMessage, onClose)
// are already bound in the WebSocketServer constructor.
$wsServer->onStart();

/** @var \Phlix\Common\Logger\StructuredLogger $wsLogger */
$wsLogger = $container->get('logger.websocket');
$wsLogger->info('SyncPlay manager initialized', [
'message' => 'One SyncPlayManager instance handles all WS connections',
]);
};
} catch (\Throwable $e) {
// The WS worker is best-effort; never block the HTTP server.
trigger_error('Failed to set up WebSocket worker: ' . $e->getMessage(), E_USER_WARNING);
}

// -----------------------------------------------------------------------------
// 4b. Managed worker processes (1.1b).
//
Expand Down
Loading
Loading