Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions src/Server/WebSocket/WebSocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
use Phlix\Auth\JwtHandler;
use Phlix\Common\Logger\LoggerFactory;
use Phlix\Common\Logger\LogChannels;
use Phlix\Session\SyncPlay\SyncPlayManager;
Expand Down Expand Up @@ -122,23 +123,47 @@ public function onStart(): void
/**
* Called when a new client connects.
*
* Creates a Connection wrapper, adds it to the pool, and sends
* a welcome message with the connection ID.
* Validates JWT token from query string if present, then creates
* a Connection wrapper, adds it to the pool, and sends a welcome
* message with the connection ID.
*
* @param TcpConnection $connection The Workerman TCP connection
* @return void
*/
public function onConnect(TcpConnection $connection): void
{
$token = $_GET['token'] ?? null;
$userId = null;

if (is_string($token) && $token !== '') {
$jwtSecret = $this->config['jwt_secret'] ?? null;
if (is_string($jwtSecret) && $jwtSecret !== '') {
$jwtHandler = new JwtHandler($jwtSecret);
$payload = $jwtHandler->validateToken($token);
if ($payload === null) {
$connection->close();
return;
}
$sub = $payload['sub'] ?? null;
$userId = is_string($sub) ? $sub : null;
}
}

$wsConnection = new Connection($connection);

if (is_string($userId)) {
$wsConnection->setAuthenticated(true, $userId);
}

$this->connections->add($wsConnection);

$logger = LoggerFactory::get(LogChannels::WEBSOCKET);
$logger->debug('New WebSocket connection', [
'connection_id' => $wsConnection->getId(),
'authenticated' => $wsConnection->isAuthenticated(),
'user_id' => $wsConnection->getUserId(),
]);

// Send welcome message
$wsConnection->sendMessage('connected', [
'connection_id' => $wsConnection->getId(),
'timestamp' => time(),
Expand Down
10 changes: 5 additions & 5 deletions src/Session/SyncPlay/GroupState.php
Original file line number Diff line number Diff line change
Expand Up @@ -651,22 +651,22 @@ public function isInSync(int $memberPosition): bool
/**
* Get the full group state for broadcasting to clients.
*
* Returns a comprehensive state array including members list,
* Returns a comprehensive state array including members dictionary,
* playback info, queue, and timestamps.
*
* @return array<string, mixed> Full group state
*
* @example
* ```php
* $state = $group->getState();
* // ['group_id' => 'sp_abc123', 'group_name' => 'Movie Night', 'members' => [...], ...]
* // ['group_id' => 'sp_abc123', 'group_name' => 'Movie Night', 'members' => ['member_id' => [...]], ...]
* ```
*/
public function getState(): array
{
$membersList = [];
$membersDict = [];
foreach ($this->members as $id => $member) {
$membersList[] = [
$membersDict[$id] = [
'id' => $id,
'name' => $member['name'] ?? 'Unknown',
'is_host' => $id === $this->hostId,
Expand All @@ -678,7 +678,7 @@ public function getState(): array
'group_id' => $this->id,
'group_name' => $this->name,
'member_count' => $this->getMemberCount(),
'members' => $membersList,
'members' => $membersDict,
'host_id' => $this->hostId,
'current_media_id' => $this->currentMediaId,
'current_media_duration' => $this->currentMediaDuration,
Expand Down
121 changes: 74 additions & 47 deletions src/Session/SyncPlay/SyncPlayManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Phlix\Common\Logger\LogChannels;
use Phlix\Common\Logger\StructuredLogger;
use Phlix\Server\WebSocket\Connection;
use Phlix\Server\WebSocket\ConnectionInterface;
use Phlix\Server\WebSocket\ConnectionPool;
use Phlix\Server\WebSocket\MessageHandler;

Expand Down Expand Up @@ -134,7 +135,7 @@ private function registerMessageHandlers(): void
return;
}

$handler = function (Connection $connection, array $payload) {
$handler = function (ConnectionInterface $connection, array $payload) {
$this->handleMessage($connection, $payload);
};

Expand All @@ -156,13 +157,13 @@ private function registerMessageHandlers(): void
* Routes the message to the appropriate handler based on message type.
* All exceptions are caught and reported back to the client as error messages.
*
* @param Connection $connection The WebSocket connection that sent the message
* @param ConnectionInterface $connection The WebSocket connection that sent the message
* @param array<string, mixed> $payload The decoded message payload
* @return void
*
* @see Messages For all valid message type constants
*/
private function handleMessage(Connection $connection, array $payload): void
protected function handleMessage(ConnectionInterface $connection, array $payload): void
{
$type = $payload['type'] ?? '';

Expand Down Expand Up @@ -492,21 +493,23 @@ public function listGroups(): array
* Handle playback play command from a group host.
*
* Only the host can initiate playback commands. The position is updated
* and broadcast to all other group members.
* and broadcast to all other group members. Per SP4 spec, host authorization
* uses the server-derived member_id from the authenticated connection.
*
* @param Connection $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing member_id, position, server_time
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing position, server_time
* @return void
*
* @fires Messages::TYPE_PLAYBACK_PLAY Broadcast to group members
* @fires Messages::TYPE_PLAYBACK_PLAY Sent to host directly, then broadcast to other members
*/
private function handlePlaybackPlay(Connection $connection, array $payload): void
private function handlePlaybackPlay(ConnectionInterface $connection, array $payload): void
{
$memberId = self::stringFromMixed($payload['member_id'] ?? null);
if ($memberId === '') {
$this->sendError($connection, 'NOT_IN_GROUP', 'You are not in a group');
$memberId = $connection->getUserId();
if ($memberId === null) {
$this->sendError($connection, 'NOT_AUTHENTICATED', 'Authentication required');
return;
}

$groupId = $this->memberToGroup[$memberId] ?? null;
$group = $groupId !== null ? ($this->groups[$groupId] ?? null) : null;

Expand All @@ -525,29 +528,37 @@ private function handlePlaybackPlay(Connection $connection, array $payload): voi

$group->updatePlayback(GroupState::STATE_PLAYING, $position);

$this->broadcastToGroup($groupId, Messages::TYPE_PLAYBACK_PLAY, [
$playbackFrame = [
'member_id' => $memberId,
'position' => $position,
'server_time' => $serverTime,
], [$memberId]);
];

// Send directly to host as confirmation of their command
$connection->sendFlat(Messages::TYPE_PLAYBACK_PLAY, $playbackFrame);

// Broadcast to all OTHER members
$this->broadcastToGroup($groupId, Messages::TYPE_PLAYBACK_PLAY, $playbackFrame, [$memberId]);
}

/**
* Handle playback pause command from a group host.
* Per SP4 spec, host authorization uses server-derived member_id.
*
* @param Connection $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing member_id, position, server_time
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing position, server_time
* @return void
*
* @fires Messages::TYPE_PLAYBACK_PAUSE Broadcast to group members
*/
private function handlePlaybackPause(Connection $connection, array $payload): void
private function handlePlaybackPause(ConnectionInterface $connection, array $payload): void
{
$memberId = self::stringFromMixed($payload['member_id'] ?? null);
if ($memberId === '') {
$this->sendError($connection, 'NOT_IN_GROUP', 'You are not in a group');
$memberId = $connection->getUserId();
if ($memberId === null) {
$this->sendError($connection, 'NOT_AUTHENTICATED', 'Authentication required');
return;
}

$groupId = $this->memberToGroup[$memberId] ?? null;
$group = $groupId !== null ? ($this->groups[$groupId] ?? null) : null;

Expand Down Expand Up @@ -575,20 +586,22 @@ private function handlePlaybackPause(Connection $connection, array $payload): vo

/**
* Handle playback seek command from a group host.
* Per SP4 spec, host authorization uses server-derived member_id.
*
* @param Connection $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing member_id, from_position, to_position, server_time
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing from_position, to_position, server_time
* @return void
*
* @fires Messages::TYPE_PLAYBACK_SEEK Broadcast to group members
*/
private function handlePlaybackSeek(Connection $connection, array $payload): void
private function handlePlaybackSeek(ConnectionInterface $connection, array $payload): void
{
$memberId = self::stringFromMixed($payload['member_id'] ?? null);
if ($memberId === '') {
$this->sendError($connection, 'NOT_IN_GROUP', 'You are not in a group');
$memberId = $connection->getUserId();
if ($memberId === null) {
$this->sendError($connection, 'NOT_AUTHENTICATED', 'Authentication required');
return;
}

$groupId = $this->memberToGroup[$memberId] ?? null;
$group = $groupId !== null ? ($this->groups[$groupId] ?? null) : null;

Expand Down Expand Up @@ -620,20 +633,22 @@ private function handlePlaybackSeek(Connection $connection, array $payload): voi
* Handle playback queue update from a group host.
*
* Replaces the group's current playback queue with the provided queue items.
* Per SP4 spec, host authorization uses server-derived member_id.
*
* @param Connection $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing member_id, queue array
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing queue array
* @return void
*
* @fires Messages::TYPE_PLAYBACK_QUEUE Broadcast to group members
*/
private function handlePlaybackQueue(Connection $connection, array $payload): void
private function handlePlaybackQueue(ConnectionInterface $connection, array $payload): void
{
$memberId = self::stringFromMixed($payload['member_id'] ?? null);
if ($memberId === '') {
$this->sendError($connection, 'NOT_IN_GROUP', 'You are not in a group');
$memberId = $connection->getUserId();
if ($memberId === null) {
$this->sendError($connection, 'NOT_AUTHENTICATED', 'Authentication required');
return;
}

$groupId = $this->memberToGroup[$memberId] ?? null;
$group = $groupId !== null ? ($this->groups[$groupId] ?? null) : null;

Expand Down Expand Up @@ -684,13 +699,13 @@ private function handlePlaybackQueue(Connection $connection, array $payload): vo
* Broadcasts the chat message to all other group members along with
* the sender's name and timestamp.
*
* @param Connection $connection The WebSocket connection
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing member_id, message
* @return void
*
* @fires Messages::TYPE_CHAT_MESSAGE Broadcast to group members (excluding sender)
*/
private function handleChatMessage(Connection $connection, array $payload): void
private function handleChatMessage(ConnectionInterface $connection, array $payload): void
{
$memberId = self::stringFromMixed($payload['member_id'] ?? null);
if ($memberId === '') {
Expand Down Expand Up @@ -729,13 +744,13 @@ private function handleChatMessage(Connection $connection, array $payload): void
* Processes the ping and returns a pong with server timestamp for
* calculating network latency and clock offset.
*
* @param Connection $connection The WebSocket connection
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing client_time
* @return void
*
* @see TimeSync::processPing() For ping/pong protocol details
*/
private function handleTimePing(Connection $connection, array $payload): void
private function handleTimePing(ConnectionInterface $connection, array $payload): void
{
$pong = $this->timeSync->processPing($payload);
$message = Messages::timePong($pong['client_time'], $pong['server_time']);
Expand All @@ -746,17 +761,23 @@ private function handleTimePing(Connection $connection, array $payload): void
* Handle group creation request via WebSocket.
*
* Creates a new group with the requesting member as the host.
* Per SP4 spec, member_id is server-derived from the authenticated connection's userId.
*
* @param Connection $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing member_id, member_name, group_name, password
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing member_name, group_name, password
* @return void
*
* @fires Messages::TYPE_GROUP_STATE Sent to the creating member on success
* @fires Messages::TYPE_ERROR Sent on failure
*/
private function handleGroupCreate(Connection $connection, array $payload): void
private function handleGroupCreate(ConnectionInterface $connection, array $payload): void
{
$memberId = self::stringFromMixed($payload['member_id'] ?? $connection->getId());
$memberId = $connection->getUserId();
if ($memberId === null) {
$this->sendError($connection, 'NOT_AUTHENTICATED', 'Authentication required');
return;
}

$memberName = self::stringFromMixed($payload['member_name'] ?? 'Host');
$groupName = self::stringFromMixed($payload['group_name'] ?? 'New Group');
$password = self::stringOrNullFromMixed($payload['password'] ?? null);
Expand All @@ -775,18 +796,24 @@ private function handleGroupCreate(Connection $connection, array $payload): void

/**
* Handle group join request via WebSocket.
* Per SP4 spec, member_id is server-derived from the authenticated connection's userId.
*
* @param Connection $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing group_id, member_id, member_name, password
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing group_id, member_name, password
* @return void
*
* @fires Messages::TYPE_GROUP_STATE Sent to the joining member on success
* @fires Messages::TYPE_ERROR Sent on failure
*/
private function handleGroupJoin(Connection $connection, array $payload): void
private function handleGroupJoin(ConnectionInterface $connection, array $payload): void
{
$memberId = $connection->getUserId();
if ($memberId === null) {
$this->sendError($connection, 'NOT_AUTHENTICATED', 'Authentication required');
return;
}

$groupId = self::stringFromMixed($payload['group_id'] ?? '');
$memberId = self::stringFromMixed($payload['member_id'] ?? $connection->getId());
$memberName = self::stringFromMixed($payload['member_name'] ?? 'User');
$password = self::stringOrNullFromMixed($payload['password'] ?? null);

Expand All @@ -805,14 +832,14 @@ private function handleGroupJoin(Connection $connection, array $payload): void
/**
* Handle group leave request via WebSocket.
*
* @param Connection $connection The WebSocket connection
* @param ConnectionInterface $connection The WebSocket connection
* @param array<string, mixed> $payload Payload containing member_id
* @return void
*
* @fires Messages::TYPE_INFO Sent on success
* @fires Messages::TYPE_ERROR Sent on failure
*/
private function handleGroupLeave(Connection $connection, array $payload): void
private function handleGroupLeave(ConnectionInterface $connection, array $payload): void
{
$memberId = self::stringFromMixed($payload['member_id'] ?? null);
if ($memberId === '') {
Expand Down Expand Up @@ -880,14 +907,14 @@ private function broadcastToGroup(string $groupId, string $type, array $data, ar
/**
* Send an error message to a specific connection.
*
* @param Connection $connection The WebSocket connection to send to
* @param ConnectionInterface $connection The WebSocket connection to send to
* @param string $code Error code (e.g., 'NOT_IN_GROUP', 'NOT_HOST')
* @param string $message Human-readable error message
* @return void
*
* @see Messages::TYPE_ERROR For the error message format
*/
private function sendError(Connection $connection, string $code, string $message): void
private function sendError(ConnectionInterface $connection, string $code, string $message): void
{
$connection->sendFlat(Messages::TYPE_ERROR, [
'error_code' => $code,
Expand Down
Loading
Loading