From 47602b5724a7323635fda0b61a531a77ac9acc9d Mon Sep 17 00:00:00 2001 From: Henry <48483883+hfellerhoff@users.noreply.github.com> Date: Wed, 6 May 2026 02:42:48 -0400 Subject: [PATCH 1/2] fix ioredis setup --- src/server/api/routers/subscriptionRouter.ts | 5 +- src/server/subscriptions/kv.ts | 106 ++++++++++++++----- 2 files changed, 83 insertions(+), 28 deletions(-) diff --git a/src/server/api/routers/subscriptionRouter.ts b/src/server/api/routers/subscriptionRouter.ts index 127aeb2a..0d8120df 100644 --- a/src/server/api/routers/subscriptionRouter.ts +++ b/src/server/api/routers/subscriptionRouter.ts @@ -323,10 +323,7 @@ export const syncAfterCheckout = protectedProcedure.handler( // Per-user rate limit: one sync per SYNC_COOLDOWN_SECONDS const lockKey = `sync-checkout-lock:${context.user.id}`; if (redis) { - const acquired = await redis.set(lockKey, "1", { - nx: true, - ex: SYNC_COOLDOWN_SECONDS, - }); + const acquired = await redis.setNX(lockKey, "1", SYNC_COOLDOWN_SECONDS); if (!acquired) { // Cooldown active — return current state without re-syncing return getUserPlanLimits(context.db, context.user.id); diff --git a/src/server/subscriptions/kv.ts b/src/server/subscriptions/kv.ts index 9f2257d9..d442c05a 100644 --- a/src/server/subscriptions/kv.ts +++ b/src/server/subscriptions/kv.ts @@ -1,4 +1,3 @@ -import { Redis } from "@upstash/redis"; import { and, eq, inArray, sql } from "drizzle-orm"; import { determinePlanFromProductId, @@ -15,20 +14,88 @@ import { env } from "~/env"; type DB = typeof Database; // --------------------------------------------------------------------------- -// Upstash client — null when billing is disabled or creds are missing. +// Unified KV client — supports Upstash REST, ioredis, or null. // Consumers must handle null gracefully (fall through to Polar API). // --------------------------------------------------------------------------- -const hasUpstashCredentials = - !!env.UPSTASH_REDIS_REST_URL && !!env.UPSTASH_REDIS_REST_TOKEN; +type KVClient = { + get: (key: string) => Promise; + set: (key: string, value: string, ttlSeconds?: number) => Promise; + /** Set only if key does not exist. Returns true if set, false if key already existed. */ + setNX: (key: string, value: string, ttlSeconds?: number) => Promise; +}; + +async function createKVClient(): Promise { + if (!IS_BILLING_ENABLED) return null; -export const redis = - IS_BILLING_ENABLED && hasUpstashCredentials - ? new Redis({ - url: env.UPSTASH_REDIS_REST_URL!, - token: env.UPSTASH_REDIS_REST_TOKEN!, - }) - : null; + if (env.KV_STORE === "upstash") { + if (!env.UPSTASH_REDIS_REST_URL || !env.UPSTASH_REDIS_REST_TOKEN) { + return null; + } + const { Redis } = await import("@upstash/redis"); + const client = new Redis({ + url: env.UPSTASH_REDIS_REST_URL, + token: env.UPSTASH_REDIS_REST_TOKEN, + }); + return { + async get(key) { + const raw = await client.get(key); + if (raw == null) return null; + return typeof raw === "string" ? raw : JSON.stringify(raw); + }, + async set(key, value, ttlSeconds) { + if (ttlSeconds && ttlSeconds > 0) { + await client.set(key, value, { ex: ttlSeconds }); + } else { + await client.set(key, value); + } + }, + async setNX(key, value, ttlSeconds) { + const result = + ttlSeconds && ttlSeconds > 0 + ? await client.set(key, value, { nx: true, ex: ttlSeconds }) + : await client.set(key, value, { nx: true }); + return result !== null; + }, + }; + } + + if (env.KV_STORE === "ioredis") { + if (!env.REDIS_URL) return null; + const { default: Redis } = await import("ioredis"); + const client = new Redis(env.REDIS_URL, { + maxRetriesPerRequest: 3, + }); + client.on("error", (err) => { + console.error("[kv] Redis error:", err.message); + }); + return { + async get(key) { + const raw = await client.get(key); + return raw ?? null; + }, + async set(key, value, ttlSeconds) { + if (ttlSeconds && ttlSeconds > 0) { + await client.set(key, value, "EX", ttlSeconds); + } else { + await client.set(key, value); + } + }, + async setNX(key, value, ttlSeconds) { + if (ttlSeconds && ttlSeconds > 0) { + const result = await client.set(key, value, "EX", ttlSeconds, "NX"); + return result === "OK"; + } + const result = await client.set(key, value, "NX"); + return result === "OK"; + }, + }; + } + + return null; +} + +export const redis = await createKVClient(); // --------------------------------------------------------------------------- // Cached subscription type — stored as JSON at `polar:sub:{userId}` @@ -52,11 +119,10 @@ function kvKey(userId: string) { return `polar:sub:${userId}`; } -const KV_TTL_SECONDS = 86_400; // 24 hours — safety net; active syncs keep data fresh - // --------------------------------------------------------------------------- // syncPolarDataToKV — the single source-of-truth sync function. // Fetches the latest subscription state from Polar and writes it to KV. +// No TTL: we keep the data indefinitely so we never need to hit Polar on reads. // --------------------------------------------------------------------------- export async function syncPolarDataToKV( @@ -117,9 +183,7 @@ export async function syncPolarDataToKV( // Write to KV (best-effort — failure here is non-fatal) if (redis) { try { - await redis.set(kvKey(userId), JSON.stringify(data), { - ex: KV_TTL_SECONDS, - }); + await redis.set(kvKey(userId), JSON.stringify(data)); } catch (e) { console.warn( `[kv] Failed to write subscription cache for user ${userId}:`, @@ -165,16 +229,10 @@ export async function getSubscriptionFromKV( if (!redis) return null; try { - const raw = await redis.get(kvKey(userId)); + const raw = await redis.get(kvKey(userId)); if (!raw) return null; - // @upstash/redis may auto-parse JSON, handle both string and object - const data = - typeof raw === "string" - ? (JSON.parse(raw) as PolarSubscriptionCache) - : (raw as unknown as PolarSubscriptionCache); - - return data; + return JSON.parse(raw) as PolarSubscriptionCache; } catch (e) { console.warn( `[kv] Failed to read subscription cache for user ${userId}:`, From 41691fcf31522c02b1cee0f2ebe96aa0c029f316 Mon Sep 17 00:00:00 2001 From: Henry <48483883+hfellerhoff@users.noreply.github.com> Date: Thu, 7 May 2026 13:53:45 -0400 Subject: [PATCH 2/2] consolidate kv logic --- src/server/kv.ts | 52 ++++++++++++++++--- src/server/subscriptions/kv.ts | 92 ++-------------------------------- 2 files changed, 49 insertions(+), 95 deletions(-) diff --git a/src/server/kv.ts b/src/server/kv.ts index bc34f88e..c01495ae 100644 --- a/src/server/kv.ts +++ b/src/server/kv.ts @@ -5,9 +5,10 @@ import { env } from "~/env"; * For "none" (no Redis), falls back to an in-memory Map with TTL. */ -interface KVStore { +export interface KVStore { get: (key: string) => Promise; - set: (key: string, value: string, ttlSeconds: number) => Promise; + set: (key: string, value: string, ttlSeconds?: number) => Promise; + setNX: (key: string, value: string, ttlSeconds?: number) => Promise; } // ── In-memory fallback ────────────────────────────────────────────────────── @@ -34,8 +35,8 @@ class MemoryKV implements KVStore { return entry.value; } - async set(key: string, value: string, ttlSeconds: number) { - const expiresAt = Date.now() + ttlSeconds * 1000; + async set(key: string, value: string, ttlSeconds?: number) { + const expiresAt = ttlSeconds ? Date.now() + ttlSeconds * 1000 : Infinity; this.store.set(key, { value, expiresAt }); // Track the soonest expiry so we only run cleanup when needed @@ -44,6 +45,19 @@ class MemoryKV implements KVStore { } } + async setNX( + key: string, + value: string, + ttlSeconds?: number, + ): Promise { + const entry = this.store.get(key); + if (entry && Date.now() <= entry.expiresAt) { + return false; + } + await this.set(key, value, ttlSeconds); + return true; + } + /** Periodically sweep expired entries so they don't accumulate. */ private scheduleCleanup() { this.cleanupTimer = setInterval(() => { @@ -83,7 +97,18 @@ async function createKVStore(): Promise { return (await redis.get(key)) ?? null; }, async set(key, value, ttlSeconds) { - await redis.set(key, value, { ex: ttlSeconds }); + if (ttlSeconds && ttlSeconds > 0) { + await redis.set(key, value, { ex: ttlSeconds }); + } else { + await redis.set(key, value); + } + }, + async setNX(key, value, ttlSeconds) { + const result = + ttlSeconds && ttlSeconds > 0 + ? await redis.set(key, value, { nx: true, ex: ttlSeconds }) + : await redis.set(key, value, { nx: true }); + return result !== null; }, }; } @@ -93,13 +118,28 @@ async function createKVStore(): Promise { const client = new Redis(env.REDIS_URL!, { maxRetriesPerRequest: 3, }); + client.on("error", (err) => { + console.error("[kv] Redis error:", err.message); + }); return { async get(key) { return await client.get(key); }, async set(key, value, ttlSeconds) { - await client.set(key, value, "EX", ttlSeconds); + if (ttlSeconds && ttlSeconds > 0) { + await client.set(key, value, "EX", ttlSeconds); + } else { + await client.set(key, value); + } + }, + async setNX(key, value, ttlSeconds) { + if (ttlSeconds && ttlSeconds > 0) { + const result = await client.set(key, value, "EX", ttlSeconds, "NX"); + return result === "OK"; + } + const result = await client.set(key, value, "NX"); + return result === "OK"; }, }; } diff --git a/src/server/subscriptions/kv.ts b/src/server/subscriptions/kv.ts index d442c05a..436f9249 100644 --- a/src/server/subscriptions/kv.ts +++ b/src/server/subscriptions/kv.ts @@ -1,101 +1,15 @@ import { and, eq, inArray, sql } from "drizzle-orm"; -import { - determinePlanFromProductId, - IS_BILLING_ENABLED, - polarClient, -} from "./polar"; +import { determinePlanFromProductId, polarClient } from "./polar"; import { getEffectivePlanConfig, PLANS } from "./plans"; import { deactivateExcessFeeds, isAdminUser } from "./helpers"; import type { PlanId } from "./plans"; import type { db as Database } from "~/server/db"; import { feeds, user } from "~/server/db/schema"; -import { env } from "~/env"; +import { getKV } from "~/server/kv"; type DB = typeof Database; -// --------------------------------------------------------------------------- -// Unified KV client — supports Upstash REST, ioredis, or null. -// Consumers must handle null gracefully (fall through to Polar API). -// --------------------------------------------------------------------------- - -type KVClient = { - get: (key: string) => Promise; - set: (key: string, value: string, ttlSeconds?: number) => Promise; - /** Set only if key does not exist. Returns true if set, false if key already existed. */ - setNX: (key: string, value: string, ttlSeconds?: number) => Promise; -}; - -async function createKVClient(): Promise { - if (!IS_BILLING_ENABLED) return null; - - if (env.KV_STORE === "upstash") { - if (!env.UPSTASH_REDIS_REST_URL || !env.UPSTASH_REDIS_REST_TOKEN) { - return null; - } - const { Redis } = await import("@upstash/redis"); - const client = new Redis({ - url: env.UPSTASH_REDIS_REST_URL, - token: env.UPSTASH_REDIS_REST_TOKEN, - }); - return { - async get(key) { - const raw = await client.get(key); - if (raw == null) return null; - return typeof raw === "string" ? raw : JSON.stringify(raw); - }, - async set(key, value, ttlSeconds) { - if (ttlSeconds && ttlSeconds > 0) { - await client.set(key, value, { ex: ttlSeconds }); - } else { - await client.set(key, value); - } - }, - async setNX(key, value, ttlSeconds) { - const result = - ttlSeconds && ttlSeconds > 0 - ? await client.set(key, value, { nx: true, ex: ttlSeconds }) - : await client.set(key, value, { nx: true }); - return result !== null; - }, - }; - } - - if (env.KV_STORE === "ioredis") { - if (!env.REDIS_URL) return null; - const { default: Redis } = await import("ioredis"); - const client = new Redis(env.REDIS_URL, { - maxRetriesPerRequest: 3, - }); - client.on("error", (err) => { - console.error("[kv] Redis error:", err.message); - }); - return { - async get(key) { - const raw = await client.get(key); - return raw ?? null; - }, - async set(key, value, ttlSeconds) { - if (ttlSeconds && ttlSeconds > 0) { - await client.set(key, value, "EX", ttlSeconds); - } else { - await client.set(key, value); - } - }, - async setNX(key, value, ttlSeconds) { - if (ttlSeconds && ttlSeconds > 0) { - const result = await client.set(key, value, "EX", ttlSeconds, "NX"); - return result === "OK"; - } - const result = await client.set(key, value, "NX"); - return result === "OK"; - }, - }; - } - - return null; -} - -export const redis = await createKVClient(); +export const redis = await getKV(); // --------------------------------------------------------------------------- // Cached subscription type — stored as JSON at `polar:sub:{userId}`