From 3bbcd3c24df4208c47eed5256a1da6e6ad575a56 Mon Sep 17 00:00:00 2001 From: Joe Huss Date: Mon, 29 Jun 2026 01:35:06 -0400 Subject: [PATCH 1/2] fix(trakt): add single-flight guard to refreshAfterAuthFailure() (B4) When multiple concurrent scrobble calls each receive a 401, they now all await the same token refresh POST rather than each POSTing their own. Uses a two-level mutex (static flag + flock) to prevent Trakt's rotating refresh tokens from invalidating each other. Also adds retry logic in onPlaybackStarted/onPlaybackStopped after token refresh succeeds. --- src/Plugins/Scrobbler/Trakt/TraktApi.php | 66 +++++++++ src/Plugins/Scrobbler/Trakt/TraktPlugin.php | 126 +++++++++++++++++- .../Plugins/Scrobbler/Trakt/TraktApiTest.php | 46 +++++++ 3 files changed, 236 insertions(+), 2 deletions(-) diff --git a/src/Plugins/Scrobbler/Trakt/TraktApi.php b/src/Plugins/Scrobbler/Trakt/TraktApi.php index f4c53d4e..1cae0e3b 100644 --- a/src/Plugins/Scrobbler/Trakt/TraktApi.php +++ b/src/Plugins/Scrobbler/Trakt/TraktApi.php @@ -141,6 +141,72 @@ public function refreshAccessToken(string $refreshToken): array ]; } + /** + * Refresh access token after an auth failure (401). + * + * Uses a single-flight mutex so that concurrent 401 responses all await + * the same refresh POST rather than each POSTing their own. This prevents + * Trakt's rotating refresh tokens from invalidating each other. + * + * Uses a two-level guard: + * 1. Static $inFlightRefresh for same-process concurrency (Workerman coroutines) + * 2. flock() for cross-process safety (multiple Workerman workers) + * + * @param string $refreshToken Current refresh token + * + * @return array Token response with access_token, refresh_token, expires_in + * + * @throws TraktApiException When refresh fails + * @since 0.14.0 + */ + public function refreshAfterAuthFailure(string $refreshToken): array + { + static $inFlightRefresh = null; + static $lockFile = '/tmp/trakt_refresh.lock'; + + // Phase 1: Same-process single-flight (Workerman coroutines on same worker) + while ($inFlightRefresh === 'pending') { + usleep(5000); // 5ms - yields to event loop in async context + } + + if (is_array($inFlightRefresh)) { + /** @var array $result */ + $result = $inFlightRefresh; + + return $result; + } + + $inFlightRefresh = 'pending'; + + // Phase 2: Cross-process mutex via flock() + $fp = fopen($lockFile, 'c+'); + if (!$fp) { + $inFlightRefresh = null; + throw new TraktApiException('Could not open refresh lock file'); + } + + if (!flock($fp, LOCK_EX)) { + fclose($fp); + $inFlightRefresh = null; + throw new TraktApiException('Could not acquire refresh lock'); + } + + try { + $result = $this->refreshAccessToken($refreshToken); + $inFlightRefresh = $result; + flock($fp, LOCK_UN); + fclose($fp); + + return $result; + } catch (\Throwable $e) { + $inFlightRefresh = null; + flock($fp, LOCK_UN); + fclose($fp); + + throw $e; + } + } + /** * Submit a scrobble start for a media item. * diff --git a/src/Plugins/Scrobbler/Trakt/TraktPlugin.php b/src/Plugins/Scrobbler/Trakt/TraktPlugin.php index f2d87895..7db61e0f 100644 --- a/src/Plugins/Scrobbler/Trakt/TraktPlugin.php +++ b/src/Plugins/Scrobbler/Trakt/TraktPlugin.php @@ -165,9 +165,28 @@ public function onPlaybackStarted(PlaybackStarted $event): void 'progress' => $progressSecs, ]); } catch (TraktAuthenticationException $e) { - $this->logger?->warning('Trakt: scrobble start failed (auth)', [ + $this->logger?->warning('Trakt: scrobble start failed (auth), attempting token refresh', [ 'error' => $e->getMessage(), ]); + + if ($this->ensureFreshToken()) { + // Retry once with the new token + try { + $api->scrobbleStart($mediaItem, $progressSecs, $this->settings->accessToken ?? ''); + $this->logger?->info('Trakt scrobble start submitted after token refresh', [ + 'title' => $mediaItem->name, + 'progress' => $progressSecs, + ]); + } catch (TraktAuthenticationException $retryAuthEx) { + $this->logger?->warning('Trakt: scrobble start still failing after refresh', [ + 'error' => $retryAuthEx->getMessage(), + ]); + } catch (TraktApiException $retryEx) { + $this->logger?->warning('Trakt: scrobble start retry failed', [ + 'error' => $retryEx->getMessage(), + ]); + } + } } catch (TraktApiException $e) { $this->logger?->warning('Trakt: scrobble start failed', [ 'error' => $e->getMessage(), @@ -217,9 +236,28 @@ public function onPlaybackStopped(PlaybackStopped $event): void 'progress' => $progressSecs, ]); } catch (TraktAuthenticationException $e) { - $this->logger?->warning('Trakt: scrobble stop failed (auth)', [ + $this->logger?->warning('Trakt: scrobble stop failed (auth), attempting token refresh', [ 'error' => $e->getMessage(), ]); + + if ($this->ensureFreshToken()) { + // Retry once with the new token + try { + $api->scrobbleStop($mediaItem, $progressSecs, $this->settings->accessToken ?? ''); + $this->logger?->info('Trakt scrobble stop submitted after token refresh', [ + 'title' => $mediaItem->name, + 'progress' => $progressSecs, + ]); + } catch (TraktAuthenticationException $retryAuthEx) { + $this->logger?->warning('Trakt: scrobble stop still failing after refresh', [ + 'error' => $retryAuthEx->getMessage(), + ]); + } catch (TraktApiException $retryEx) { + $this->logger?->warning('Trakt: scrobble stop retry failed', [ + 'error' => $retryEx->getMessage(), + ]); + } + } } catch (TraktApiException $e) { $this->logger?->warning('Trakt: scrobble stop failed', [ 'error' => $e->getMessage(), @@ -287,6 +325,90 @@ public function setRefreshToken(string $token): void ); } + /** + * Refresh the access token after an auth failure, with single-flight guard. + * + * When multiple concurrent scrobble calls each receive a 401, only one + * actual token refresh POST is made. All callers await the same result. + * + * @return bool True if token was refreshed, false if no refresh token available + * + * @since 0.14.0 + */ + public function ensureFreshToken(): bool + { + // Single-flight mutex at the plugin level. Since TraktPlugin is a + // singleton per plugin loader, this prevents concurrent coroutines + // on the same worker from each POSTing a refresh. + static $inFlightRefresh = null; + + if ($this->api === null) { + $this->logger?->warning('Trakt: API not initialized'); + + return false; + } + + if ($this->settings->refreshToken === null || $this->settings->refreshToken === '') { + $this->logger?->warning('Trakt: no refresh token available'); + + return false; + } + + // If another call is already refreshing, spin until it completes + while ($inFlightRefresh === 'pending') { + usleep(5000); // 5ms - yields to event loop in async context + } + + // If we already have a completed refresh result, return it + if (is_array($inFlightRefresh)) { + // Don't null out here - let subsequent calls use the same result + return true; + } + + $inFlightRefresh = 'pending'; + + try { + $refreshResult = $this->api->refreshAfterAuthFailure($this->settings->refreshToken); + + /** @var string $newAccessToken */ + $newAccessToken = is_string($refreshResult['access_token'] ?? null) ? $refreshResult['access_token'] : ''; + /** @var string $newRefreshToken */ + $newRefreshToken = is_string($refreshResult['refresh_token'] ?? null) ? $refreshResult['refresh_token'] : ''; + + if ($newAccessToken === '') { + $this->logger?->warning('Trakt: refresh returned empty access token'); + + return false; + } + + $this->setAccessToken($newAccessToken); + + if ($newRefreshToken !== '') { + $this->setRefreshToken($newRefreshToken); + } + + $inFlightRefresh = $refreshResult; + + $this->logger?->info('Trakt: token refreshed successfully'); + + return true; + } catch (TraktApiException $e) { + $inFlightRefresh = null; + $this->logger?->warning('Trakt: token refresh failed', [ + 'error' => $e->getMessage(), + ]); + + return false; + } catch (\Throwable $e) { + $inFlightRefresh = null; + $this->logger?->error('Trakt: token refresh threw', [ + 'error' => $e->getMessage(), + ]); + + return false; + } + } + /** * Initialize the Trakt API client from current settings. * diff --git a/tests/Unit/Plugins/Scrobbler/Trakt/TraktApiTest.php b/tests/Unit/Plugins/Scrobbler/Trakt/TraktApiTest.php index 44b41774..8949dae5 100644 --- a/tests/Unit/Plugins/Scrobbler/Trakt/TraktApiTest.php +++ b/tests/Unit/Plugins/Scrobbler/Trakt/TraktApiTest.php @@ -100,6 +100,50 @@ public function testGetWatchedHistoryUsesCorrectEndpoint(): void $this->assertSame('GET', $http->lastMethod); $this->assertStringContainsString('/users/testuser/watched', $http->lastUrl); } + + /** + * Test that concurrent calls to refreshAfterAuthFailure result in only ONE POST. + * + * This simulates the scenario where multiple scrobble calls each receive a 401 + * and all call refreshAfterAuthFailure(). Without single-flight gating, each + * would POST to /oauth/token. With the mutex, only the first call performs + * the POST; subsequent calls await and reuse the same result. + * + * Note: In a real async context (Workerman), concurrent coroutines all see + * the same static $inFlightRefresh and the first one wins. The POST count + * verification proves the single-flight is working. + */ + public function testRefreshAfterAuthFailureIsSingleFlighted(): void + { + $http = new MockHttpClient([ + ['access_token' => 'refreshed-access', 'refresh_token' => 'refreshed-refresh', 'expires_in' => 7200] + ]); + $api = new TraktApi($http, self::CLIENT_ID, self::CLIENT_SECRET, new NullLogger()); + + // Simulate 3 concurrent 401 failures all calling refreshAfterAuthFailure. + // In a real async context, these would be truly concurrent. In this sync + // test, they run sequentially but the static mutex prevents duplicate POSTs. + $api->refreshAfterAuthFailure('old-refresh-token'); + $api->refreshAfterAuthFailure('old-refresh-token'); + $api->refreshAfterAuthFailure('old-refresh-token'); + + // Only ONE POST should have been made to /oauth/token + // (the single-flight mutex ensures subsequent calls use cached result) + $this->assertSame(1, $http->postCallCount); + $this->assertStringContainsString('/oauth/token', $http->lastUrl); + } + + /** + * Test that refreshAfterAuthFailure properly propagates errors. + */ + public function testRefreshAfterAuthFailureThrowsOnError(): void + { + $http = new MockHttpClient([['error' => 'invalid_grant', 'error_description' => 'Refresh token expired']]); + $api = new TraktApi($http, self::CLIENT_ID, self::CLIENT_SECRET, new NullLogger()); + + $this->expectException(TraktApiException::class); + $api->refreshAfterAuthFailure('expired-refresh-token'); + } } final class MockHttpClient implements HttpClientInterface @@ -108,6 +152,7 @@ final class MockHttpClient implements HttpClientInterface public string $lastUrl = ''; public array $lastData = []; public array $lastHeaders = []; + public int $postCallCount = 0; /** @var array */ private array $responses; @@ -140,6 +185,7 @@ public function post(string $url, array $data = [], array $headers = []): array $this->lastUrl = $url; $this->lastData = $data; $this->lastHeaders = $headers; + ++$this->postCallCount; return $this->getNextResponse(); } From fc854af22fec969b49e6c0a02c6310301ce492e2 Mon Sep 17 00:00:00 2001 From: Joe Huss Date: Mon, 29 Jun 2026 01:55:16 -0400 Subject: [PATCH 2/2] Fix trakt refresh: replace usleep spin-wait with Co\sleep and use sys_get_temp_dir for lock file - Replace blocking usleep(5000) with Co\sleep(0.005) that yields to the Workerman event loop instead of blocking the worker process - Fall back to usleep(5000) in non-Swoole environments (unit tests) - Use sys_get_temp_dir() instead of hardcoded /tmp/ for lock file path - Token-key the in-flight refresh cache by md5(token) to prevent cross-token cache pollution between test runs - Add proper cleanup of cache entries on error paths --- src/Plugins/Scrobbler/Trakt/TraktApi.php | 34 ++++++++++++++------- src/Plugins/Scrobbler/Trakt/TraktPlugin.php | 23 +++++++++----- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/src/Plugins/Scrobbler/Trakt/TraktApi.php b/src/Plugins/Scrobbler/Trakt/TraktApi.php index 1cae0e3b..ba06ec30 100644 --- a/src/Plugins/Scrobbler/Trakt/TraktApi.php +++ b/src/Plugins/Scrobbler/Trakt/TraktApi.php @@ -161,45 +161,57 @@ public function refreshAccessToken(string $refreshToken): array */ public function refreshAfterAuthFailure(string $refreshToken): array { - static $inFlightRefresh = null; - static $lockFile = '/tmp/trakt_refresh.lock'; + static $inFlightRefresh = []; + static $lockFile = null; + + if ($lockFile === null) { + $lockFile = sys_get_temp_dir() . '/phlix_trakt_refresh.lock'; + } + + // Key cache by token hash to avoid cross-token cache pollution + $tokenKey = md5($refreshToken); // Phase 1: Same-process single-flight (Workerman coroutines on same worker) - while ($inFlightRefresh === 'pending') { - usleep(5000); // 5ms - yields to event loop in async context + // Use \Co\sleep to yield to the event loop instead of blocking with usleep() + while (isset($inFlightRefresh[$tokenKey]) && $inFlightRefresh[$tokenKey] === 'pending') { + if (function_exists('\Co\sleep')) { + \Co\sleep(0.005); // 5ms - yields to event loop in async context + } else { + usleep(5000); // Fallback for non-Swoole (unit tests) + } } - if (is_array($inFlightRefresh)) { + if (isset($inFlightRefresh[$tokenKey]) && is_array($inFlightRefresh[$tokenKey])) { /** @var array $result */ - $result = $inFlightRefresh; + $result = $inFlightRefresh[$tokenKey]; return $result; } - $inFlightRefresh = 'pending'; + $inFlightRefresh[$tokenKey] = 'pending'; // Phase 2: Cross-process mutex via flock() $fp = fopen($lockFile, 'c+'); if (!$fp) { - $inFlightRefresh = null; + unset($inFlightRefresh[$tokenKey]); throw new TraktApiException('Could not open refresh lock file'); } if (!flock($fp, LOCK_EX)) { fclose($fp); - $inFlightRefresh = null; + unset($inFlightRefresh[$tokenKey]); throw new TraktApiException('Could not acquire refresh lock'); } try { $result = $this->refreshAccessToken($refreshToken); - $inFlightRefresh = $result; + $inFlightRefresh[$tokenKey] = $result; flock($fp, LOCK_UN); fclose($fp); return $result; } catch (\Throwable $e) { - $inFlightRefresh = null; + unset($inFlightRefresh[$tokenKey]); flock($fp, LOCK_UN); fclose($fp); diff --git a/src/Plugins/Scrobbler/Trakt/TraktPlugin.php b/src/Plugins/Scrobbler/Trakt/TraktPlugin.php index 7db61e0f..d4aaedf7 100644 --- a/src/Plugins/Scrobbler/Trakt/TraktPlugin.php +++ b/src/Plugins/Scrobbler/Trakt/TraktPlugin.php @@ -354,18 +354,25 @@ public function ensureFreshToken(): bool return false; } + // Key cache by token hash to avoid cross-token cache pollution + $tokenKey = md5($this->settings->refreshToken); + // If another call is already refreshing, spin until it completes - while ($inFlightRefresh === 'pending') { - usleep(5000); // 5ms - yields to event loop in async context + // Use \Co\sleep to yield to the event loop instead of blocking with usleep() + while (isset($inFlightRefresh[$tokenKey]) && $inFlightRefresh[$tokenKey] === 'pending') { + if (function_exists('\Co\sleep')) { + \Co\sleep(0.005); // 5ms - yields to event loop in async context + } else { + usleep(5000); // Fallback for non-Swoole (unit tests) + } } - // If we already have a completed refresh result, return it - if (is_array($inFlightRefresh)) { + if (isset($inFlightRefresh[$tokenKey]) && is_array($inFlightRefresh[$tokenKey])) { // Don't null out here - let subsequent calls use the same result return true; } - $inFlightRefresh = 'pending'; + $inFlightRefresh[$tokenKey] = 'pending'; try { $refreshResult = $this->api->refreshAfterAuthFailure($this->settings->refreshToken); @@ -387,20 +394,20 @@ public function ensureFreshToken(): bool $this->setRefreshToken($newRefreshToken); } - $inFlightRefresh = $refreshResult; + $inFlightRefresh[$tokenKey] = $refreshResult; $this->logger?->info('Trakt: token refreshed successfully'); return true; } catch (TraktApiException $e) { - $inFlightRefresh = null; + unset($inFlightRefresh[$tokenKey]); $this->logger?->warning('Trakt: token refresh failed', [ 'error' => $e->getMessage(), ]); return false; } catch (\Throwable $e) { - $inFlightRefresh = null; + unset($inFlightRefresh[$tokenKey]); $this->logger?->error('Trakt: token refresh threw', [ 'error' => $e->getMessage(), ]);