Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/Plugins/Scrobbler/Trakt/TraktApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,84 @@ public function refreshAccessToken(string $refreshToken): array
];
}

/**
* Refresh access token after an auth failure (401).
*
* Uses a single-flight mutex so that concurrent 401 responses all await
* the same refresh POST rather than each POSTing their own. This prevents
* Trakt's rotating refresh tokens from invalidating each other.
*
* Uses a two-level guard:
* 1. Static $inFlightRefresh for same-process concurrency (Workerman coroutines)
* 2. flock() for cross-process safety (multiple Workerman workers)
*
* @param string $refreshToken Current refresh token
*
* @return array<string, mixed> Token response with access_token, refresh_token, expires_in
*
* @throws TraktApiException When refresh fails
* @since 0.14.0
*/
public function refreshAfterAuthFailure(string $refreshToken): array
{
static $inFlightRefresh = [];
static $lockFile = null;

if ($lockFile === null) {
$lockFile = sys_get_temp_dir() . '/phlix_trakt_refresh.lock';
}

// Key cache by token hash to avoid cross-token cache pollution
$tokenKey = md5($refreshToken);

// Phase 1: Same-process single-flight (Workerman coroutines on same worker)
// Use \Co\sleep to yield to the event loop instead of blocking with usleep()
while (isset($inFlightRefresh[$tokenKey]) && $inFlightRefresh[$tokenKey] === 'pending') {
if (function_exists('\Co\sleep')) {
\Co\sleep(0.005); // 5ms - yields to event loop in async context
} else {
usleep(5000); // Fallback for non-Swoole (unit tests)
}
}

if (isset($inFlightRefresh[$tokenKey]) && is_array($inFlightRefresh[$tokenKey])) {
/** @var array<string, mixed> $result */
$result = $inFlightRefresh[$tokenKey];

return $result;
}

$inFlightRefresh[$tokenKey] = 'pending';

// Phase 2: Cross-process mutex via flock()
$fp = fopen($lockFile, 'c+');
if (!$fp) {
unset($inFlightRefresh[$tokenKey]);
throw new TraktApiException('Could not open refresh lock file');
}

if (!flock($fp, LOCK_EX)) {
fclose($fp);
unset($inFlightRefresh[$tokenKey]);
throw new TraktApiException('Could not acquire refresh lock');
}

try {
$result = $this->refreshAccessToken($refreshToken);
$inFlightRefresh[$tokenKey] = $result;
flock($fp, LOCK_UN);
fclose($fp);

return $result;
} catch (\Throwable $e) {
unset($inFlightRefresh[$tokenKey]);
flock($fp, LOCK_UN);
fclose($fp);

throw $e;
}
}

/**
* Submit a scrobble start for a media item.
*
Expand Down
133 changes: 131 additions & 2 deletions src/Plugins/Scrobbler/Trakt/TraktPlugin.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,28 @@ public function onPlaybackStarted(PlaybackStarted $event): void
'progress' => $progressSecs,
]);
} catch (TraktAuthenticationException $e) {
$this->logger?->warning('Trakt: scrobble start failed (auth)', [
$this->logger?->warning('Trakt: scrobble start failed (auth), attempting token refresh', [
'error' => $e->getMessage(),
]);

if ($this->ensureFreshToken()) {
// Retry once with the new token
try {
$api->scrobbleStart($mediaItem, $progressSecs, $this->settings->accessToken ?? '');
$this->logger?->info('Trakt scrobble start submitted after token refresh', [
'title' => $mediaItem->name,
'progress' => $progressSecs,
]);
} catch (TraktAuthenticationException $retryAuthEx) {
$this->logger?->warning('Trakt: scrobble start still failing after refresh', [
'error' => $retryAuthEx->getMessage(),
]);
} catch (TraktApiException $retryEx) {
$this->logger?->warning('Trakt: scrobble start retry failed', [
'error' => $retryEx->getMessage(),
]);
}
}
} catch (TraktApiException $e) {
$this->logger?->warning('Trakt: scrobble start failed', [
'error' => $e->getMessage(),
Expand Down Expand Up @@ -217,9 +236,28 @@ public function onPlaybackStopped(PlaybackStopped $event): void
'progress' => $progressSecs,
]);
} catch (TraktAuthenticationException $e) {
$this->logger?->warning('Trakt: scrobble stop failed (auth)', [
$this->logger?->warning('Trakt: scrobble stop failed (auth), attempting token refresh', [
'error' => $e->getMessage(),
]);

if ($this->ensureFreshToken()) {
// Retry once with the new token
try {
$api->scrobbleStop($mediaItem, $progressSecs, $this->settings->accessToken ?? '');
$this->logger?->info('Trakt scrobble stop submitted after token refresh', [
'title' => $mediaItem->name,
'progress' => $progressSecs,
]);
} catch (TraktAuthenticationException $retryAuthEx) {
$this->logger?->warning('Trakt: scrobble stop still failing after refresh', [
'error' => $retryAuthEx->getMessage(),
]);
} catch (TraktApiException $retryEx) {
$this->logger?->warning('Trakt: scrobble stop retry failed', [
'error' => $retryEx->getMessage(),
]);
}
}
} catch (TraktApiException $e) {
$this->logger?->warning('Trakt: scrobble stop failed', [
'error' => $e->getMessage(),
Expand Down Expand Up @@ -287,6 +325,97 @@ public function setRefreshToken(string $token): void
);
}

/**
* Refresh the access token after an auth failure, with single-flight guard.
*
* When multiple concurrent scrobble calls each receive a 401, only one
* actual token refresh POST is made. All callers await the same result.
*
* @return bool True if token was refreshed, false if no refresh token available
*
* @since 0.14.0
*/
public function ensureFreshToken(): bool
{
// Single-flight mutex at the plugin level. Since TraktPlugin is a
// singleton per plugin loader, this prevents concurrent coroutines
// on the same worker from each POSTing a refresh.
static $inFlightRefresh = null;

if ($this->api === null) {
$this->logger?->warning('Trakt: API not initialized');

return false;
}

if ($this->settings->refreshToken === null || $this->settings->refreshToken === '') {
$this->logger?->warning('Trakt: no refresh token available');

return false;
}

// Key cache by token hash to avoid cross-token cache pollution
$tokenKey = md5($this->settings->refreshToken);

// If another call is already refreshing, spin until it completes
// Use \Co\sleep to yield to the event loop instead of blocking with usleep()
while (isset($inFlightRefresh[$tokenKey]) && $inFlightRefresh[$tokenKey] === 'pending') {
if (function_exists('\Co\sleep')) {
\Co\sleep(0.005); // 5ms - yields to event loop in async context
} else {
usleep(5000); // Fallback for non-Swoole (unit tests)
}
}

if (isset($inFlightRefresh[$tokenKey]) && is_array($inFlightRefresh[$tokenKey])) {
// Don't null out here - let subsequent calls use the same result
return true;
}

$inFlightRefresh[$tokenKey] = 'pending';

try {
$refreshResult = $this->api->refreshAfterAuthFailure($this->settings->refreshToken);

/** @var string $newAccessToken */
$newAccessToken = is_string($refreshResult['access_token'] ?? null) ? $refreshResult['access_token'] : '';
/** @var string $newRefreshToken */
$newRefreshToken = is_string($refreshResult['refresh_token'] ?? null) ? $refreshResult['refresh_token'] : '';

if ($newAccessToken === '') {
$this->logger?->warning('Trakt: refresh returned empty access token');

return false;
}

$this->setAccessToken($newAccessToken);

if ($newRefreshToken !== '') {
$this->setRefreshToken($newRefreshToken);
}

$inFlightRefresh[$tokenKey] = $refreshResult;

$this->logger?->info('Trakt: token refreshed successfully');

return true;
} catch (TraktApiException $e) {
unset($inFlightRefresh[$tokenKey]);
$this->logger?->warning('Trakt: token refresh failed', [
'error' => $e->getMessage(),
]);

return false;
} catch (\Throwable $e) {
unset($inFlightRefresh[$tokenKey]);
$this->logger?->error('Trakt: token refresh threw', [
'error' => $e->getMessage(),
]);

return false;
}
}

/**
* Initialize the Trakt API client from current settings.
*
Expand Down
46 changes: 46 additions & 0 deletions tests/Unit/Plugins/Scrobbler/Trakt/TraktApiTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,50 @@ public function testGetWatchedHistoryUsesCorrectEndpoint(): void
$this->assertSame('GET', $http->lastMethod);
$this->assertStringContainsString('/users/testuser/watched', $http->lastUrl);
}

/**
* Test that concurrent calls to refreshAfterAuthFailure result in only ONE POST.
*
* This simulates the scenario where multiple scrobble calls each receive a 401
* and all call refreshAfterAuthFailure(). Without single-flight gating, each
* would POST to /oauth/token. With the mutex, only the first call performs
* the POST; subsequent calls await and reuse the same result.
*
* Note: In a real async context (Workerman), concurrent coroutines all see
* the same static $inFlightRefresh and the first one wins. The POST count
* verification proves the single-flight is working.
*/
public function testRefreshAfterAuthFailureIsSingleFlighted(): void
{
$http = new MockHttpClient([
['access_token' => 'refreshed-access', 'refresh_token' => 'refreshed-refresh', 'expires_in' => 7200]
]);
$api = new TraktApi($http, self::CLIENT_ID, self::CLIENT_SECRET, new NullLogger());

// Simulate 3 concurrent 401 failures all calling refreshAfterAuthFailure.
// In a real async context, these would be truly concurrent. In this sync
// test, they run sequentially but the static mutex prevents duplicate POSTs.
$api->refreshAfterAuthFailure('old-refresh-token');
$api->refreshAfterAuthFailure('old-refresh-token');
$api->refreshAfterAuthFailure('old-refresh-token');

// Only ONE POST should have been made to /oauth/token
// (the single-flight mutex ensures subsequent calls use cached result)
$this->assertSame(1, $http->postCallCount);
$this->assertStringContainsString('/oauth/token', $http->lastUrl);
}

/**
* Test that refreshAfterAuthFailure properly propagates errors.
*/
public function testRefreshAfterAuthFailureThrowsOnError(): void
{
$http = new MockHttpClient([['error' => 'invalid_grant', 'error_description' => 'Refresh token expired']]);
$api = new TraktApi($http, self::CLIENT_ID, self::CLIENT_SECRET, new NullLogger());

$this->expectException(TraktApiException::class);
$api->refreshAfterAuthFailure('expired-refresh-token');
}
}

final class MockHttpClient implements HttpClientInterface
Expand All @@ -108,6 +152,7 @@ final class MockHttpClient implements HttpClientInterface
public string $lastUrl = '';
public array $lastData = [];
public array $lastHeaders = [];
public int $postCallCount = 0;

/** @var array<array> */
private array $responses;
Expand Down Expand Up @@ -140,6 +185,7 @@ public function post(string $url, array $data = [], array $headers = []): array
$this->lastUrl = $url;
$this->lastData = $data;
$this->lastHeaders = $headers;
++$this->postCallCount;

return $this->getNextResponse();
}
Expand Down
Loading