diff --git a/src/server/api/routers/subscriptionRouter.ts b/src/server/api/routers/subscriptionRouter.ts index 127aeb2a..0d8120df 100644 --- a/src/server/api/routers/subscriptionRouter.ts +++ b/src/server/api/routers/subscriptionRouter.ts @@ -323,10 +323,7 @@ export const syncAfterCheckout = protectedProcedure.handler( // Per-user rate limit: one sync per SYNC_COOLDOWN_SECONDS const lockKey = `sync-checkout-lock:${context.user.id}`; if (redis) { - const acquired = await redis.set(lockKey, "1", { - nx: true, - ex: SYNC_COOLDOWN_SECONDS, - }); + const acquired = await redis.setNX(lockKey, "1", SYNC_COOLDOWN_SECONDS); if (!acquired) { // Cooldown active — return current state without re-syncing return getUserPlanLimits(context.db, context.user.id); diff --git a/src/server/kv.ts b/src/server/kv.ts index bc34f88e..c01495ae 100644 --- a/src/server/kv.ts +++ b/src/server/kv.ts @@ -5,9 +5,10 @@ import { env } from "~/env"; * For "none" (no Redis), falls back to an in-memory Map with TTL. */ -interface KVStore { +export interface KVStore { get: (key: string) => Promise; - set: (key: string, value: string, ttlSeconds: number) => Promise; + set: (key: string, value: string, ttlSeconds?: number) => Promise; + setNX: (key: string, value: string, ttlSeconds?: number) => Promise; } // ── In-memory fallback ────────────────────────────────────────────────────── @@ -34,8 +35,8 @@ class MemoryKV implements KVStore { return entry.value; } - async set(key: string, value: string, ttlSeconds: number) { - const expiresAt = Date.now() + ttlSeconds * 1000; + async set(key: string, value: string, ttlSeconds?: number) { + const expiresAt = ttlSeconds ? Date.now() + ttlSeconds * 1000 : Infinity; this.store.set(key, { value, expiresAt }); // Track the soonest expiry so we only run cleanup when needed @@ -44,6 +45,19 @@ class MemoryKV implements KVStore { } } + async setNX( + key: string, + value: string, + ttlSeconds?: number, + ): Promise { + const entry = this.store.get(key); + if (entry && Date.now() <= entry.expiresAt) { + return false; + } + await this.set(key, value, ttlSeconds); + return true; + } + /** Periodically sweep expired entries so they don't accumulate. */ private scheduleCleanup() { this.cleanupTimer = setInterval(() => { @@ -83,7 +97,18 @@ async function createKVStore(): Promise { return (await redis.get(key)) ?? null; }, async set(key, value, ttlSeconds) { - await redis.set(key, value, { ex: ttlSeconds }); + if (ttlSeconds && ttlSeconds > 0) { + await redis.set(key, value, { ex: ttlSeconds }); + } else { + await redis.set(key, value); + } + }, + async setNX(key, value, ttlSeconds) { + const result = + ttlSeconds && ttlSeconds > 0 + ? await redis.set(key, value, { nx: true, ex: ttlSeconds }) + : await redis.set(key, value, { nx: true }); + return result !== null; }, }; } @@ -93,13 +118,28 @@ async function createKVStore(): Promise { const client = new Redis(env.REDIS_URL!, { maxRetriesPerRequest: 3, }); + client.on("error", (err) => { + console.error("[kv] Redis error:", err.message); + }); return { async get(key) { return await client.get(key); }, async set(key, value, ttlSeconds) { - await client.set(key, value, "EX", ttlSeconds); + if (ttlSeconds && ttlSeconds > 0) { + await client.set(key, value, "EX", ttlSeconds); + } else { + await client.set(key, value); + } + }, + async setNX(key, value, ttlSeconds) { + if (ttlSeconds && ttlSeconds > 0) { + const result = await client.set(key, value, "EX", ttlSeconds, "NX"); + return result === "OK"; + } + const result = await client.set(key, value, "NX"); + return result === "OK"; }, }; } diff --git a/src/server/subscriptions/kv.ts b/src/server/subscriptions/kv.ts index 9f2257d9..436f9249 100644 --- a/src/server/subscriptions/kv.ts +++ b/src/server/subscriptions/kv.ts @@ -1,34 +1,15 @@ -import { Redis } from "@upstash/redis"; import { and, eq, inArray, sql } from "drizzle-orm"; -import { - determinePlanFromProductId, - IS_BILLING_ENABLED, - polarClient, -} from "./polar"; +import { determinePlanFromProductId, polarClient } from "./polar"; import { getEffectivePlanConfig, PLANS } from "./plans"; import { deactivateExcessFeeds, isAdminUser } from "./helpers"; import type { PlanId } from "./plans"; import type { db as Database } from "~/server/db"; import { feeds, user } from "~/server/db/schema"; -import { env } from "~/env"; +import { getKV } from "~/server/kv"; type DB = typeof Database; -// --------------------------------------------------------------------------- -// Upstash client — null when billing is disabled or creds are missing. -// Consumers must handle null gracefully (fall through to Polar API). -// --------------------------------------------------------------------------- - -const hasUpstashCredentials = - !!env.UPSTASH_REDIS_REST_URL && !!env.UPSTASH_REDIS_REST_TOKEN; - -export const redis = - IS_BILLING_ENABLED && hasUpstashCredentials - ? new Redis({ - url: env.UPSTASH_REDIS_REST_URL!, - token: env.UPSTASH_REDIS_REST_TOKEN!, - }) - : null; +export const redis = await getKV(); // --------------------------------------------------------------------------- // Cached subscription type — stored as JSON at `polar:sub:{userId}` @@ -52,11 +33,10 @@ function kvKey(userId: string) { return `polar:sub:${userId}`; } -const KV_TTL_SECONDS = 86_400; // 24 hours — safety net; active syncs keep data fresh - // --------------------------------------------------------------------------- // syncPolarDataToKV — the single source-of-truth sync function. // Fetches the latest subscription state from Polar and writes it to KV. +// No TTL: we keep the data indefinitely so we never need to hit Polar on reads. // --------------------------------------------------------------------------- export async function syncPolarDataToKV( @@ -117,9 +97,7 @@ export async function syncPolarDataToKV( // Write to KV (best-effort — failure here is non-fatal) if (redis) { try { - await redis.set(kvKey(userId), JSON.stringify(data), { - ex: KV_TTL_SECONDS, - }); + await redis.set(kvKey(userId), JSON.stringify(data)); } catch (e) { console.warn( `[kv] Failed to write subscription cache for user ${userId}:`, @@ -165,16 +143,10 @@ export async function getSubscriptionFromKV( if (!redis) return null; try { - const raw = await redis.get(kvKey(userId)); + const raw = await redis.get(kvKey(userId)); if (!raw) return null; - // @upstash/redis may auto-parse JSON, handle both string and object - const data = - typeof raw === "string" - ? (JSON.parse(raw) as PolarSubscriptionCache) - : (raw as unknown as PolarSubscriptionCache); - - return data; + return JSON.parse(raw) as PolarSubscriptionCache; } catch (e) { console.warn( `[kv] Failed to read subscription cache for user ${userId}:`,