Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/server/api/routers/subscriptionRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,7 @@ export const syncAfterCheckout = protectedProcedure.handler(
// Per-user rate limit: one sync per SYNC_COOLDOWN_SECONDS
const lockKey = `sync-checkout-lock:${context.user.id}`;
if (redis) {
const acquired = await redis.set(lockKey, "1", {
nx: true,
ex: SYNC_COOLDOWN_SECONDS,
});
const acquired = await redis.setNX(lockKey, "1", SYNC_COOLDOWN_SECONDS);
if (!acquired) {
// Cooldown active — return current state without re-syncing
return getUserPlanLimits(context.db, context.user.id);
Expand Down
52 changes: 46 additions & 6 deletions src/server/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { env } from "~/env";
* For "none" (no Redis), falls back to an in-memory Map with TTL.
*/

interface KVStore {
export interface KVStore {
get: (key: string) => Promise<string | null>;
set: (key: string, value: string, ttlSeconds: number) => Promise<void>;
set: (key: string, value: string, ttlSeconds?: number) => Promise<void>;
setNX: (key: string, value: string, ttlSeconds?: number) => Promise<boolean>;
}

// ── In-memory fallback ──────────────────────────────────────────────────────
Expand All @@ -34,8 +35,8 @@ class MemoryKV implements KVStore {
return entry.value;
}

async set(key: string, value: string, ttlSeconds: number) {
const expiresAt = Date.now() + ttlSeconds * 1000;
async set(key: string, value: string, ttlSeconds?: number) {
const expiresAt = ttlSeconds ? Date.now() + ttlSeconds * 1000 : Infinity;
this.store.set(key, { value, expiresAt });

// Track the soonest expiry so we only run cleanup when needed
Expand All @@ -44,6 +45,19 @@ class MemoryKV implements KVStore {
}
}

async setNX(
key: string,
value: string,
ttlSeconds?: number,
): Promise<boolean> {
const entry = this.store.get(key);
if (entry && Date.now() <= entry.expiresAt) {
return false;
}
await this.set(key, value, ttlSeconds);
return true;
}

/** Periodically sweep expired entries so they don't accumulate. */
private scheduleCleanup() {
this.cleanupTimer = setInterval(() => {
Expand Down Expand Up @@ -83,7 +97,18 @@ async function createKVStore(): Promise<KVStore> {
return (await redis.get<string>(key)) ?? null;
},
async set(key, value, ttlSeconds) {
await redis.set(key, value, { ex: ttlSeconds });
if (ttlSeconds && ttlSeconds > 0) {
await redis.set(key, value, { ex: ttlSeconds });
} else {
await redis.set(key, value);
}
},
async setNX(key, value, ttlSeconds) {
const result =
ttlSeconds && ttlSeconds > 0
? await redis.set(key, value, { nx: true, ex: ttlSeconds })
: await redis.set(key, value, { nx: true });
return result !== null;
},
};
}
Expand All @@ -93,13 +118,28 @@ async function createKVStore(): Promise<KVStore> {
const client = new Redis(env.REDIS_URL!, {
maxRetriesPerRequest: 3,
});
client.on("error", (err) => {
console.error("[kv] Redis error:", err.message);
});

return {
async get(key) {
return await client.get(key);
},
async set(key, value, ttlSeconds) {
await client.set(key, value, "EX", ttlSeconds);
if (ttlSeconds && ttlSeconds > 0) {
await client.set(key, value, "EX", ttlSeconds);
} else {
await client.set(key, value);
}
},
async setNX(key, value, ttlSeconds) {
if (ttlSeconds && ttlSeconds > 0) {
const result = await client.set(key, value, "EX", ttlSeconds, "NX");
return result === "OK";
}
const result = await client.set(key, value, "NX");
return result === "OK";
},
};
}
Expand Down
42 changes: 7 additions & 35 deletions src/server/subscriptions/kv.ts
Original file line number Diff line number Diff line change
@@ -1,34 +1,15 @@
import { Redis } from "@upstash/redis";
import { and, eq, inArray, sql } from "drizzle-orm";
import {
determinePlanFromProductId,
IS_BILLING_ENABLED,
polarClient,
} from "./polar";
import { determinePlanFromProductId, polarClient } from "./polar";
import { getEffectivePlanConfig, PLANS } from "./plans";
import { deactivateExcessFeeds, isAdminUser } from "./helpers";
import type { PlanId } from "./plans";
import type { db as Database } from "~/server/db";
import { feeds, user } from "~/server/db/schema";
import { env } from "~/env";
import { getKV } from "~/server/kv";

type DB = typeof Database;

// ---------------------------------------------------------------------------
// Upstash client — null when billing is disabled or creds are missing.
// Consumers must handle null gracefully (fall through to Polar API).
// ---------------------------------------------------------------------------

const hasUpstashCredentials =
!!env.UPSTASH_REDIS_REST_URL && !!env.UPSTASH_REDIS_REST_TOKEN;

export const redis =
IS_BILLING_ENABLED && hasUpstashCredentials
? new Redis({
url: env.UPSTASH_REDIS_REST_URL!,
token: env.UPSTASH_REDIS_REST_TOKEN!,
})
: null;
export const redis = await getKV();

// ---------------------------------------------------------------------------
// Cached subscription type — stored as JSON at `polar:sub:{userId}`
Expand All @@ -52,11 +33,10 @@ function kvKey(userId: string) {
return `polar:sub:${userId}`;
}

const KV_TTL_SECONDS = 86_400; // 24 hours — safety net; active syncs keep data fresh

// ---------------------------------------------------------------------------
// syncPolarDataToKV — the single source-of-truth sync function.
// Fetches the latest subscription state from Polar and writes it to KV.
// No TTL: we keep the data indefinitely so we never need to hit Polar on reads.
// ---------------------------------------------------------------------------

export async function syncPolarDataToKV(
Expand Down Expand Up @@ -117,9 +97,7 @@ export async function syncPolarDataToKV(
// Write to KV (best-effort — failure here is non-fatal)
if (redis) {
try {
await redis.set(kvKey(userId), JSON.stringify(data), {
ex: KV_TTL_SECONDS,
});
await redis.set(kvKey(userId), JSON.stringify(data));
} catch (e) {
console.warn(
`[kv] Failed to write subscription cache for user ${userId}:`,
Expand Down Expand Up @@ -165,16 +143,10 @@ export async function getSubscriptionFromKV(
if (!redis) return null;

try {
const raw = await redis.get<string>(kvKey(userId));
const raw = await redis.get(kvKey(userId));
if (!raw) return null;

// @upstash/redis may auto-parse JSON, handle both string and object
const data =
typeof raw === "string"
? (JSON.parse(raw) as PolarSubscriptionCache)
: (raw as unknown as PolarSubscriptionCache);

return data;
return JSON.parse(raw) as PolarSubscriptionCache;
} catch (e) {
console.warn(
`[kv] Failed to read subscription cache for user ${userId}:`,
Expand Down
Loading