From bccd1a9ff2378ea3d19c53442992fc61d76e30d8 Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 5 May 2026 00:09:01 +0200 Subject: [PATCH 1/5] feat: add ifMatch/ifNoneMatch (CAS) support Conditional `setItem` / `setItemRaw` writes: storage.setItem(key, value, { ifNoneMatch: "*" }) // create-only storage.setItem(key, value, { ifMatch: meta.etag }) // optimistic swap Drivers expose `etag` via `getMeta` and throw `CASMismatchError` on precondition failure. Drivers without `flags.cas` throw `CASUnsupportedError` upfront so silent precondition-skip is impossible. Implemented in: memory, lru-cache, fs (atomic create via temp+link; single-process etag mutex), redis (SET NX + WATCH/MULTI/EXEC, content SHA-1 etag). --- src/drivers/fs.ts | 103 ++++++++++++++++++++++++++++++--- src/drivers/lru-cache.ts | 34 ++++++++++- src/drivers/memory.ts | 31 +++++++++- src/drivers/redis.ts | 85 +++++++++++++++++++++++++++ src/drivers/utils/cas.ts | 90 ++++++++++++++++++++++++++++ src/drivers/utils/node-fs.ts | 20 +++++++ src/errors.ts | 11 ++++ src/index.ts | 1 + src/storage.ts | 25 ++++++-- src/types.ts | 59 +++++++++++++++++-- test/drivers/fs.test.ts | 1 + test/drivers/lru-cache.test.ts | 2 + test/drivers/memory.test.ts | 1 + test/drivers/redis.test.ts | 1 + test/drivers/utils.ts | 62 +++++++++++++++++++- 15 files changed, 501 insertions(+), 25 deletions(-) create mode 100644 src/drivers/utils/cas.ts create mode 100644 src/errors.ts diff --git a/src/drivers/fs.ts b/src/drivers/fs.ts index 3facd0c79..cf634a395 100644 --- a/src/drivers/fs.ts +++ b/src/drivers/fs.ts @@ -5,11 +5,13 @@ import { createError, createRequiredError, type DriverFactory } from "./utils/in import { readFile, writeFile, + writeFileExclusive, readdirRecursive, rmRecursive, unlink, ensuredir, } from "./utils/node-fs.ts"; +import { CASMismatchError, checkCAS } from "./utils/cas.ts"; export interface FSStorageOptions { base?: string; @@ -52,6 +54,32 @@ const driver: DriverFactory = (userOptions = {}) => { return resolved; }; + // Per-key in-process write serialization for non-O_EXCL CAS paths. + // POSIX has no portable file-CAS primitive; this protects against races + // within a single process. Cross-process correctness for `ifMatch` is not + // guaranteed — use an external lock or a CAS-native driver for that. + const writeLocks = new Map>(); + const withLock = async (key: string, fn: () => Promise): Promise => { + const previous = writeLocks.get(key) || Promise.resolve(); + let release!: () => void; + const next = new Promise((r) => { + release = r; + }); + writeLocks.set( + key, + previous.then(() => next), + ); + await previous; + try { + return await fn(); + } finally { + release(); + if (writeLocks.get(key) === next) { + writeLocks.delete(key); + } + } + }; + let _watcher: FSWatcher | undefined; const _unwatch = async () => { if (_watcher) { @@ -60,11 +88,56 @@ const driver: DriverFactory = (userOptions = {}) => { } }; + const writeWithCAS = async ( + key: string, + plainWrite: (path: string) => Promise, + exclusiveWrite: (path: string) => Promise, + opts: { ifMatch?: string; ifNoneMatch?: string } | undefined, + ): Promise<{ etag: string } | undefined> => { + const path = r(key); + const wantsCAS = !!(opts && (opts.ifMatch !== undefined || opts.ifNoneMatch !== undefined)); + + if (!wantsCAS) { + await plainWrite(path); + return undefined; + } + + // Atomic create-only via `link()`; correct across processes. + if (opts!.ifNoneMatch === "*" && opts!.ifMatch === undefined) { + try { + await exclusiveWrite(path); + } catch (err: any) { + if (err?.code === "EEXIST") { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + const stats = await fsp.stat(path); + return { etag: statEtag(stats) }; + } + + // Etag-based CAS: best-effort, single-process. POSIX has no portable + // file-CAS primitive; cross-process callers should layer an external lock. + return withLock(path, async () => { + const stats = await fsp.stat(path).catch(() => null); + checkCAS( + DRIVER_NAME, + key, + { exists: !!stats, etag: stats ? statEtag(stats) : undefined }, + opts!, + ); + await plainWrite(path); + const newStats = await fsp.stat(path); + return { etag: statEtag(newStats) }; + }); + }; + return { name: DRIVER_NAME, options: userOptions, flags: { maxDepth: true, + cas: true, }, hasItem(key) { return existsSync(r(key)); @@ -76,22 +149,32 @@ const driver: DriverFactory = (userOptions = {}) => { return readFile(r(key)); }, async getMeta(key) { - const { atime, mtime, size, birthtime, ctime } = await fsp - .stat(r(key)) - .catch(() => ({}) as Stats); - return { atime, mtime, size, birthtime, ctime }; + const stats = await fsp.stat(r(key)).catch(() => ({}) as Stats); + const { atime, mtime, size, birthtime, ctime } = stats; + const etag = stats && stats.mtimeMs ? statEtag(stats) : undefined; + return { atime, mtime, size, birthtime, ctime, etag }; }, - setItem(key, value) { + async setItem(key, value, opts) { if (userOptions.readOnly) { return; } - return writeFile(r(key), value, "utf8"); + return writeWithCAS( + key, + (path) => writeFile(path, value, "utf8"), + (path) => writeFileExclusive(path, value, "utf8"), + opts, + ); }, - setItemRaw(key, value) { + async setItemRaw(key, value, opts) { if (userOptions.readOnly) { return; } - return writeFile(r(key), value); + return writeWithCAS( + key, + (path) => writeFile(path, value), + (path) => writeFileExclusive(path, value), + opts, + ); }, removeItem(key) { if (userOptions.readOnly) { @@ -151,4 +234,8 @@ const driver: DriverFactory = (userOptions = {}) => { }; }; +function statEtag(stats: { mtimeMs: number; size: number; ino: number }): string { + return `${stats.mtimeMs.toString(16)}-${stats.size.toString(16)}-${stats.ino.toString(16)}`; +} + export default driver; diff --git a/src/drivers/lru-cache.ts b/src/drivers/lru-cache.ts index c2d73ccbd..3935d5902 100644 --- a/src/drivers/lru-cache.ts +++ b/src/drivers/lru-cache.ts @@ -1,5 +1,6 @@ import { type DriverFactory } from "./utils/index.ts"; import { LRUCache } from "lru-cache"; +import { checkCAS } from "./utils/cas.ts"; type LRUCacheOptions = LRUCache.OptionsBase & Partial> & @@ -11,6 +12,11 @@ export interface LRUDriverOptions extends LRUCacheOptions {} const DRIVER_NAME = "lru-cache"; const driver: DriverFactory> = (opts = {}) => { + const etags = new Map(); + let counter = 0; + const nextEtag = () => String(++counter); + + const userDispose = opts.dispose; const cache = new LRUCache({ max: 1000, sizeCalculation: @@ -20,10 +26,15 @@ const driver: DriverFactory> = (opt } : undefined, ...opts, + dispose(value, key, reason) { + etags.delete(key); + userDispose?.(value, key, reason); + }, }); return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: () => cache, hasItem(key) { @@ -35,11 +46,28 @@ const driver: DriverFactory> = (opt getItemRaw(key) { return cache.get(key) ?? null; }, - setItem(key, value) { + getMeta(key) { + return cache.has(key) ? { etag: etags.get(key) } : null; + }, + setItem(key, value, tOpts) { + const wantsCAS = tOpts?.ifMatch !== undefined || tOpts?.ifNoneMatch !== undefined; + if (wantsCAS) { + checkCAS(DRIVER_NAME, key, { exists: cache.has(key), etag: etags.get(key) }, tOpts); + } cache.set(key, value); + const etag = nextEtag(); + etags.set(key, etag); + return wantsCAS ? { etag } : undefined; }, - setItemRaw(key, value) { + setItemRaw(key, value, tOpts) { + const wantsCAS = tOpts?.ifMatch !== undefined || tOpts?.ifNoneMatch !== undefined; + if (wantsCAS) { + checkCAS(DRIVER_NAME, key, { exists: cache.has(key), etag: etags.get(key) }, tOpts); + } cache.set(key, value); + const etag = nextEtag(); + etags.set(key, etag); + return wantsCAS ? { etag } : undefined; }, removeItem(key) { cache.delete(key); @@ -49,9 +77,11 @@ const driver: DriverFactory> = (opt }, clear() { cache.clear(); + etags.clear(); }, dispose() { cache.clear(); + etags.clear(); }, }; }; diff --git a/src/drivers/memory.ts b/src/drivers/memory.ts index 8a8246504..6b8f55038 100644 --- a/src/drivers/memory.ts +++ b/src/drivers/memory.ts @@ -1,13 +1,18 @@ import { type DriverFactory } from "./utils/index.ts"; +import { checkCAS } from "./utils/cas.ts"; const DRIVER_NAME = "memory"; const driver: DriverFactory> = () => { const data = new Map(); + const etags = new Map(); const timers = new Map>(); + let counter = 0; + const nextEtag = () => String(++counter); return { name: DRIVER_NAME, + flags: { cas: true }, getInstance: () => data, hasItem(key) { return data.has(key); @@ -18,19 +23,37 @@ const driver: DriverFactory> = () => { getItemRaw(key) { return data.get(key) ?? null; }, + getMeta(key) { + return data.has(key) ? { etag: etags.get(key) } : null; + }, setItem(key, value, opts) { + const wantsCAS = opts?.ifMatch !== undefined || opts?.ifNoneMatch !== undefined; + if (wantsCAS) { + checkCAS(DRIVER_NAME, key, { exists: data.has(key), etag: etags.get(key) }, opts); + } _clearTimer(timers, key); data.set(key, value); - _scheduleExpiry(data, timers, key, opts?.ttl); + const etag = nextEtag(); + etags.set(key, etag); + _scheduleExpiry(data, etags, timers, key, opts?.ttl); + return wantsCAS ? { etag } : undefined; }, setItemRaw(key, value, opts) { + const wantsCAS = opts?.ifMatch !== undefined || opts?.ifNoneMatch !== undefined; + if (wantsCAS) { + checkCAS(DRIVER_NAME, key, { exists: data.has(key), etag: etags.get(key) }, opts); + } _clearTimer(timers, key); data.set(key, value); - _scheduleExpiry(data, timers, key, opts?.ttl); + const etag = nextEtag(); + etags.set(key, etag); + _scheduleExpiry(data, etags, timers, key, opts?.ttl); + return wantsCAS ? { etag } : undefined; }, removeItem(key) { _clearTimer(timers, key); data.delete(key); + etags.delete(key); }, getKeys() { return [...data.keys()]; @@ -41,6 +64,7 @@ const driver: DriverFactory> = () => { } timers.clear(); data.clear(); + etags.clear(); }, dispose() { for (const timer of timers.values()) { @@ -48,6 +72,7 @@ const driver: DriverFactory> = () => { } timers.clear(); data.clear(); + etags.clear(); }, }; }; @@ -66,6 +91,7 @@ function _clearTimer(timers: Map>, key: st function _scheduleExpiry( data: Map, + etags: Map, timers: Map>, key: string, ttl?: number, @@ -76,6 +102,7 @@ function _scheduleExpiry( const ttlMs = ttl * 1000; const timer = setTimeout(() => { data.delete(key); + etags.delete(key); timers.delete(key); }, ttlMs); if (timer && typeof timer === "object" && "unref" in timer) { diff --git a/src/drivers/redis.ts b/src/drivers/redis.ts index 8d00fa116..14bfc888c 100644 --- a/src/drivers/redis.ts +++ b/src/drivers/redis.ts @@ -1,8 +1,20 @@ +import { createHash } from "node:crypto"; import { type DriverFactory, joinKeys } from "./utils/index.ts"; import { Cluster, Redis } from "ioredis"; +import { CASMismatchError } from "./utils/cas.ts"; import type { ClusterOptions, ClusterNode, RedisOptions as _RedisOptions } from "ioredis"; +// Content-addressable etag: SHA-1 of the stored bytes. Stable across writers +// of the same value; no companion key or Lua script (compatible with the +// ioredis-mock subset that lacks `redis.sha1hex` / full EVAL). +const computeEtag = (value: string | Buffer | number): string => { + const buf = Buffer.isBuffer(value) + ? value + : Buffer.from(typeof value === "string" ? value : String(value)); + return createHash("sha1").update(buf).digest("hex"); +}; + export interface RedisOptions extends _RedisOptions { /** * Optional prefix to use for all keys. Can be used for namespacing. @@ -66,6 +78,68 @@ const driver: DriverFactory = (opts) => { const p = (...keys: string[]) => joinKeys(base, ...keys); // Prefix a key. Uses base for backwards compatibility const d = (key: string) => (base ? key.replace(`${base}:`, "") : key); // Deprefix a key + const setWithCAS = async ( + key: string, + value: string | Buffer | number, + tOptions: { ifMatch?: string; ifNoneMatch?: string; ttl?: number } | undefined, + ): Promise<{ etag: string }> => { + const k = p(key); + const client = getRedisClient(); + const ttl = tOptions?.ttl ?? opts.ttl ?? 0; + const ifMatch = tOptions?.ifMatch; + const ifNoneMatch = tOptions?.ifNoneMatch; + + // Fast path: atomic create-only via `SET ... NX`. + if (ifNoneMatch === "*" && ifMatch === undefined) { + const result = ttl + ? await client.set(k, value as any, "EX", ttl, "NX") + : await client.set(k, value as any, "NX"); + if (result === null) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return { etag: computeEtag(value) }; + } + + // General path: WATCH + check + MULTI/EXEC. EXEC returns null if the + // watched key was modified between WATCH and EXEC, which we surface as + // a CAS mismatch (the caller's read became stale). + await client.watch(k); + try { + const cur = await client.getBuffer(k); + const exists = cur !== null; + const curEtag = exists ? computeEtag(cur) : undefined; + + let mismatch = false; + if (ifNoneMatch !== undefined) { + mismatch = + ifNoneMatch === "*" ? exists : exists && curEtag === ifNoneMatch; + } + if (!mismatch && ifMatch !== undefined) { + mismatch = + ifMatch === "*" ? !exists : !exists || curEtag !== ifMatch; + } + if (mismatch) { + await client.unwatch(); + throw new CASMismatchError(DRIVER_NAME, key); + } + + const multi = client.multi(); + if (ttl) { + multi.set(k, value as any, "EX", ttl); + } else { + multi.set(k, value as any); + } + const result = await multi.exec(); + if (result === null) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return { etag: computeEtag(value) }; + } catch (err) { + await client.unwatch().catch(() => {}); + throw err; + } + }; + if (opts.preConnect) { try { getRedisClient(); @@ -90,6 +164,7 @@ const driver: DriverFactory = (opts) => { return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: getRedisClient, async hasItem(key) { @@ -114,7 +189,14 @@ const driver: DriverFactory = (opts) => { }; }); }, + async getMeta(key) { + const cur = await getRedisClient().getBuffer(p(key)); + return cur === null ? null : { etag: computeEtag(cur) }; + }, async setItem(key, value, tOptions) { + if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) { + return setWithCAS(key, value, tOptions); + } const ttl = tOptions?.ttl ?? opts.ttl; if (ttl) { await getRedisClient().set(p(key), value, "EX", ttl); @@ -124,6 +206,9 @@ const driver: DriverFactory = (opts) => { }, async setItemRaw(key, value, tOptions) { const _value = normalizeValue(value); + if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) { + return setWithCAS(key, _value, tOptions); + } const ttl = tOptions?.ttl ?? opts.ttl; if (ttl) { await getRedisClient().set(p(key), _value, "EX", ttl); diff --git a/src/drivers/utils/cas.ts b/src/drivers/utils/cas.ts new file mode 100644 index 000000000..56f062855 --- /dev/null +++ b/src/drivers/utils/cas.ts @@ -0,0 +1,90 @@ +/** + * Compare-and-swap helpers shared by drivers. + * + * Lives under `src/drivers/utils/` because drivers are transpiled per-file + * and must not import from `..` (the rest of `src` is bundled). The bundled + * side re-exports these via `src/errors.ts`. + * + * Note on cross-bundle identity: the bundled main and the per-driver output + * each receive their own copy of these classes. Prefer + * `CASMismatchError.is(err)` / `err.code === "ERR_CAS_MISMATCH"` over + * `instanceof` for catch-blocks that may receive errors thrown across that + * boundary. + */ + +export const ERR_CAS_MISMATCH: "ERR_CAS_MISMATCH" = "ERR_CAS_MISMATCH"; +export const ERR_CAS_UNSUPPORTED: "ERR_CAS_UNSUPPORTED" = "ERR_CAS_UNSUPPORTED"; + +/** Thrown by CAS-aware drivers when an `ifMatch` / `ifNoneMatch` precondition fails. */ +export class CASMismatchError extends Error { + readonly code: typeof ERR_CAS_MISMATCH = ERR_CAS_MISMATCH; + + constructor(driver: string, key?: string, message?: string) { + super( + message ?? + `[unstorage] [${driver}] CAS mismatch${key === undefined ? "" : ` for key "${key}"`}`, + ); + this.name = "CASMismatchError"; + } + + /** Cross-bundle-safe check (uses `code` field, not `instanceof`). */ + static is(err: unknown): err is CASMismatchError { + return !!err && typeof err === "object" && (err as any).code === ERR_CAS_MISMATCH; + } +} + +/** Thrown when CAS preconditions are passed to a driver that does not implement CAS. */ +export class CASUnsupportedError extends Error { + readonly code: typeof ERR_CAS_UNSUPPORTED = ERR_CAS_UNSUPPORTED; + + constructor(driver: string) { + super(`[unstorage] [${driver}] driver does not support ifMatch/ifNoneMatch`); + this.name = "CASUnsupportedError"; + } + + /** Cross-bundle-safe check (uses `code` field, not `instanceof`). */ + static is(err: unknown): err is CASUnsupportedError { + return !!err && typeof err === "object" && (err as any).code === ERR_CAS_UNSUPPORTED; + } +} + +/** Throw {@link CASUnsupportedError} if `opts` carries any CAS precondition. */ +export function assertCASUnsupported(driver: string, opts: unknown): void { + if (opts && typeof opts === "object") { + const o = opts as { ifMatch?: unknown; ifNoneMatch?: unknown }; + if (o.ifMatch !== undefined || o.ifNoneMatch !== undefined) { + throw new CASUnsupportedError(driver); + } + } +} + +/** + * Evaluate `ifMatch` / `ifNoneMatch` preconditions against the current state + * of a key. Throws {@link CASMismatchError} on failure. Used by drivers whose + * underlying backend exposes etag/version metadata to the client. + */ +export function checkCAS( + driver: string, + key: string, + current: { exists: boolean; etag?: string }, + opts: { ifMatch?: string; ifNoneMatch?: string }, +): void { + const { ifMatch, ifNoneMatch } = opts; + const { exists, etag } = current; + + if (ifNoneMatch !== undefined) { + if (ifNoneMatch === "*") { + if (exists) throw new CASMismatchError(driver, key); + } else if (exists && etag === ifNoneMatch) { + throw new CASMismatchError(driver, key); + } + } + + if (ifMatch !== undefined) { + if (ifMatch === "*") { + if (!exists) throw new CASMismatchError(driver, key); + } else if (!exists || etag !== ifMatch) { + throw new CASMismatchError(driver, key); + } + } +} diff --git a/src/drivers/utils/node-fs.ts b/src/drivers/utils/node-fs.ts index 7f031af0e..05d1396d3 100644 --- a/src/drivers/utils/node-fs.ts +++ b/src/drivers/utils/node-fs.ts @@ -19,6 +19,26 @@ export async function writeFile( return fsPromises.writeFile(path, data, encoding); } +/** + * Atomic create-only write. Writes to a temp file then `link()`s into place; + * `link()` fails with `EEXIST` if the target already exists, giving true + * cross-process atomicity (no window where readers see an empty/partial file). + */ +export async function writeFileExclusive( + path: string, + data: WriteFileData, + encoding?: BufferEncoding, +): Promise { + await ensuredir(dirname(path)); + const tmp = `${path}.${process.pid}.${Date.now().toString(36)}.${Math.random().toString(36).slice(2, 8)}.tmp`; + await fsPromises.writeFile(tmp, data, encoding); + try { + await fsPromises.link(tmp, path); + } finally { + await fsPromises.unlink(tmp).catch(() => {}); + } +} + export function readFile(path: string, encoding?: BufferEncoding): Promise { return fsPromises.readFile(path, encoding).catch(ignoreNotfound); } diff --git a/src/errors.ts b/src/errors.ts new file mode 100644 index 000000000..a408adb34 --- /dev/null +++ b/src/errors.ts @@ -0,0 +1,11 @@ +// Re-exported from `src/drivers/utils/cas.ts` so drivers (transpiled per-file) +// and the bundled main share a single source. See cas.ts for cross-bundle +// identity caveats — prefer `CASMismatchError.is(err)` / `err.code` over +// `instanceof` when catching errors that originated in a driver. +export { + CASMismatchError, + CASUnsupportedError, + ERR_CAS_MISMATCH, + ERR_CAS_UNSUPPORTED, + assertCASUnsupported, +} from "./drivers/utils/cas.ts"; diff --git a/src/index.ts b/src/index.ts index 748a6b72c..7b130682e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ +export * from "./errors.ts"; export * from "./storage.ts"; export * from "./types.ts"; export * from "./utils.ts"; diff --git a/src/storage.ts b/src/storage.ts index 0c25ad8d0..a86ae7d32 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -7,7 +7,9 @@ import type { StorageValue, WatchEvent, TransactionOptions, + SetItemResult, } from "./types.ts"; +import { CASUnsupportedError } from "./errors.ts"; import memory from "./drivers/memory.ts"; import { asyncCall, deserializeRaw, serializeRaw, stringify } from "./_utils.ts"; import { @@ -204,19 +206,24 @@ export function createStorage( } return asyncCall(driver.getItem, relativeKey, opts).then((value) => deserializeRaw(value)); }, - async setItem(key: string, value: T, opts = {}) { + async setItem(key: string, value: T, opts: TransactionOptions = {}) { if (value === undefined) { return storage.removeItem(key); } key = normalizeKey(key); const { relativeKey, driver } = getMount(key); + const wantsCAS = opts.ifMatch !== undefined || opts.ifNoneMatch !== undefined; + if (wantsCAS && !driver.flags?.cas) { + throw new CASUnsupportedError(driver.name || "unknown"); + } if (!driver.setItem) { return; // Readonly } - await asyncCall(driver.setItem, relativeKey, stringify(value), opts); + const result = await asyncCall(driver.setItem, relativeKey, stringify(value), opts); if (!driver.watch) { onChange("update", key); } + return result as SetItemResult | void; }, async setItems(items, commonOptions) { await runBatch(items, commonOptions, async (batch) => { @@ -252,16 +259,22 @@ export function createStorage( } key = normalizeKey(key); const { relativeKey, driver } = getMount(key); + const wantsCAS = opts.ifMatch !== undefined || opts.ifNoneMatch !== undefined; + if (wantsCAS && !driver.flags?.cas) { + throw new CASUnsupportedError(driver.name || "unknown"); + } + let result: void | SetItemResult; if (driver.setItemRaw) { - await asyncCall(driver.setItemRaw, relativeKey, value, opts); + result = await asyncCall(driver.setItemRaw, relativeKey, value, opts); } else if (driver.setItem) { - await asyncCall(driver.setItem, relativeKey, serializeRaw(value), opts); + result = await asyncCall(driver.setItem, relativeKey, serializeRaw(value), opts); } else { return; // Readonly } if (!driver.watch) { onChange("update", key); } + return result; }, async removeItem( key: string, @@ -313,8 +326,8 @@ export function createStorage( } return meta; }, - setMeta(key: string, value: any, opts = {}) { - return this.setItem(key + "$", value, opts); + async setMeta(key: string, value: any, opts = {}) { + await this.setItem(key + "$", value, opts); }, removeMeta(key: string, opts = {}) { return this.removeItem(key + "$", opts); diff --git a/src/types.ts b/src/types.ts index 1f11e02f4..4ef588c65 100644 --- a/src/types.ts +++ b/src/types.ts @@ -12,11 +12,39 @@ export interface StorageMeta { atime?: Date; mtime?: Date; ttl?: number; + /** + * Opaque version token used for atomic compare-and-swap via `ifMatch`/`ifNoneMatch`. + * Drivers that do not support CAS leave this `undefined`. + */ + etag?: string; [key: string]: StorageValue | Date | undefined; } +/** + * Result returned by `setItem`/`setItemRaw` from a CAS-aware driver. Drivers + * without CAS support return `void` instead. + */ +export interface SetItemResult { + etag?: string; +} + +/** + * Conditional-write preconditions (HTTP-style): + * - `ifMatch: ""` — write only if current etag equals this value. + * - `ifMatch: "*"` — write only if a value currently exists. + * - `ifNoneMatch: ""` — write only if current etag is NOT this value. + * - `ifNoneMatch: "*"` — write only if no value currently exists (create-only). + * + * On precondition failure, drivers throw `CASMismatchError`. Drivers that do + * not implement CAS throw on any of these options. + */ +export interface CASOptions { + ifMatch?: string; + ifNoneMatch?: string; +} + // TODO: type ttl -export type TransactionOptions = Record; +export type TransactionOptions = Record & CASOptions; export type GetKeysOptions = TransactionOptions & { maxDepth?: number; @@ -25,6 +53,13 @@ export type GetKeysOptions = TransactionOptions & { export interface DriverFlags { maxDepth?: boolean; ttl?: boolean; + /** + * Driver honors `ifMatch` / `ifNoneMatch` on `setItem` / `setItemRaw` and + * exposes `etag` via `getMeta`. The user-facing capability signal is the + * presence of `etag` in `getMeta`; this flag is the contract between the + * driver and the storage core (used to fail fast before issuing a write). + */ + cas?: boolean; } export interface Driver { @@ -41,14 +76,22 @@ export interface Driver { ) => MaybePromise<{ key: string; value: StorageValue }[]>; /** @experimental */ getItemRaw?: (key: string, opts: TransactionOptions) => MaybePromise; - setItem?: (key: string, value: string, opts: TransactionOptions) => MaybePromise; + setItem?: ( + key: string, + value: string, + opts: TransactionOptions, + ) => MaybePromise; /** @experimental */ setItems?: ( items: { key: string; value: string; options?: TransactionOptions }[], commonOptions?: TransactionOptions, ) => MaybePromise; /** @experimental */ - setItemRaw?: (key: string, value: any, opts: TransactionOptions) => MaybePromise; + setItemRaw?: ( + key: string, + value: any, + opts: TransactionOptions, + ) => MaybePromise; removeItem?: (key: string, opts: TransactionOptions) => MaybePromise; getMeta?: (key: string, opts: TransactionOptions) => MaybePromise; getKeys: (base: string, opts: GetKeysOptions) => MaybePromise; @@ -98,8 +141,12 @@ export interface Storage { key: K, value: StorageItemType, opts?: TransactionOptions, - ): Promise; - setItem(key: string, value: U, opts?: TransactionOptions): Promise; + ): Promise; + setItem( + key: string, + value: U, + opts?: TransactionOptions, + ): Promise; /** @experimental */ setItems: ( @@ -111,7 +158,7 @@ export interface Storage { key: string, value: MaybeDefined, opts?: TransactionOptions, - ) => Promise; + ) => Promise; removeItem, K extends keyof StorageItemMap>( key: K, diff --git a/test/drivers/fs.test.ts b/test/drivers/fs.test.ts index 366f5f0e0..de95ff551 100644 --- a/test/drivers/fs.test.ts +++ b/test/drivers/fs.test.ts @@ -10,6 +10,7 @@ describe("drivers: fs", () => { testDriver({ driver: driver({ base: dir }), + supportsCAS: true, additionalTests(ctx) { it("check filesystem", async () => { await ctx.storage.setItem("s1:a", "test_data"); diff --git a/test/drivers/lru-cache.test.ts b/test/drivers/lru-cache.test.ts index 131e561d2..d6115f9d5 100644 --- a/test/drivers/lru-cache.test.ts +++ b/test/drivers/lru-cache.test.ts @@ -5,6 +5,7 @@ import { testDriver } from "./utils.ts"; describe("drivers: lru-cache", () => { testDriver({ driver: driver({}), + supportsCAS: true, }); }); @@ -13,6 +14,7 @@ describe("drivers: lru-cache with size", () => { driver: driver({ maxEntrySize: 50, }), + supportsCAS: true, additionalTests(ctx) { it("should not store large items", async () => { await ctx.storage.setItem( diff --git a/test/drivers/memory.test.ts b/test/drivers/memory.test.ts index e5fd3c65c..187e4f8ff 100644 --- a/test/drivers/memory.test.ts +++ b/test/drivers/memory.test.ts @@ -5,6 +5,7 @@ import { testDriver } from "./utils.ts"; describe("drivers: memory", () => { testDriver({ driver: driver(), + supportsCAS: true, }); // Regression: nitrojs/nitro#2138 — expired entries should be proactively diff --git a/test/drivers/redis.test.ts b/test/drivers/redis.test.ts index 295f2c85d..479256c8e 100644 --- a/test/drivers/redis.test.ts +++ b/test/drivers/redis.test.ts @@ -17,6 +17,7 @@ describe("drivers: redis", () => { testDriver({ driver: binaryDriver, + supportsCAS: true, additionalTests(ctx) { it("saves raw data as binary", async () => { const helloBuffer = Buffer.from("Hello, world!", "utf8"); diff --git a/test/drivers/utils.ts b/test/drivers/utils.ts index 4c6df2b9b..75cd5d9a9 100644 --- a/test/drivers/utils.ts +++ b/test/drivers/utils.ts @@ -1,5 +1,12 @@ import { it, expect, beforeAll, afterAll, afterEach } from "vitest"; -import { type Storage, type Driver, createStorage, restoreSnapshot } from "../../src/index.ts"; +import { + type Storage, + type Driver, + CASMismatchError, + CASUnsupportedError, + createStorage, + restoreSnapshot, +} from "../../src/index.ts"; export interface TestContext { storage: Storage; @@ -9,6 +16,8 @@ export interface TestContext { export interface TestOptions { driver: Driver | (() => Driver); noKeysSupport?: boolean; + /** Driver supports `ifMatch`/`ifNoneMatch` preconditions. */ + supportsCAS?: boolean; additionalTests?: (ctx: TestContext) => void; } @@ -199,4 +208,55 @@ export function testDriver(opts: TestOptions): void { await ctx.storage.clear(); expect(await ctx.storage.getKeys()).toMatchObject([]); }); + + if (opts.supportsCAS) { + it("CAS: ifNoneMatch:* creates only when absent", async () => { + const r1 = await ctx.storage.setItem("cas:create", "first", { ifNoneMatch: "*" }); + expect(r1).toMatchObject({ etag: expect.any(String) }); + expect(await ctx.storage.getItem("cas:create")).toBe("first"); + + await expect( + ctx.storage.setItem("cas:create", "second", { ifNoneMatch: "*" }), + ).rejects.toBeInstanceOf(CASMismatchError); + expect(await ctx.storage.getItem("cas:create")).toBe("first"); + }); + + it("CAS: ifMatch: swaps only when version matches", async () => { + await ctx.storage.setItem("cas:swap", "v1"); + const meta1 = await ctx.storage.getMeta("cas:swap"); + expect(meta1.etag).toBeTruthy(); + + const r = await ctx.storage.setItem("cas:swap", "v2", { ifMatch: meta1.etag as string }); + expect(r).toMatchObject({ etag: expect.any(String) }); + expect(await ctx.storage.getItem("cas:swap")).toBe("v2"); + + await expect( + ctx.storage.setItem("cas:swap", "v3", { ifMatch: meta1.etag as string }), + ).rejects.toBeInstanceOf(CASMismatchError); + expect(await ctx.storage.getItem("cas:swap")).toBe("v2"); + }); + + it("CAS: ifMatch:* requires existence", async () => { + await expect( + ctx.storage.setItem("cas:absent", "x", { ifMatch: "*" }), + ).rejects.toBeInstanceOf(CASMismatchError); + + await ctx.storage.setItem("cas:absent", "x"); + const r = await ctx.storage.setItem("cas:absent", "y", { ifMatch: "*" }); + expect(r).toMatchObject({ etag: expect.any(String) }); + expect(await ctx.storage.getItem("cas:absent")).toBe("y"); + }); + + it("CAS: getMeta returns etag after write", async () => { + const r = await ctx.storage.setItem("cas:meta", "hello", { ifNoneMatch: "*" }); + const meta = await ctx.storage.getMeta("cas:meta"); + expect(meta.etag).toBe((r as { etag: string }).etag); + }); + } else { + it("CAS: throws CASUnsupportedError on ifMatch/ifNoneMatch", async () => { + await expect( + ctx.storage.setItem("cas:unsupported", "x", { ifNoneMatch: "*" }), + ).rejects.toBeInstanceOf(CASUnsupportedError); + }); + } } From 458b7f03e8fd95f8f77bf9fb916a32bc57349983 Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 5 May 2026 02:35:22 +0200 Subject: [PATCH 2/5] feat: extend CAS support to remaining drivers Adds ifMatch/ifNoneMatch CAS to deno-kv (+node), mongodb, s3, cloudflare-r2-binding, netlify-blobs, vercel-blob, db0, http (+server), and overlay. Native primitives where available; emulated with checkCAS + in-process lock where not. - deno-kv: atomic check/set with versionstamp as etag - mongodb: insertOne/updateOne with _etag (SHA-1) field + unique index - s3: If-Match/If-None-Match headers, 412/409 -> mismatch - cloudflare-r2-binding: onlyIf etagMatches/etagDoesNotMatch - netlify-blobs: onlyIfNew/onlyIfMatch (native + emulated for *) - vercel-blob: native ifMatch + head-then-write for other shapes - db0: SELECT-then-write under in-process lock; new etag column - http: forward If-Match/If-None-Match; server maps 412/501 - overlay: delegate to top layer; flags.cas derives from layers[0] --- src/drivers/cloudflare-r2-binding.ts | 32 +++- src/drivers/db0.ts | 179 +++++++++++++++++++-- src/drivers/deno-kv.ts | 66 ++++++++ src/drivers/http.ts | 75 +++++++-- src/drivers/mongodb.ts | 125 +++++++++++++- src/drivers/netlify-blobs.ts | 72 ++++++++- src/drivers/overlay.ts | 13 +- src/drivers/s3.ts | 87 ++++++++-- src/drivers/vercel-blob.ts | 143 ++++++++++++++-- src/server.ts | 76 +++++++-- test/drivers/cloudflare-r2-binding.test.ts | 1 + test/drivers/db0.test.ts | 1 + test/drivers/deno-kv-node.test.ts | 1 + test/drivers/deno-kv.test.ts | 1 + test/drivers/http.test.ts | 1 + test/drivers/mongodb.test.ts | 1 + test/drivers/netlify-blobs.test.ts | 8 + test/drivers/overlay.test.ts | 1 + test/drivers/s3.test.ts | 1 + test/drivers/utils.ts | 11 +- test/drivers/vercel-blob.test.ts | 2 + 21 files changed, 808 insertions(+), 89 deletions(-) diff --git a/src/drivers/cloudflare-r2-binding.ts b/src/drivers/cloudflare-r2-binding.ts index cd814f50c..06c5f3a6d 100644 --- a/src/drivers/cloudflare-r2-binding.ts +++ b/src/drivers/cloudflare-r2-binding.ts @@ -1,6 +1,7 @@ import type * as CF from "@cloudflare/workers-types"; import { type DriverFactory, joinKeys } from "./utils/index.ts"; import { getR2Binding } from "./utils/cloudflare.ts"; +import { CASMismatchError } from "./utils/cas.ts"; export interface CloudflareR2Options { binding?: string | CF.R2Bucket; @@ -20,9 +21,25 @@ const driver: DriverFactory = (opts = {}) => { return kvList.objects.map((obj) => obj.key); }; + const buildPutOpts = ( + topts: (CF.R2PutOptions & { ifMatch?: string; ifNoneMatch?: string }) | undefined, + ): CF.R2PutOptions | undefined => { + if (!topts) return undefined; + const { ifMatch, ifNoneMatch, ...rest } = topts; + if (ifMatch === undefined && ifNoneMatch === undefined) { + return rest as CF.R2PutOptions; + } + // R2 accepts `"*"` as a wildcard etag for both etagMatches / etagDoesNotMatch. + const onlyIf: CF.R2Conditional = {}; + if (ifNoneMatch !== undefined) onlyIf.etagDoesNotMatch = ifNoneMatch; + if (ifMatch !== undefined) onlyIf.etagMatches = ifMatch; + return { ...(rest as CF.R2PutOptions), onlyIf }; + }; + return { name: DRIVER_NAME, options: opts, + flags: { cas: true }, getInstance: () => getR2Binding(opts.binding), async hasItem(key) { key = r(key); @@ -38,6 +55,7 @@ const driver: DriverFactory = (opts = {}) => { mtime: obj.uploaded, atime: obj.uploaded, ...obj, + etag: obj.etag, }; }, getItem(key, topts) { @@ -54,12 +72,22 @@ const driver: DriverFactory = (opts = {}) => { async setItem(key, value, topts) { key = r(key); const binding = getR2Binding(opts.binding); - await binding.put(key, value, topts); + const wantsCAS = topts?.ifMatch !== undefined || topts?.ifNoneMatch !== undefined; + const result = await binding.put(key, value, buildPutOpts(topts) as any); + if (wantsCAS && result === null) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return wantsCAS ? { etag: result!.etag } : undefined; }, async setItemRaw(key, value, topts) { key = r(key); const binding = getR2Binding(opts.binding); - await binding.put(key, value, topts); + const wantsCAS = topts?.ifMatch !== undefined || topts?.ifNoneMatch !== undefined; + const result = await binding.put(key, value, buildPutOpts(topts) as any); + if (wantsCAS && result === null) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return wantsCAS ? { etag: result!.etag } : undefined; }, async removeItem(key) { key = r(key); diff --git a/src/drivers/db0.ts b/src/drivers/db0.ts index 7fdc15fb2..c6f052093 100644 --- a/src/drivers/db0.ts +++ b/src/drivers/db0.ts @@ -1,10 +1,13 @@ +import { createHash } from "node:crypto"; import type { Connector, Database } from "db0"; import { createError, type DriverFactory } from "./utils/index.ts"; +import { checkCAS } from "./utils/cas.ts"; interface ResultSchema { rows: Array<{ key: string; value: string; + etag: string | null; created_at: string; updated_at: string; }>; @@ -20,6 +23,13 @@ const DEFAULT_TABLE_NAME = "unstorage"; const kExperimentalWarning = "__unstorage_db0_experimental_warning__"; +const computeEtag = (value: unknown): string => { + const buf = Buffer.isBuffer(value) + ? value + : Buffer.from(typeof value === "string" ? value : String(value)); + return createHash("sha1").update(buf).digest("hex"); +}; + const driver: DriverFactory>> = (opts) => { opts.tableName = opts.tableName || DEFAULT_TABLE_NAME; @@ -46,18 +56,115 @@ const driver: DriverFactory>> = (o const isMysql = opts.database.dialect === "mysql"; + // Per-key in-process serialization for CAS writes. db0 has no portable + // transaction primitive across connectors, so we serialize SELECT-then-write + // sequences in-process. Cross-process correctness is not guaranteed. + const writeLocks = new Map>(); + const withLock = async (key: string, fn: () => Promise): Promise => { + const previous = writeLocks.get(key) || Promise.resolve(); + let release!: () => void; + const next = new Promise((r) => { + release = r; + }); + writeLocks.set( + key, + previous.then(() => next), + ); + await previous; + try { + return await fn(); + } finally { + release(); + if (writeLocks.get(key) === next) { + writeLocks.delete(key); + } + } + }; + + const readState = async ( + key: string, + ): Promise<{ exists: boolean; etag?: string }> => { + const { rows } = isMysql + ? await opts.database.sql + /* sql */ `SELECT etag FROM {${opts.tableName}} WHERE \`key\` = ${key}` + : await opts.database.sql + /* sql */ `SELECT etag FROM {${opts.tableName}} WHERE key = ${key}`; + const row = rows?.[0]; + if (!row) { + return { exists: false }; + } + return { exists: true, etag: row.etag ?? undefined }; + }; + + const writeWithCAS = async ( + key: string, + value: string | Buffer, + column: "value" | "blob", + casOpts: { ifMatch?: string; ifNoneMatch?: string }, + ): Promise<{ etag: string }> => { + const newEtag = computeEtag(value); + return withLock(key, async () => { + const { exists, etag: currentEtag } = await readState(key); + checkCAS(DRIVER_NAME, key, { exists, etag: currentEtag }, casOpts); + const v = value as any; + if (exists) { + if (isMysql) { + if (column === "value") { + await opts.database.sql + /* sql */ `UPDATE {${opts.tableName}} SET \`value\` = ${v}, etag = ${newEtag}, updated_at = CURRENT_TIMESTAMP WHERE \`key\` = ${key}`; + } else { + await opts.database.sql + /* sql */ `UPDATE {${opts.tableName}} SET \`blob\` = ${v}, etag = ${newEtag}, updated_at = CURRENT_TIMESTAMP WHERE \`key\` = ${key}`; + } + } else { + if (column === "value") { + await opts.database.sql + /* sql */ `UPDATE {${opts.tableName}} SET value = ${v}, etag = ${newEtag}, updated_at = CURRENT_TIMESTAMP WHERE key = ${key}`; + } else { + await opts.database.sql + /* sql */ `UPDATE {${opts.tableName}} SET blob = ${v}, etag = ${newEtag}, updated_at = CURRENT_TIMESTAMP WHERE key = ${key}`; + } + } + } else { + if (isMysql) { + if (column === "value") { + await opts.database.sql + /* sql */ `INSERT INTO {${opts.tableName}} (\`key\`, \`value\`, etag, created_at, updated_at) VALUES (${key}, ${v}, ${newEtag}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`; + } else { + await opts.database.sql + /* sql */ `INSERT INTO {${opts.tableName}} (\`key\`, \`blob\`, etag, created_at, updated_at) VALUES (${key}, ${v}, ${newEtag}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`; + } + } else { + if (column === "value") { + await opts.database.sql + /* sql */ `INSERT INTO {${opts.tableName}} (key, value, etag, created_at, updated_at) VALUES (${key}, ${v}, ${newEtag}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`; + } else { + await opts.database.sql + /* sql */ `INSERT INTO {${opts.tableName}} (key, blob, etag, created_at, updated_at) VALUES (${key}, ${v}, ${newEtag}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)`; + } + } + } + return { etag: newEtag }; + }); + }; + + const hasRow = async (key: string): Promise => { + const { rows } = isMysql + ? await opts.database.sql + /* sql */ `SELECT EXISTS (SELECT 1 FROM {${opts.tableName}} WHERE \`key\` = ${key}) AS \`value\`` + : await opts.database.sql + /* sql */ `SELECT EXISTS (SELECT 1 FROM {${opts.tableName}} WHERE key = ${key}) AS value`; + return rows?.[0]?.value == "1"; + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: () => opts.database, async hasItem(key) { await ensureTable(); - const { rows } = isMysql - ? await opts.database.sql - /* sql */ `SELECT EXISTS (SELECT 1 FROM {${opts.tableName}} WHERE \`key\` = ${key}) AS \`value\`` - : await opts.database.sql - /* sql */ `SELECT EXISTS (SELECT 1 FROM {${opts.tableName}} WHERE key = ${key}) AS value`; - return rows?.[0]?.value == "1"; + return hasRow(key); }, getItem: async (key) => { await ensureTable(); @@ -77,25 +184,38 @@ const driver: DriverFactory>> = (o /* sql */ `SELECT blob as value FROM {${opts.tableName}} WHERE key = ${key}`; return rows?.[0]?.value ?? null; }, - setItem: async (key, value) => { + setItem: async (key, value, tOptions) => { await ensureTable(); + const wantsCAS = + tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined; + if (wantsCAS) { + return writeWithCAS(key, value, "value", tOptions); + } + const etag = computeEtag(value); if (isMysql) { await opts.database.sql - /* sql */ `INSERT INTO {${opts.tableName}} (\`key\`, \`value\`, created_at, updated_at) VALUES (${key}, ${value}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON DUPLICATE KEY UPDATE value = ${value}, updated_at = CURRENT_TIMESTAMP`; + /* sql */ `INSERT INTO {${opts.tableName}} (\`key\`, \`value\`, etag, created_at, updated_at) VALUES (${key}, ${value}, ${etag}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON DUPLICATE KEY UPDATE value = ${value}, etag = ${etag}, updated_at = CURRENT_TIMESTAMP`; } else { await opts.database.sql - /* sql */ `INSERT INTO {${opts.tableName}} (key, value, created_at, updated_at) VALUES (${key}, ${value}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT(key) DO UPDATE SET value = ${value}, updated_at = CURRENT_TIMESTAMP`; + /* sql */ `INSERT INTO {${opts.tableName}} (key, value, etag, created_at, updated_at) VALUES (${key}, ${value}, ${etag}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT(key) DO UPDATE SET value = ${value}, etag = ${etag}, updated_at = CURRENT_TIMESTAMP`; } }, - async setItemRaw(key, value) { + async setItemRaw(key, value, tOptions) { await ensureTable(); + const wantsCAS = + tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined; + if (wantsCAS) { + const blob = isMysql ? (Buffer.from(value) as any) : value; + return writeWithCAS(key, blob, "blob", tOptions); + } + const etag = computeEtag(value); if (isMysql) { const blob = Buffer.from(value) as any; await opts.database.sql - /* sql */ `INSERT INTO {${opts.tableName}} (\`key\`, \`blob\`, created_at, updated_at) VALUES (${key}, ${blob}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON DUPLICATE KEY UPDATE \`blob\` = ${blob}, updated_at = CURRENT_TIMESTAMP`; + /* sql */ `INSERT INTO {${opts.tableName}} (\`key\`, \`blob\`, etag, created_at, updated_at) VALUES (${key}, ${blob}, ${etag}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON DUPLICATE KEY UPDATE \`blob\` = ${blob}, etag = ${etag}, updated_at = CURRENT_TIMESTAMP`; } else { await opts.database.sql - /* sql */ `INSERT INTO {${opts.tableName}} (key, blob, created_at, updated_at) VALUES (${key}, ${value}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT(key) DO UPDATE SET blob = ${value}, updated_at = CURRENT_TIMESTAMP`; + /* sql */ `INSERT INTO {${opts.tableName}} (key, blob, etag, created_at, updated_at) VALUES (${key}, ${value}, ${etag}, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP) ON CONFLICT(key) DO UPDATE SET blob = ${value}, etag = ${etag}, updated_at = CURRENT_TIMESTAMP`; } }, removeItem: async (key) => { @@ -110,13 +230,15 @@ const driver: DriverFactory>> = (o await ensureTable(); const { rows } = isMysql ? await opts.database.sql - /* sql */ `SELECT created_at, updated_at FROM {${opts.tableName}} WHERE \`key\` = ${key}` + /* sql */ `SELECT etag, created_at, updated_at FROM {${opts.tableName}} WHERE \`key\` = ${key}` : await opts.database.sql - /* sql */ `SELECT created_at, updated_at FROM {${opts.tableName}} WHERE key = ${key}`; + /* sql */ `SELECT etag, created_at, updated_at FROM {${opts.tableName}} WHERE key = ${key}`; + const row = rows?.[0]; return { - birthtime: toDate(rows?.[0]?.created_at), - mtime: toDate(rows?.[0]?.updated_at), + birthtime: toDate(row?.created_at), + mtime: toDate(row?.updated_at), + etag: row?.etag ?? undefined, }; }, getKeys: async (base = "") => { @@ -149,10 +271,12 @@ async function setupTable(opts: DB0DriverOptions) { key TEXT PRIMARY KEY, value TEXT, blob BLOB, + etag TEXT, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ); `; + await addEtagColumn(opts, "TEXT"); return; } case "postgresql": { @@ -161,10 +285,12 @@ async function setupTable(opts: DB0DriverOptions) { key VARCHAR(255) NOT NULL PRIMARY KEY, value TEXT, blob BYTEA, + etag VARCHAR(64), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); `; + await addEtagColumn(opts, "VARCHAR(64)"); return; } case "mysql": { @@ -173,10 +299,12 @@ async function setupTable(opts: DB0DriverOptions) { \`key\` VARCHAR(255) NOT NULL PRIMARY KEY, \`value\` LONGTEXT, \`blob\` BLOB, + \`etag\` VARCHAR(64), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); `; + await addEtagColumn(opts, "VARCHAR(64)"); return; } default: { @@ -185,6 +313,25 @@ async function setupTable(opts: DB0DriverOptions) { } } +// Best-effort migration for tables created before the etag column existed. +// Existing rows will have NULL etag and `ifMatch:` will fail until they +// are rewritten — a soft-breaking change documented in the changelog. +async function addEtagColumn(opts: DB0DriverOptions, type: string): Promise { + try { + if (opts.database.dialect === "mysql") { + await opts.database.exec( + `ALTER TABLE \`${opts.tableName}\` ADD COLUMN \`etag\` ${type}`, + ); + } else { + await opts.database.exec( + `ALTER TABLE "${opts.tableName}" ADD COLUMN etag ${type}`, + ); + } + } catch { + // Column already exists or dialect doesn't support IF NOT EXISTS for ALTER. + } +} + function toDate(timestamp: string | undefined): Date | undefined { return timestamp ? new Date(timestamp) : undefined; } diff --git a/src/drivers/deno-kv.ts b/src/drivers/deno-kv.ts index da795aa6c..f7196748f 100644 --- a/src/drivers/deno-kv.ts +++ b/src/drivers/deno-kv.ts @@ -1,4 +1,5 @@ import { type DriverFactory, createError, normalizeKey } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; import type * as DenoKV from "@deno/kv"; // https://docs.deno.com/deploy/kv/manual/ @@ -17,6 +18,8 @@ interface DenoKVSetOptions { * TTL in seconds. */ ttl?: number; + ifMatch?: string; + ifNoneMatch?: string; } const DRIVER_NAME = "deno-kv"; @@ -51,8 +54,60 @@ const driver: DriverFactory> = (opts) => { return _kv; }; + const setWithCAS = async ( + key: string, + value: unknown, + tOptions: DenoKVSetOptions, + ): Promise<{ etag: string }> => { + const kv = await getKv(); + const k = r(key); + const ttl = normalizeTTL(tOptions.ttl ?? opts?.ttl); + const { ifMatch, ifNoneMatch } = tOptions; + + // Fast path: create-only via versionstamp:null check. + if (ifNoneMatch === "*" && ifMatch === undefined) { + const result = await kv + .atomic() + .check({ key: k, versionstamp: null }) + .set(k, value, { expireIn: ttl }) + .commit(); + if (!result.ok) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return { etag: result.versionstamp }; + } + + // General path: read current versionstamp, validate preconditions, then + // atomic check+set on that versionstamp to detect races. + const cur = await kv.get(k); + const exists = cur.value !== null; + const curEtag = exists ? cur.versionstamp : undefined; + + let mismatch = false; + if (ifNoneMatch !== undefined) { + mismatch = ifNoneMatch === "*" ? exists : exists && curEtag === ifNoneMatch; + } + if (!mismatch && ifMatch !== undefined) { + mismatch = ifMatch === "*" ? !exists : !exists || curEtag !== ifMatch; + } + if (mismatch) { + throw new CASMismatchError(DRIVER_NAME, key); + } + + const result = await kv + .atomic() + .check({ key: k, versionstamp: curEtag ?? null }) + .set(k, value, { expireIn: ttl }) + .commit(); + if (!result.ok) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return { etag: result.versionstamp }; + }; + return { name: DRIVER_NAME, + flags: { cas: true }, getInstance() { return getKv(); }, @@ -71,12 +126,23 @@ const driver: DriverFactory> = (opts) => { const value = await kv.get(r(key)); return value.value; }, + async getMeta(key) { + const kv = await getKv(); + const entry = await kv.get(r(key)); + return entry.value === null ? null : { etag: entry.versionstamp }; + }, async setItem(key, value, tOptions: DenoKVSetOptions) { + if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) { + return setWithCAS(key, value, tOptions); + } const ttl = normalizeTTL(tOptions?.ttl ?? opts?.ttl); const kv = await getKv(); await kv.set(r(key), value, { expireIn: ttl }); }, async setItemRaw(key, value, tOptions: DenoKVSetOptions) { + if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) { + return setWithCAS(key, value, tOptions); + } const ttl = normalizeTTL(tOptions?.ttl ?? opts?.ttl); const kv = await getKv(); await kv.set(r(key), value, { expireIn: ttl }); diff --git a/src/drivers/http.ts b/src/drivers/http.ts index 95d85ac4d..f83361f70 100644 --- a/src/drivers/http.ts +++ b/src/drivers/http.ts @@ -2,6 +2,7 @@ import type { TransactionOptions } from "../types.ts"; import { type DriverFactory } from "./utils/index.ts"; import { type FetchError, $fetch as _fetch } from "ofetch"; import { joinURL } from "./utils/path.ts"; +import { CASMismatchError } from "./utils/cas.ts"; export interface HTTPOptions { base: string; @@ -26,7 +27,7 @@ const driver: DriverFactory = (opts) => { topts: TransactionOptions | undefined, defaultHeaders?: Record, ) => { - const headers = { + const headers: Record = { ...defaultHeaders, ...opts.headers, ...topts?.headers, @@ -34,11 +35,42 @@ const driver: DriverFactory = (opts) => { if (topts?.ttl && !headers["x-ttl"]) { headers["x-ttl"] = topts.ttl + ""; } + if (topts?.ifMatch !== undefined && !headers["if-match"]) { + headers["if-match"] = formatCondition(topts.ifMatch); + } + if (topts?.ifNoneMatch !== undefined && !headers["if-none-match"]) { + headers["if-none-match"] = formatCondition(topts.ifNoneMatch); + } return headers; }; + const setItemHTTP = async ( + key: string, + value: any, + topts: TransactionOptions | undefined, + defaultHeaders?: Record, + ): Promise<{ etag: string } | undefined> => { + const wantsCAS = topts?.ifMatch !== undefined || topts?.ifNoneMatch !== undefined; + try { + const res = await _fetch.raw(r(key), { + method: "PUT", + body: value, + headers: getHeaders(topts, defaultHeaders), + }); + if (!wantsCAS) return undefined; + const etag = parseEtag(res.headers.get("etag")); + return etag === undefined ? undefined : { etag }; + } catch (error: any) { + if (error?.response?.status === 412) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw error; + } + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, hasItem(key, topts) { return _fetch(r(key), { @@ -78,26 +110,20 @@ const driver: DriverFactory = (opts) => { if (_ttl) { ttl = Number.parseInt(_ttl, 10); } + const etag = parseEtag(res.headers.get("etag")); return { status: res.status, mtime, ttl, + etag, }; }, - async setItem(key, value, topts) { - await _fetch(r(key), { - method: "PUT", - body: value, - headers: getHeaders(topts), - }); + setItem(key, value, topts) { + return setItemHTTP(key, value, topts); }, - async setItemRaw(key, value, topts) { - await _fetch(r(key), { - method: "PUT", - body: value, - headers: getHeaders(topts, { - "content-type": "application/octet-stream", - }), + setItemRaw(key, value, topts) { + return setItemHTTP(key, value, topts, { + "content-type": "application/octet-stream", }); }, async removeItem(key, topts) { @@ -122,3 +148,24 @@ const driver: DriverFactory = (opts) => { }; export default driver; + +// --- Internal helpers --- + +// HTTP spec: ETag values are quoted (`"abc"`) and `If-Match: *` is a literal +// `*` (no quotes). Strip surrounding quotes when parsing inbound, add them +// when sending — but never quote `*`. +function parseEtag(raw: string | null | undefined): string | undefined { + if (!raw) return undefined; + const v = raw.trim(); + if (v === "*") return "*"; + if (v.length >= 2 && v.startsWith('"') && v.endsWith('"')) { + return v.slice(1, -1); + } + return v; +} + +function formatCondition(value: string): string { + if (value === "*") return "*"; + if (value.startsWith('"') && value.endsWith('"')) return value; + return `"${value}"`; +} diff --git a/src/drivers/mongodb.ts b/src/drivers/mongodb.ts index d13df8d3b..75712d0e1 100644 --- a/src/drivers/mongodb.ts +++ b/src/drivers/mongodb.ts @@ -1,4 +1,6 @@ +import { createHash } from "node:crypto"; import { createRequiredError, type DriverFactory } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; import { MongoClient, type Collection, type MongoClientOptions } from "mongodb"; export interface MongoDbOptions { @@ -27,8 +29,18 @@ export interface MongoDbOptions { const DRIVER_NAME = "mongodb"; +// SHA-1 of the serialized value. Content-addressable; matches the redis driver. +const computeEtag = (value: unknown): string => { + const buf = Buffer.from(typeof value === "string" ? value : JSON.stringify(value)); + return createHash("sha1").update(buf).digest("hex"); +}; + +const isDuplicateKeyError = (err: unknown): boolean => + !!err && typeof err === "object" && (err as { code?: number }).code === 11_000; + const driver: DriverFactory = (opts) => { let collection: Collection; + let indexReady: Promise | undefined; const getMongoCollection = () => { if (!collection) { if (!opts.connectionString) { @@ -37,12 +49,107 @@ const driver: DriverFactory = (opts) => { const mongoClient = new MongoClient(opts.connectionString, opts.clientOptions); const db = mongoClient.db(opts.databaseName || "unstorage"); collection = db.collection(opts.collectionName || "unstorage"); + indexReady = collection.createIndex({ key: 1 }, { unique: true }).catch(() => {}); } return collection; }; + const setWithCAS = async ( + key: string, + value: unknown, + tOptions: { ifMatch?: string; ifNoneMatch?: string }, + ): Promise<{ etag: string }> => { + const col = getMongoCollection(); + await indexReady; + const now = new Date(); + const etag = computeEtag(value); + const { ifMatch, ifNoneMatch } = tOptions; + + // Create-only: rely on the unique index for atomicity. + if (ifNoneMatch === "*" && ifMatch === undefined) { + try { + await col.insertOne({ + key, + value, + _etag: etag, + createdAt: now, + modifiedAt: now, + }); + } catch (err) { + if (isDuplicateKeyError(err)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + return { etag }; + } + + // ifMatch:* — require existence; no upsert. + if (ifMatch === "*" && ifNoneMatch === undefined) { + const r = await col.updateOne({ key }, { $set: { value, _etag: etag, modifiedAt: now } }); + if (r.matchedCount === 0) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return { etag }; + } + + // ifMatch: — exact-match update; no upsert. + if (ifMatch !== undefined && ifNoneMatch === undefined) { + const r = await col.updateOne( + { key, _etag: ifMatch }, + { $set: { value, _etag: etag, modifiedAt: now } }, + ); + if (r.matchedCount === 0) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return { etag }; + } + + // ifNoneMatch: — succeed when absent or current etag differs. + // Filter excludes the forbidden etag; unique-index dup-key on upsert + // signals "current etag matches the forbidden one" → mismatch. + if (ifNoneMatch !== undefined && ifMatch === undefined) { + try { + await col.updateOne( + { key, _etag: { $ne: ifNoneMatch } }, + { + $set: { value, _etag: etag, modifiedAt: now }, + $setOnInsert: { key, createdAt: now }, + }, + { upsert: true }, + ); + } catch (err) { + if (isDuplicateKeyError(err)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + return { etag }; + } + + // Combined ifMatch + ifNoneMatch — both must hold; ifMatch implies + // existence so no upsert is needed. + if (ifNoneMatch === "*" || (ifMatch !== undefined && ifMatch === ifNoneMatch)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + const filter: Record = { key }; + if (ifMatch !== undefined && ifMatch !== "*") { + filter._etag = ifMatch; + } else if (ifNoneMatch !== undefined) { + filter._etag = { $ne: ifNoneMatch }; + } + const r = await col.updateOne(filter, { + $set: { value, _etag: etag, modifiedAt: now }, + }); + if (r.matchedCount === 0) { + throw new CASMismatchError(DRIVER_NAME, key); + } + return { etag }; + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: getMongoCollection, async hasItem(key) { @@ -66,25 +173,28 @@ const driver: DriverFactory = (opts) => { return { key: key, value: resultMap.get(key)?.value ?? null }; }); }, - async setItem(key, value) { - const currentDateTime = new Date(); + async setItem(key, value, tOptions) { + if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) { + return setWithCAS(key, value, tOptions); + } + const now = new Date(); await getMongoCollection().updateOne( { key }, { - $set: { key, value, modifiedAt: currentDateTime }, - $setOnInsert: { createdAt: currentDateTime }, + $set: { key, value, _etag: computeEtag(value), modifiedAt: now }, + $setOnInsert: { createdAt: now }, }, { upsert: true }, ); }, async setItems(items) { - const currentDateTime = new Date(); + const now = new Date(); const operations = items.map(({ key, value }) => ({ updateOne: { filter: { key }, update: { - $set: { key, value, modifiedAt: currentDateTime }, - $setOnInsert: { createdAt: currentDateTime }, + $set: { key, value, _etag: computeEtag(value), modifiedAt: now }, + $setOnInsert: { createdAt: now }, }, upsert: true, }, @@ -107,6 +217,7 @@ const driver: DriverFactory = (opts) => { ? { mtime: document.modifiedAt, birthtime: document.createdAt, + etag: document._etag, } : {}; }, diff --git a/src/drivers/netlify-blobs.ts b/src/drivers/netlify-blobs.ts index e0dd24b85..4ac8392a2 100644 --- a/src/drivers/netlify-blobs.ts +++ b/src/drivers/netlify-blobs.ts @@ -1,4 +1,5 @@ import { createError, createRequiredError, type DriverFactory } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; import type { GetKeysOptions } from "../types.ts"; import { getStore, getDeployStore } from "@netlify/blobs"; import type { @@ -62,8 +63,57 @@ const driver: DriverFactory = (options) => { return store; }; + // Native conditional write. Maps `ifMatch:` to `onlyIfMatch` and + // `ifNoneMatch:"*"` to `onlyIfNew`. The remaining shapes (`ifMatch:"*"` and + // `ifNoneMatch:`) are emulated by reading current metadata then + // submitting an etag-pinned `onlyIfMatch` write so the precondition is still + // checked atomically server-side. Throws CASMismatchError on failure. + const setWithCAS = async ( + key: string, + value: string | ArrayBuffer | Blob, + opts: { ifMatch?: string; ifNoneMatch?: string } | undefined, + ): Promise<{ etag: string }> => { + const client = getClient(); + const ifMatch = opts?.ifMatch; + const ifNoneMatch = opts?.ifNoneMatch; + + // Native fast paths. + if (ifNoneMatch === "*" && ifMatch === undefined) { + const r = await client.set(key, value as any, { onlyIfNew: true }); + if (!r.modified) throw new CASMismatchError(DRIVER_NAME, key); + return { etag: r.etag ?? "" }; + } + if (ifMatch !== undefined && ifMatch !== "*" && ifNoneMatch === undefined) { + const r = await client.set(key, value as any, { onlyIfMatch: ifMatch }); + if (!r.modified) throw new CASMismatchError(DRIVER_NAME, key); + return { etag: r.etag ?? "" }; + } + + // Emulated paths: derive an etag-pinned write from the current metadata. + const meta = await client.getMetadata(key); + const exists = meta !== null; + const curEtag = meta?.etag; + + if (ifNoneMatch === "*" && exists) throw new CASMismatchError(DRIVER_NAME, key); + if (ifNoneMatch !== undefined && ifNoneMatch !== "*" && exists && curEtag === ifNoneMatch) { + throw new CASMismatchError(DRIVER_NAME, key); + } + if (ifMatch === "*" && !exists) throw new CASMismatchError(DRIVER_NAME, key); + if (ifMatch !== undefined && ifMatch !== "*" && (!exists || curEtag !== ifMatch)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + + const setOpts: SetOptions = exists + ? ({ onlyIfMatch: curEtag } as SetOptions) + : ({ onlyIfNew: true } as SetOptions); + const r = await client.set(key, value as any, setOpts); + if (!r.modified) throw new CASMismatchError(DRIVER_NAME, key); + return { etag: r.etag ?? "" }; + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options, getInstance: getClient, async hasItem(key) { @@ -73,22 +123,30 @@ const driver: DriverFactory = (options) => { // @ts-expect-error has trouble with the overloaded types return getClient().get(key, tops); }, - getMeta(key) { - return getClient().getMetadata(key); + async getMeta(key) { + const m = await getClient().getMetadata(key); + return m ? { ...m.metadata, etag: m.etag } : null; }, getItemRaw(key, topts?: GetOptions) { // @ts-expect-error has trouble with the overloaded types return getClient().get(key, { type: topts?.type ?? "arrayBuffer" }); }, - async setItem(key, value, topts?: SetOptions) { + async setItem(key, value, topts?: SetOptions & { ifMatch?: string; ifNoneMatch?: string }) { + if (topts?.ifMatch !== undefined || topts?.ifNoneMatch !== undefined) { + return setWithCAS(key, value, topts); + } // NOTE: this returns either Promise (pre-v10) or Promise (v10+) - // TODO(serhalp): Allow drivers to return a value from `setItem`. The @netlify/blobs v10 - // functionality isn't usable without this. await getClient().set(key, value, topts); }, - async setItemRaw(key, value: string | ArrayBuffer | Blob, topts?: SetOptions) { + async setItemRaw( + key, + value: string | ArrayBuffer | Blob, + topts?: SetOptions & { ifMatch?: string; ifNoneMatch?: string }, + ) { + if (topts?.ifMatch !== undefined || topts?.ifNoneMatch !== undefined) { + return setWithCAS(key, value, topts); + } // NOTE: this returns either Promise (pre-v10) or Promise (v10+) - // See TODO above. await getClient().set(key, value, topts); }, removeItem(key) { diff --git a/src/drivers/overlay.ts b/src/drivers/overlay.ts index 40fb2bdb2..651f2b410 100644 --- a/src/drivers/overlay.ts +++ b/src/drivers/overlay.ts @@ -13,6 +13,9 @@ const DRIVER_NAME = "overlay"; const driver: DriverFactory = (options) => { return { name: DRIVER_NAME, + // CAS is delegated to the top (writable) layer; preconditions evaluate + // against its state, not the merged overlay view. + flags: { cas: !!options.layers[0]?.flags?.cas }, options: options, async hasItem(key, opts) { for (const layer of options.layers) { @@ -40,10 +43,14 @@ const driver: DriverFactory = (options) => { } return null; }, - // TODO: Support native meta - // async getMeta (key) {}, + async getMeta(key, opts) { + return (await options.layers[0]?.getMeta?.(key, opts)) ?? null; + }, async setItem(key, value, opts) { - await options.layers[0]?.setItem?.(key, value, opts); + return options.layers[0]?.setItem?.(key, value, opts); + }, + async setItemRaw(key, value, opts) { + return options.layers[0]?.setItemRaw?.(key, value, opts); }, async removeItem(key, opts) { await options.layers[0]?.setItem?.(key, OVERLAY_REMOVED, opts); diff --git a/src/drivers/s3.ts b/src/drivers/s3.ts index 170f7aac0..53e8f5f69 100644 --- a/src/drivers/s3.ts +++ b/src/drivers/s3.ts @@ -4,6 +4,7 @@ import { normalizeKey, createError, } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; import { AwsClient } from "aws4fetch"; export interface S3DriverOptions { @@ -94,13 +95,20 @@ const driver: DriverFactory = (options) => { const url = (key: string = "") => `${baseURL}/${normalizeKey(key, "/")}`; - const awsFetch = async (url: string, opts?: RequestInit) => { + const awsFetch = async ( + url: string, + opts?: RequestInit, + allowedStatuses?: ReadonlySet, + ) => { const request = await getAwsClient().sign(url, opts); const res = await fetch(request); if (!res.ok) { if (res.status === 404) { return null; } + if (allowedStatuses?.has(res.status)) { + return res; + } throw createError( DRIVER_NAME, `[${request.method}] ${url}: ${res.status} ${res.statusText} ${await res.text()}`, @@ -115,13 +123,17 @@ const driver: DriverFactory = (options) => { if (!res) { return null; } - const metaHeaders: HeadersInit = {}; + const metaHeaders: Record = {}; for (const [key, value] of res.headers.entries()) { const match = /x-amz-meta-(.*)/.exec(key); if (match?.[1]) { metaHeaders[match[1]] = value; } } + const etag = stripQuotes(res.headers.get("etag")); + if (etag) { + metaHeaders.etag = etag; + } return metaHeaders; }; @@ -145,16 +157,55 @@ const driver: DriverFactory = (options) => { key: string, value: BufferSource | string, headers?: Record, + allowedStatuses?: ReadonlySet, ) => { - return awsFetch(url(key), { - method: "PUT", - headers: headers - ? (Object.fromEntries( - Object.entries(headers).filter(([_, v]) => v !== undefined), - ) as Record) - : undefined, - body: value, - }); + return awsFetch( + url(key), + { + method: "PUT", + headers: headers + ? (Object.fromEntries( + Object.entries(headers).filter(([_, v]) => v !== undefined), + ) as Record) + : undefined, + body: value, + }, + allowedStatuses, + ); + }; + + // 412 Precondition Failed (If-Match/If-None-Match), 409 Conflict (some + // S3-compatible backends return this for create-only races). + const CAS_FAIL_STATUSES = new Set([412, 409]); + + const putWithPreconditions = async ( + key: string, + value: BufferSource | string, + topts: (S3ItemOptions & { ifMatch?: string; ifNoneMatch?: string }) | undefined, + ): Promise<{ etag: string } | undefined> => { + const wantsCAS = + topts?.ifMatch !== undefined || topts?.ifNoneMatch !== undefined; + const headers: Record = { ...topts?.headers }; + if (topts?.ifNoneMatch !== undefined) { + headers["If-None-Match"] = + topts.ifNoneMatch === "*" ? "*" : `"${topts.ifNoneMatch}"`; + } + if (topts?.ifMatch !== undefined) { + headers["If-Match"] = topts.ifMatch === "*" ? "*" : `"${topts.ifMatch}"`; + } + const res = await putObject( + key, + value, + headers, + wantsCAS ? CAS_FAIL_STATUSES : undefined, + ); + if (wantsCAS && res && CAS_FAIL_STATUSES.has(res.status)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + if (wantsCAS) { + return { etag: stripQuotes(res?.headers.get("etag")) }; + } + return undefined; }; // https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html @@ -188,6 +239,7 @@ const driver: DriverFactory = (options) => { return { name: DRIVER_NAME, + flags: { cas: true }, options, getItem(key) { return getObject(key).then((res) => (res ? res.text() : null)); @@ -195,11 +247,11 @@ const driver: DriverFactory = (options) => { getItemRaw(key) { return getObject(key).then((res) => (res ? res.arrayBuffer() : null)); }, - async setItem(key, value, topts?: S3ItemOptions) { - await putObject(key, value, topts?.headers); + async setItem(key, value, topts?: S3ItemOptions & { ifMatch?: string; ifNoneMatch?: string }) { + return putWithPreconditions(key, value, topts); }, - async setItemRaw(key, value, topts?: S3ItemOptions) { - await putObject(key, value, topts?.headers); + async setItemRaw(key, value, topts?: S3ItemOptions & { ifMatch?: string; ifNoneMatch?: string }) { + return putWithPreconditions(key, value, topts); }, getMeta(key) { return headObject(key); @@ -221,6 +273,11 @@ const driver: DriverFactory = (options) => { // --- utils --- +function stripQuotes(value: string | null | undefined): string { + if (!value) return ""; + return value.startsWith('"') && value.endsWith('"') ? value.slice(1, -1) : value; +} + function deleteKeysReq(keys: string[]) { return `${keys .map((key) => { diff --git a/src/drivers/vercel-blob.ts b/src/drivers/vercel-blob.ts index 478b6b686..35d1476f1 100644 --- a/src/drivers/vercel-blob.ts +++ b/src/drivers/vercel-blob.ts @@ -1,5 +1,6 @@ import * as blob from "@vercel/blob"; import { type DriverFactory, normalizeKey, joinKeys, createError } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; export interface VercelBlobOptions { /** @@ -47,9 +48,86 @@ const driver: DriverFactory = (opts) => { const get = (key: string) => blob.get(r(key), { token: getToken(), access: opts.access }); + // Vercel Blob exposes ETags and supports `ifMatch` natively on `put()`. The + // `ifNoneMatch: "*"` precondition (create-only) is mapped onto + // `allowOverwrite: false`, which is its default behavior. Other variants + // (`ifMatch: "*"`, `ifNoneMatch: ""`) have no native primitive and + // are emulated with a `head()` pre-check + native ifMatch on the write — + // best-effort across processes (a delete between check and write would + // race), but consistent with the fs/lru-cache CAS pattern. + const writeWithCAS = async ( + key: string, + write: (putOpts: { + ifMatch?: string; + allowOverwrite?: boolean; + }) => Promise<{ etag: string }>, + casOpts: { ifMatch?: string; ifNoneMatch?: string }, + ): Promise<{ etag: string }> => { + const { ifMatch, ifNoneMatch } = casOpts; + + // Atomic create-only: `allowOverwrite: false` is enforced server-side. + if (ifNoneMatch === "*" && ifMatch === undefined) { + try { + return await write({ allowOverwrite: false }); + } catch (err: any) { + if (isPreconditionOrExistsError(err)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + } + + // Atomic ifMatch:: forwarded directly to Vercel Blob. + if (typeof ifMatch === "string" && ifMatch !== "*" && ifNoneMatch === undefined) { + try { + return await write({ ifMatch, allowOverwrite: true }); + } catch (err: any) { + if (isPreconditionOrExistsError(err)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + } + + // Emulated paths (`ifMatch:*`, `ifNoneMatch:`, or combinations). + // Best-effort: head() then write. Cross-process races are not prevented. + const head = await blob + .head(r(key), { token: getToken() }) + .catch(() => null); + const exists = !!head; + const curEtag = head?.etag; + + if (ifNoneMatch !== undefined) { + if (ifNoneMatch === "*" ? exists : exists && curEtag === ifNoneMatch) { + throw new CASMismatchError(DRIVER_NAME, key); + } + } + if (ifMatch !== undefined) { + if (ifMatch === "*" ? !exists : !exists || curEtag !== ifMatch) { + throw new CASMismatchError(DRIVER_NAME, key); + } + } + + // If we have a concrete etag to assert, use native ifMatch for atomicity. + const putOpts = + ifMatch && ifMatch !== "*" + ? { ifMatch, allowOverwrite: true as const } + : { allowOverwrite: true as const }; + + try { + return await write(putOpts); + } catch (err: any) { + if (isPreconditionOrExistsError(err)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + }; + return { name: DRIVER_NAME, options: opts, + flags: { cas: true }, async hasItem(key: string) { try { await blob.head(r(key), { token: getToken() }); @@ -80,20 +158,40 @@ const driver: DriverFactory = (opts) => { } }, async setItem(key, value, callOpts) { - await blob.put(r(key), value, { - access: opts.access, - addRandomSuffix: false, - token: getToken(), - ...callOpts, - }); + const wantsCAS = + callOpts?.ifMatch !== undefined || callOpts?.ifNoneMatch !== undefined; + const doPut = (extra: { ifMatch?: string; allowOverwrite?: boolean }) => + blob + .put(r(key), value, { + access: opts.access, + addRandomSuffix: false, + token: getToken(), + ...callOpts, + ...extra, + }) + .then((res) => ({ etag: res.etag })); + if (wantsCAS) { + return writeWithCAS(key, doPut, callOpts); + } + await doPut({}); }, async setItemRaw(key, value, callOpts) { - await blob.put(r(key), value, { - access: opts.access, - addRandomSuffix: false, - token: getToken(), - ...callOpts, - }); + const wantsCAS = + callOpts?.ifMatch !== undefined || callOpts?.ifNoneMatch !== undefined; + const doPut = (extra: { ifMatch?: string; allowOverwrite?: boolean }) => + blob + .put(r(key), value, { + access: opts.access, + addRandomSuffix: false, + token: getToken(), + ...callOpts, + ...extra, + }) + .then((res) => ({ etag: res.etag })); + if (wantsCAS) { + return writeWithCAS(key, doPut, callOpts); + } + await doPut({}); }, async removeItem(key: string) { await blob.del(r(key), { token: getToken() }); @@ -142,3 +240,24 @@ const driver: DriverFactory = (opts) => { }; export default driver; + +// --- Internal helpers --- + +// Detects a CAS-relevant failure from `@vercel/blob`. We avoid `instanceof` +// against the SDK's classes so the check is resilient across SDK versions +// and dual-bundle (ESM/CJS) duplication. Two cases produce a CAS mismatch: +// - native `BlobPreconditionFailedError` (server-side `ifMatch` rejection) +// - `allowOverwrite: false` collision (currently surfaced as a generic +// `BlobError` whose message mentions the conflict) +function isPreconditionOrExistsError(err: unknown): boolean { + if (!err || typeof err !== "object") return false; + const name = (err as { name?: string }).name; + if (name === "BlobPreconditionFailedError") return true; + const message = (err as { message?: string }).message ?? ""; + return ( + /precondition/i.test(message) || + /etag mismatch/i.test(message) || + /already exists/i.test(message) || + /overwrite/i.test(message) + ); +} diff --git a/src/server.ts b/src/server.ts index f83bd1e5e..3a2543ba2 100644 --- a/src/server.ts +++ b/src/server.ts @@ -1,7 +1,8 @@ -import type { Storage, TransactionOptions, StorageMeta } from "./types.ts"; +import type { Storage, TransactionOptions, StorageMeta, SetItemResult } from "./types.ts"; import { H3Event, HTTPError, defineHandler } from "h3"; import { stringify } from "./_utils.ts"; import { normalizeKey, normalizeBaseKey } from "./utils.ts"; +import { CASMismatchError, CASUnsupportedError } from "./errors.ts"; export type StorageServerRequest = { request: globalThis.Request; @@ -31,7 +32,11 @@ export type FetchHandler = ( * The storage server will handle HEAD, GET, PUT and DELETE requests. * - HEAD: Return if the request item exists in the storage, including a last-modified header if the storage supports it and the meta is stored * - GET: Return the item if it exists - * - PUT: Sets the item + * - PUT: Sets the item. Honors `If-Match` / `If-None-Match` precondition + * headers (HTTP-style CAS). Returns `412 Precondition Failed` on CAS + * mismatch and `501 Not Implemented` if the underlying driver doesn't + * support CAS. On success, sets the `ETag` response header when the + * driver returns one. * - DELETE: Removes the item (or clears the whole storage if the base key was used) * * If the request sets the `Accept` header to `application/octet-stream`, the server will handle the item as raw data. @@ -89,7 +94,7 @@ export function createStorageHandler( return isRaw ? driverValue : stringify(driverValue); } - // HEAD => hasItem + meta (mtime, ttl) + // HEAD => hasItem + meta (mtime, ttl, etag) if (event.req.method === "HEAD") { if (!(await storage.hasItem(key))) { throw new HTTPError({ @@ -101,20 +106,48 @@ export function createStorageHandler( return ""; } - // PUT => setItem + // PUT => setItem (with optional If-Match / If-None-Match CAS) if (event.req.method === "PUT") { const isRaw = event.req.headers.get("content-type") === "application/octet-stream"; + const ifMatch = parseConditionHeader(event.req.headers.get("if-match")); + const ifNoneMatch = parseConditionHeader(event.req.headers.get("if-none-match")); const topts: TransactionOptions = { ttl: Number(event.req.headers.get("x-ttl")) || undefined, + ifMatch, + ifNoneMatch, }; - if (isRaw) { - const value = await event.req.bytes(); - await storage.setItemRaw(key, value, topts); - } else { - const value = await event.req.text(); - if (value !== undefined) { - await storage.setItem(key, value, topts); + try { + let result: void | SetItemResult | undefined; + if (isRaw) { + const value = await event.req.bytes(); + result = await storage.setItemRaw(key, value, topts); + } else { + const value = await event.req.text(); + if (value !== undefined) { + result = await storage.setItem(key, value, topts); + } } + if (result && (result as SetItemResult).etag) { + event.res.headers.set("etag", formatEtag((result as SetItemResult).etag!)); + } + } catch (error: any) { + if (CASMismatchError.is(error)) { + throw new HTTPError({ + status: 412, + statusText: "Precondition Failed", + message: error.message, + cause: error, + }); + } + if (CASUnsupportedError.is(error)) { + throw new HTTPError({ + status: 501, + statusText: "Not Implemented", + message: error.message, + cause: error, + }); + } + throw error; } return "OK"; } @@ -142,4 +175,25 @@ function setMetaHeaders(event: H3Event, meta: StorageMeta) { event.res.headers.set("x-ttl", `${meta.ttl}`); event.res.headers.set("cache-control", `max-age=${meta.ttl}`); } + if (meta.etag) { + event.res.headers.set("etag", formatEtag(meta.etag)); + } +} + +// Parse a single-value `If-Match` / `If-None-Match` header into the value +// understood by drivers (`*` or the bare etag with surrounding quotes +// stripped). We don't support multi-value lists or weak validators (`W/"..."`) +// — the storage CAS contract is exact equality. +function parseConditionHeader(raw: string | null | undefined): string | undefined { + if (!raw) return undefined; + const v = raw.trim(); + if (v === "*") return "*"; + if (v.length >= 2 && v.startsWith('"') && v.endsWith('"')) { + return v.slice(1, -1); + } + return v; +} + +function formatEtag(etag: string): string { + return etag === "*" || (etag.startsWith('"') && etag.endsWith('"')) ? etag : `"${etag}"`; } diff --git a/test/drivers/cloudflare-r2-binding.test.ts b/test/drivers/cloudflare-r2-binding.test.ts index b50d5cd04..dcfa1020b 100644 --- a/test/drivers/cloudflare-r2-binding.test.ts +++ b/test/drivers/cloudflare-r2-binding.test.ts @@ -15,6 +15,7 @@ describe("drivers: cloudflare-r2-binding", async () => { testDriver({ driver: CloudflareR2Binding({ base: "base" }), + supportsCAS: true, async additionalTests(ctx) { test("snapshot", async () => { await ctx.storage.setItem("s1:a", "test_data"); diff --git a/test/drivers/db0.test.ts b/test/drivers/db0.test.ts index cb53e2322..ad2942666 100644 --- a/test/drivers/db0.test.ts +++ b/test/drivers/db0.test.ts @@ -53,6 +53,7 @@ for (const driver of drivers) { testDriver({ driver: () => db0Driver({ database: db }), + supportsCAS: true, additionalTests: (ctx) => { it("meta", async () => { await ctx.storage.setItem("meta:test", "test_data"); diff --git a/test/drivers/deno-kv-node.test.ts b/test/drivers/deno-kv-node.test.ts index 629f0493a..098f6cad0 100644 --- a/test/drivers/deno-kv-node.test.ts +++ b/test/drivers/deno-kv-node.test.ts @@ -8,5 +8,6 @@ describe("drivers: deno-kv-node", async () => { path: ":memory:", base: Math.round(Math.random() * 1_000_000).toString(16), }), + supportsCAS: true, }); }); diff --git a/test/drivers/deno-kv.test.ts b/test/drivers/deno-kv.test.ts index 0c396bc2d..8b433ca71 100644 --- a/test/drivers/deno-kv.test.ts +++ b/test/drivers/deno-kv.test.ts @@ -32,5 +32,6 @@ describe.skipIf(!hasDeno)("drivers: deno-kv", async () => { driver: httpDriver({ base: `http://localhost:${randomPort}`, }), + supportsCAS: true, }); }); diff --git a/test/drivers/http.test.ts b/test/drivers/http.test.ts index 5ac7cfc50..0dd76a6e7 100644 --- a/test/drivers/http.test.ts +++ b/test/drivers/http.test.ts @@ -29,6 +29,7 @@ describe("drivers: http", async () => { }); testDriver({ + supportsCAS: true, driver: driver({ base: listener!.url!, headers: { "x-global-header": "1" }, diff --git a/test/drivers/mongodb.test.ts b/test/drivers/mongodb.test.ts index e859778e5..c60f7a823 100644 --- a/test/drivers/mongodb.test.ts +++ b/test/drivers/mongodb.test.ts @@ -22,6 +22,7 @@ describe("drivers: mongodb", async () => { databaseName: "test", collectionName: "test", }), + supportsCAS: true, additionalTests: (ctx) => { it("should throw error if no connection string is provided", async () => { await expect(() => diff --git a/test/drivers/netlify-blobs.test.ts b/test/drivers/netlify-blobs.test.ts index e19104734..1d7ea4b27 100644 --- a/test/drivers/netlify-blobs.test.ts +++ b/test/drivers/netlify-blobs.test.ts @@ -23,7 +23,13 @@ describe("drivers: netlify-blobs", async () => { await server.start(); }); + // The in-process BlobsServer mock does not echo the etag header on + // HEAD/getMetadata responses (only on PUT and list), so the etag-readback + // assertions are skipped here. The driver itself surfaces the etag + // correctly against the real Netlify Blobs API. testDriver({ + supportsCAS: true, + casNoMetaEtag: true, driver: driver({ name: "test", edgeURL: `http://localhost:8971`, @@ -33,6 +39,8 @@ describe("drivers: netlify-blobs", async () => { }); testDriver({ + supportsCAS: true, + casNoMetaEtag: true, driver: driver({ deployScoped: true, edgeURL: `http://localhost:8971`, diff --git a/test/drivers/overlay.test.ts b/test/drivers/overlay.test.ts index 3bdb2c696..d34fb110f 100644 --- a/test/drivers/overlay.test.ts +++ b/test/drivers/overlay.test.ts @@ -6,6 +6,7 @@ import { testDriver } from "./utils.ts"; describe("drivers: overlay", () => { const [s1, s2] = [memory(), memory()]; testDriver({ + supportsCAS: true, driver: driver({ layers: [s1, s2], }), diff --git a/test/drivers/s3.test.ts b/test/drivers/s3.test.ts index 5e5c935df..683584a28 100644 --- a/test/drivers/s3.test.ts +++ b/test/drivers/s3.test.ts @@ -21,6 +21,7 @@ describe.skipIf(!accessKeyId || !secretAccessKey || !bucket || !endpoint || !reg endpoint: endpoint!, region: region!, }), + supportsCAS: true, additionalTests(ctx) { it("can access directly with / separator", async () => { await ctx.storage.set("foo/bar:baz", "ok"); diff --git a/test/drivers/utils.ts b/test/drivers/utils.ts index 75cd5d9a9..147ea6ee1 100644 --- a/test/drivers/utils.ts +++ b/test/drivers/utils.ts @@ -18,6 +18,13 @@ export interface TestOptions { noKeysSupport?: boolean; /** Driver supports `ifMatch`/`ifNoneMatch` preconditions. */ supportsCAS?: boolean; + /** + * Backend supports CAS preconditions but the test environment (e.g. an + * in-process mock server) does not expose an etag through `getMeta`. Skips + * the etag-readback assertions that would otherwise verify + * `getMeta().etag === setItem().etag`. + */ + casNoMetaEtag?: boolean; additionalTests?: (ctx: TestContext) => void; } @@ -221,7 +228,7 @@ export function testDriver(opts: TestOptions): void { expect(await ctx.storage.getItem("cas:create")).toBe("first"); }); - it("CAS: ifMatch: swaps only when version matches", async () => { + it.skipIf(opts.casNoMetaEtag)("CAS: ifMatch: swaps only when version matches", async () => { await ctx.storage.setItem("cas:swap", "v1"); const meta1 = await ctx.storage.getMeta("cas:swap"); expect(meta1.etag).toBeTruthy(); @@ -247,7 +254,7 @@ export function testDriver(opts: TestOptions): void { expect(await ctx.storage.getItem("cas:absent")).toBe("y"); }); - it("CAS: getMeta returns etag after write", async () => { + it.skipIf(opts.casNoMetaEtag)("CAS: getMeta returns etag after write", async () => { const r = await ctx.storage.setItem("cas:meta", "hello", { ifNoneMatch: "*" }); const meta = await ctx.storage.getMeta("cas:meta"); expect(meta.etag).toBe((r as { etag: string }).etag); diff --git a/test/drivers/vercel-blob.test.ts b/test/drivers/vercel-blob.test.ts index f2d08636e..b9613df11 100644 --- a/test/drivers/vercel-blob.test.ts +++ b/test/drivers/vercel-blob.test.ts @@ -13,6 +13,7 @@ describe.skipIf(!token)("drivers: vercel-blob (public)", async () => { base: Math.round(Math.random() * 1_000_000).toString(16), envPrefix: "VERCEL_TEST", }), + supportsCAS: true, }); }); @@ -27,5 +28,6 @@ describe.skipIf(!privateToken)("drivers: vercel-blob (private)", async () => { base: Math.round(Math.random() * 1_000_000).toString(16), envPrefix: "VERCEL_TEST_PRIVATE", }), + supportsCAS: true, }); }); From 5222a542efcf07a42f96ca5107b7c7046414b5bf Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 5 May 2026 09:33:36 +0200 Subject: [PATCH 3/5] feat: extend CAS support to azure drivers and planetscale - azure-cosmos: items.create + replace with IfMatch accessCondition - azure-storage-blob: native conditions.ifMatch / ifNoneMatch - azure-storage-table: createEntity + updateEntity with etag - azure-app-configuration: addConfigurationSetting + setConfigurationSetting onlyIfUnchanged - planetscale: SELECT-then-write under per-key lock; etag column Test files for these drivers are describe.skip; verified compile + build only. --- src/drivers/azure-app-configuration.ts | 79 +++++++++++- src/drivers/azure-cosmos.ts | 123 ++++++++++++++++++- src/drivers/azure-storage-blob.ts | 51 +++++++- src/drivers/azure-storage-table.ts | 76 +++++++++++- src/drivers/planetscale.ts | 93 +++++++++++++- test/drivers/azure-app-configuration.test.ts | 1 + test/drivers/azure-cosmos.test.ts | 1 + test/drivers/azure-storage-blob.test.ts | 1 + test/drivers/azure-storage-table.test.ts | 1 + 9 files changed, 406 insertions(+), 20 deletions(-) diff --git a/src/drivers/azure-app-configuration.ts b/src/drivers/azure-app-configuration.ts index 9a073fa11..2da0e9069 100644 --- a/src/drivers/azure-app-configuration.ts +++ b/src/drivers/azure-app-configuration.ts @@ -1,4 +1,5 @@ import { type DriverFactory, createRequiredError } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; import { AppConfigurationClient } from "@azure/app-configuration"; import { DefaultAzureCredential } from "@azure/identity"; @@ -60,8 +61,70 @@ const driver: DriverFactory => { + const k = p(key); + const label = opts.label; + const c = getClient(); + const { ifMatch, ifNoneMatch } = tOptions; + try { + // Create-only: ifNoneMatch:"*" + if (ifNoneMatch === "*" && ifMatch === undefined) { + const result = await c.addConfigurationSetting({ key: k, value, label }); + return { etag: result.etag }; + } + // Swap by etag: ifMatch: (no ifNoneMatch, or harmless ifNoneMatch:"*") + if ( + ifMatch !== undefined && + ifMatch !== "*" && + (ifNoneMatch === undefined || ifNoneMatch === "*") + ) { + const result = await c.setConfigurationSetting( + { key: k, value, label, etag: ifMatch }, + { onlyIfUnchanged: true }, + ); + return { etag: result.etag }; + } + // Remaining cases (ifMatch:"*", ifNoneMatch:, or combinations) + // are emulated via read-then-conditional-set. + const current = await c.getConfigurationSetting({ key: k, label }).catch(() => null); + const exists = !!current; + const curEtag = current?.etag; + let mismatch = false; + if (ifNoneMatch !== undefined) { + mismatch = + ifNoneMatch === "*" ? exists : exists && curEtag === ifNoneMatch; + } + if (!mismatch && ifMatch !== undefined) { + mismatch = ifMatch === "*" ? !exists : !exists || curEtag !== ifMatch; + } + if (mismatch) { + throw new CASMismatchError(DRIVER_NAME, key); + } + if (exists) { + const result = await c.setConfigurationSetting( + { key: k, value, label, etag: curEtag }, + { onlyIfUnchanged: true }, + ); + return { etag: result.etag }; + } + const result = await c.addConfigurationSetting({ key: k, value, label }); + return { etag: result.etag }; + } catch (err: any) { + if (CASMismatchError.is(err)) throw err; + if (err?.statusCode === 412 || err?.statusCode === 409) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: getClient, async hasItem(key) { @@ -86,7 +149,10 @@ const driver: DriverFactory null); + if (!setting) return null; return { mtime: setting.lastModified, etag: setting.etag, diff --git a/src/drivers/azure-cosmos.ts b/src/drivers/azure-cosmos.ts index c0d144d45..11c8c2b2a 100644 --- a/src/drivers/azure-cosmos.ts +++ b/src/drivers/azure-cosmos.ts @@ -1,4 +1,5 @@ import { createRequiredError, type DriverFactory } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; import { Container, CosmosClient } from "@azure/cosmos"; import { DefaultAzureCredential } from "@azure/identity"; @@ -43,8 +44,16 @@ export interface AzureCosmosItem { * The unstorage mtime metadata of the item. */ modified: string | Date; + + /** + * Cosmos-managed etag (read-only on the server side). + */ + _etag?: string; } +const isStatus = (err: unknown, status: number): boolean => + !!err && typeof err === "object" && (err as { code?: number | string }).code === status; + const driver: DriverFactory> = (opts) => { let client: Container; const getCosmosClient = async () => { @@ -83,8 +92,113 @@ const driver: DriverFactory> = (opts) => return client; }; + const setWithCAS = async ( + key: string, + value: string, + tOptions: { ifMatch?: string; ifNoneMatch?: string }, + ): Promise<{ etag: string }> => { + const container = await getCosmosClient(); + const modified = new Date(); + const body: AzureCosmosItem = { id: key, value, modified }; + const { ifMatch, ifNoneMatch } = tOptions; + + // ifNoneMatch:* — create-only via items.create (409 Conflict on collision). + if (ifNoneMatch === "*" && ifMatch === undefined) { + try { + const res = await container.items.create(body, { + consistencyLevel: "Session", + }); + return { etag: res.resource?._etag ?? res.etag }; + } catch (err) { + if (isStatus(err, 409)) throw new CASMismatchError(DRIVER_NAME, key); + throw err; + } + } + + // ifMatch: — replace with IfMatch precondition (412 on mismatch, 404 if absent). + if (ifMatch !== undefined && ifMatch !== "*" && ifNoneMatch === undefined) { + try { + const res = await container.item(key).replace(body, { + accessCondition: { type: "IfMatch", condition: ifMatch }, + consistencyLevel: "Session", + }); + return { etag: res.resource?._etag ?? res.etag }; + } catch (err) { + if (isStatus(err, 412) || isStatus(err, 404)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + } + + // ifMatch:* — require existence; replace without etag pinning (404 if absent). + if (ifMatch === "*" && ifNoneMatch === undefined) { + try { + const res = await container.item(key).replace(body, { + consistencyLevel: "Session", + }); + return { etag: res.resource?._etag ?? res.etag }; + } catch (err) { + if (isStatus(err, 404)) throw new CASMismatchError(DRIVER_NAME, key); + throw err; + } + } + + // Remaining shapes (ifNoneMatch:, combined): read-then-conditional-replace. + // Cosmos accessCondition only supports a single header per request, so combined + // preconditions and "ifNoneMatch:" are evaluated client-side, then the + // write is pinned to the observed etag for atomicity. + const existing = await container + .item(key) + .read() + .catch((err) => { + if (isStatus(err, 404)) { + return { resource: undefined as AzureCosmosItem | undefined, etag: "" }; + } + throw err; + }); + const exists = !!existing.resource; + const curEtag = existing.resource?._etag; + + if (ifNoneMatch !== undefined) { + const mismatch = + ifNoneMatch === "*" ? exists : exists && curEtag === ifNoneMatch; + if (mismatch) throw new CASMismatchError(DRIVER_NAME, key); + } + if (ifMatch !== undefined) { + const mismatch = + ifMatch === "*" ? !exists : !exists || curEtag !== ifMatch; + if (mismatch) throw new CASMismatchError(DRIVER_NAME, key); + } + + if (!exists) { + try { + const res = await container.items.create(body, { + consistencyLevel: "Session", + }); + return { etag: res.resource?._etag ?? res.etag }; + } catch (err) { + if (isStatus(err, 409)) throw new CASMismatchError(DRIVER_NAME, key); + throw err; + } + } + try { + const res = await container.item(key).replace(body, { + accessCondition: { type: "IfMatch", condition: curEtag! }, + consistencyLevel: "Session", + }); + return { etag: res.resource?._etag ?? res.etag }; + } catch (err) { + if (isStatus(err, 412) || isStatus(err, 404)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: getCosmosClient, async hasItem(key) { @@ -95,7 +209,10 @@ const driver: DriverFactory> = (opts) => const item = await (await getCosmosClient()).item(key).read(); return item.resource ? item.resource.value : null; }, - async setItem(key, value) { + async setItem(key, value, tOptions) { + if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) { + return setWithCAS(key, value, tOptions); + } const modified = new Date(); await ( await getCosmosClient() @@ -119,8 +236,10 @@ const driver: DriverFactory> = (opts) => }, async getMeta(key) { const item = await (await getCosmosClient()).item(key).read(); + if (!item.resource) return null; return { - mtime: item.resource?.modified ? new Date(item.resource.modified) : undefined, + mtime: item.resource.modified ? new Date(item.resource.modified) : undefined, + etag: item.resource._etag, }; }, async clear() { diff --git a/src/drivers/azure-storage-blob.ts b/src/drivers/azure-storage-blob.ts index ebaccbfad..53a5029e7 100644 --- a/src/drivers/azure-storage-blob.ts +++ b/src/drivers/azure-storage-blob.ts @@ -1,4 +1,5 @@ import { createError, type DriverFactory } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; import { BlobServiceClient, ContainerClient, @@ -97,8 +98,38 @@ const driver: DriverFactory = (opts) = return containerClient; }; + const uploadWithCAS = async ( + key: string, + value: any, + length: number, + topts: { ifMatch?: string; ifNoneMatch?: string } | undefined, + ): Promise<{ etag: string } | undefined> => { + const wantsCAS = + topts?.ifMatch !== undefined || topts?.ifNoneMatch !== undefined; + const conditions: { ifMatch?: string; ifNoneMatch?: string } = {}; + if (topts?.ifMatch !== undefined) { + conditions.ifMatch = topts.ifMatch === "*" ? "*" : `"${topts.ifMatch}"`; + } + if (topts?.ifNoneMatch !== undefined) { + conditions.ifNoneMatch = + topts.ifNoneMatch === "*" ? "*" : `"${topts.ifNoneMatch}"`; + } + try { + const res = await getContainerClient() + .getBlockBlobClient(key) + .upload(value, length, wantsCAS ? { conditions } : undefined); + return wantsCAS ? { etag: stripQuotes(res.etag) } : undefined; + } catch (err: any) { + if (wantsCAS && (err?.statusCode === 412 || err?.statusCode === 409)) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: getContainerClient, async hasItem(key) { @@ -128,11 +159,11 @@ const driver: DriverFactory = (opts) = return null; } }, - async setItem(key, value) { - await getContainerClient().getBlockBlobClient(key).upload(value, Buffer.byteLength(value)); + async setItem(key, value, topts?: { ifMatch?: string; ifNoneMatch?: string }) { + return uploadWithCAS(key, value, Buffer.byteLength(value), topts); }, - async setItemRaw(key, value) { - await getContainerClient().getBlockBlobClient(key).upload(value, Buffer.byteLength(value)); + async setItemRaw(key, value, topts?: { ifMatch?: string; ifNoneMatch?: string }) { + return uploadWithCAS(key, value, Buffer.byteLength(value), topts); }, async removeItem(key) { await getContainerClient() @@ -149,11 +180,16 @@ const driver: DriverFactory = (opts) = return keys; }, async getMeta(key) { - const blobProperties = await getContainerClient().getBlockBlobClient(key).getProperties(); + const blobProperties = await getContainerClient() + .getBlockBlobClient(key) + .getProperties() + .catch(() => null); + if (!blobProperties) return null; return { mtime: blobProperties.lastModified, atime: blobProperties.lastAccessed, cr: blobProperties.createdOn, + etag: stripQuotes(blobProperties.etag), ...blobProperties.metadata, }; }, @@ -175,6 +211,11 @@ const driver: DriverFactory = (opts) = const isBrowser = typeof window !== "undefined"; +function stripQuotes(value: string | null | undefined): string { + if (!value) return ""; + return value.startsWith('"') && value.endsWith('"') ? value.slice(1, -1) : value; +} + // Helper function to read a Node.js readable stream into a Buffer. (https://github.com/Azure/azure-sdk-for-js/tree/main/sdk/storage/storage-blob) async function streamToBuffer(readableStream: NodeJS.ReadableStream): Promise { return new Promise((resolve, reject) => { diff --git a/src/drivers/azure-storage-table.ts b/src/drivers/azure-storage-table.ts index 13062a54f..7fc3f3d8e 100644 --- a/src/drivers/azure-storage-table.ts +++ b/src/drivers/azure-storage-table.ts @@ -1,4 +1,5 @@ import { createError, createRequiredError, type DriverFactory } from "./utils/index.ts"; +import { CASMismatchError } from "./utils/cas.ts"; import { TableClient, AzureNamedKeyCredential, @@ -99,8 +100,75 @@ const driver: DriverFactory = (opts) => { return client; }; + // CAS write path. Native ETag-based optimistic concurrency: + // - createEntity → 409 Conflict on existing rowKey (ifNoneMatch:"*") + // - updateEntity(..., { etag: }) → 412 on miss (ifMatch:) + // - ifNoneMatch: has no native equivalent; emulated as + // read+conditional-update (NOT atomic — a concurrent writer between the + // read and the update can slip through unnoticed). + const setWithCAS = async ( + key: string, + value: unknown, + tOptions: { ifMatch?: string; ifNoneMatch?: string }, + ): Promise<{ etag: string }> => { + const c = getClient(); + const entity: TableEntity = { partitionKey, rowKey: key, unstorageValue: value }; + const { ifMatch, ifNoneMatch } = tOptions; + try { + if (ifNoneMatch === "*" && ifMatch === undefined) { + const r = await c.createEntity(entity); + return { etag: r.etag as string }; + } + if (ifMatch !== undefined && ifNoneMatch === undefined) { + // SDK accepts "*" as wildcard match — same code path as exact etag. + const r = await c.updateEntity(entity, "Replace", { etag: ifMatch }); + return { etag: r.etag as string }; + } + if (ifNoneMatch !== undefined && ifMatch === undefined) { + // Non-atomic emulation of ifNoneMatch:: read current, then + // conditional update on the read etag. Race window is unavoidable + // without server-side support. + const cur = await c.getEntity(partitionKey, key).catch(() => null); + if (cur && cur.etag === ifNoneMatch) { + throw new CASMismatchError(DRIVER_NAME, key); + } + if (cur) { + const r = await c.updateEntity(entity, "Replace", { etag: cur.etag }); + return { etag: r.etag as string }; + } + const r = await c.createEntity(entity); + return { etag: r.etag as string }; + } + // Combined ifMatch + ifNoneMatch: ifMatch implies existence, so do the + // conditional update and post-check the resulting etag against ifNoneMatch. + if (ifMatch !== undefined && ifNoneMatch !== undefined) { + if (ifMatch !== "*" && ifMatch === ifNoneMatch) { + throw new CASMismatchError(DRIVER_NAME, key); + } + const cur = await c.getEntity(partitionKey, key).catch(() => null); + if (!cur) throw new CASMismatchError(DRIVER_NAME, key); + if (ifNoneMatch === "*" || cur.etag === ifNoneMatch) { + throw new CASMismatchError(DRIVER_NAME, key); + } + const r = await c.updateEntity(entity, "Replace", { etag: ifMatch }); + return { etag: r.etag as string }; + } + // Unreachable: caller filters wantsCAS before delegating here. + const r = await c.upsertEntity(entity, "Replace"); + return { etag: (r as { etag?: string }).etag as string }; + } catch (err) { + if (CASMismatchError.is(err)) throw err; + const status = (err as { statusCode?: number }).statusCode; + if (status === 412 || status === 409) { + throw new CASMismatchError(DRIVER_NAME, key); + } + throw err; + } + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: getClient, async hasItem(key) { @@ -119,7 +187,10 @@ const driver: DriverFactory = (opts) => { return null; } }, - async setItem(key, value) { + async setItem(key, value, tOptions) { + if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) { + return setWithCAS(key, value, tOptions); + } const entity: TableEntity = { partitionKey, rowKey: key, @@ -140,7 +211,8 @@ const driver: DriverFactory = (opts) => { return keys; }, async getMeta(key) { - const entity = await getClient().getEntity(partitionKey, key); + const entity = await getClient().getEntity(partitionKey, key).catch(() => null); + if (!entity) return null; return { mtime: entity.timestamp ? new Date(entity.timestamp) : undefined, etag: entity.etag, diff --git a/src/drivers/planetscale.ts b/src/drivers/planetscale.ts index 96488136d..3197ea6ff 100644 --- a/src/drivers/planetscale.ts +++ b/src/drivers/planetscale.ts @@ -1,4 +1,6 @@ +import { createHash } from "node:crypto"; import { createRequiredError, type DriverFactory } from "./utils/index.ts"; +import { checkCAS } from "./utils/cas.ts"; import type { ExecutedQuery, Connection } from "@planetscale/database"; import { connect } from "@planetscale/database"; @@ -11,12 +13,20 @@ export interface PlanetscaleDriverOptions { interface TableSchema { id: string; value: string; + etag: string | null; created_at: Date; updated_at: Date; } const DRIVER_NAME = "planetscale"; +const computeEtag = (value: unknown): string => { + const buf = Buffer.isBuffer(value) + ? value + : Buffer.from(typeof value === "string" ? value : String(value)); + return createHash("sha1").update(buf).digest("hex"); +}; + const driver: DriverFactory = (opts = {}) => { opts.table = opts.table || "storage"; @@ -37,12 +47,59 @@ const driver: DriverFactory = (opts = {}) console.error("[unstorage] [planetscale] Failed to enable cached queries:", error); }); } + // Best-effort additive migration for tables created before the etag column. + _connection + .execute(`ALTER TABLE ${opts.table} ADD COLUMN etag VARCHAR(64);`) + .catch(() => { + // Column already exists or table not yet created — safe to ignore. + }); } return _connection; }; + // Per-key in-process serialization for CAS writes. @planetscale/database has + // no portable transaction primitive across shards, so we serialize + // SELECT-then-write sequences in-process. Cross-process correctness is not + // guaranteed. + const writeLocks = new Map>(); + const withLock = async (key: string, fn: () => Promise): Promise => { + const previous = writeLocks.get(key) || Promise.resolve(); + let release!: () => void; + const next = new Promise((r) => { + release = r; + }); + writeLocks.set( + key, + previous.then(() => next), + ); + await previous; + try { + return await fn(); + } finally { + release(); + if (writeLocks.get(key) === next) { + writeLocks.delete(key); + } + } + }; + + const readState = async ( + key: string, + ): Promise<{ exists: boolean; etag?: string }> => { + const res = await getConnection().execute( + `SELECT etag from ${opts.table} WHERE id=:key;`, + { key }, + ); + const row = rows(res)[0]; + if (!row) { + return { exists: false }; + } + return { exists: true, etag: row.etag ?? undefined }; + }; + return { name: DRIVER_NAME, + flags: { cas: true }, options: opts, getInstance: getConnection, hasItem: async (key) => { @@ -58,10 +115,32 @@ const driver: DriverFactory = (opts = {}) }); return rows(res)[0]?.value ?? null; }, - setItem: async (key, value) => { + setItem: async (key, value, tOptions) => { + const wantsCAS = + tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined; + if (wantsCAS) { + return withLock(key, async () => { + const { exists, etag: currentEtag } = await readState(key); + checkCAS(DRIVER_NAME, key, { exists, etag: currentEtag }, tOptions); + const newEtag = computeEtag(value); + if (exists) { + await getConnection().execute( + `UPDATE ${opts.table} SET value = :value, etag = :etag WHERE id = :key;`, + { key, value, etag: newEtag }, + ); + } else { + await getConnection().execute( + `INSERT INTO ${opts.table} (id, value, etag) VALUES (:key, :value, :etag);`, + { key, value, etag: newEtag }, + ); + } + return { etag: newEtag }; + }); + } + const etag = computeEtag(value); await getConnection().execute( - `INSERT INTO ${opts.table} (id, value) VALUES (:key, :value) ON DUPLICATE KEY UPDATE value = :value;`, - { key, value }, + `INSERT INTO ${opts.table} (id, value, etag) VALUES (:key, :value, :etag) ON DUPLICATE KEY UPDATE value = :value, etag = :etag;`, + { key, value, etag }, ); }, removeItem: async (key) => { @@ -69,12 +148,14 @@ const driver: DriverFactory = (opts = {}) }, getMeta: async (key) => { const res = await getConnection().execute( - `SELECT created_at, updated_at from ${opts.table} WHERE id=:key;`, + `SELECT etag, created_at, updated_at from ${opts.table} WHERE id=:key;`, { key }, ); + const row = rows(res)[0]; return { - birthtime: rows(res)[0]?.created_at, - mtime: rows(res)[0]?.updated_at, + birthtime: row?.created_at, + mtime: row?.updated_at, + etag: row?.etag ?? undefined, }; }, getKeys: async (base = "") => { diff --git a/test/drivers/azure-app-configuration.test.ts b/test/drivers/azure-app-configuration.test.ts index 223651bad..4b71c3b11 100644 --- a/test/drivers/azure-app-configuration.test.ts +++ b/test/drivers/azure-app-configuration.test.ts @@ -4,6 +4,7 @@ import { testDriver } from "./utils.ts"; describe.skip("drivers: azure-app-configuration", () => { testDriver({ + supportsCAS: true, driver: driver({ appConfigName: "unstoragetest", label: "dev", diff --git a/test/drivers/azure-cosmos.test.ts b/test/drivers/azure-cosmos.test.ts index 7c36433f5..2e84199b0 100644 --- a/test/drivers/azure-cosmos.test.ts +++ b/test/drivers/azure-cosmos.test.ts @@ -8,5 +8,6 @@ describe.skip("drivers: azure-cosmos", () => { endpoint: "COSMOS_DB_ENDPOINT", accountKey: "COSMOS_DB_KEY", }), + supportsCAS: true, }); }); diff --git a/test/drivers/azure-storage-blob.test.ts b/test/drivers/azure-storage-blob.test.ts index 335ff840b..a2eae4653 100644 --- a/test/drivers/azure-storage-blob.test.ts +++ b/test/drivers/azure-storage-blob.test.ts @@ -22,6 +22,7 @@ describe.skip("drivers: azure-storage-blob", () => { azuriteProcess.kill(9); }); testDriver({ + supportsCAS: true, driver: driver({ connectionString: "UseDevelopmentStorage=true;", accountName: "devstoreaccount1", diff --git a/test/drivers/azure-storage-table.test.ts b/test/drivers/azure-storage-table.test.ts index 59df8a7e6..25e135574 100644 --- a/test/drivers/azure-storage-table.test.ts +++ b/test/drivers/azure-storage-table.test.ts @@ -22,5 +22,6 @@ describe.skip("drivers: azure-storage-table", () => { connectionString: "UseDevelopmentStorage=true", accountName: "local", }), + supportsCAS: true, }); }); From 58f386dbcd14a84cd4fc43e3e283f1411b22a7d1 Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 5 May 2026 09:41:33 +0200 Subject: [PATCH 4/5] up --- src/drivers/db0.ts | 8 +++---- src/drivers/fs.ts | 8 +++---- src/drivers/http.ts | 14 +++++++++-- src/drivers/mongodb.ts | 4 +++- src/storage.ts | 4 ++-- src/types.ts | 6 ++++- test/drivers/utils.ts | 54 ++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 82 insertions(+), 16 deletions(-) diff --git a/src/drivers/db0.ts b/src/drivers/db0.ts index c6f052093..8aa563caf 100644 --- a/src/drivers/db0.ts +++ b/src/drivers/db0.ts @@ -66,16 +66,14 @@ const driver: DriverFactory>> = (o const next = new Promise((r) => { release = r; }); - writeLocks.set( - key, - previous.then(() => next), - ); + const chained = previous.then(() => next); + writeLocks.set(key, chained); await previous; try { return await fn(); } finally { release(); - if (writeLocks.get(key) === next) { + if (writeLocks.get(key) === chained) { writeLocks.delete(key); } } diff --git a/src/drivers/fs.ts b/src/drivers/fs.ts index cf634a395..46039df32 100644 --- a/src/drivers/fs.ts +++ b/src/drivers/fs.ts @@ -65,16 +65,14 @@ const driver: DriverFactory = (userOptions = {}) => { const next = new Promise((r) => { release = r; }); - writeLocks.set( - key, - previous.then(() => next), - ); + const chained = previous.then(() => next); + writeLocks.set(key, chained); await previous; try { return await fn(); } finally { release(); - if (writeLocks.get(key) === next) { + if (writeLocks.get(key) === chained) { writeLocks.delete(key); } } diff --git a/src/drivers/http.ts b/src/drivers/http.ts index f83361f70..36a788d2d 100644 --- a/src/drivers/http.ts +++ b/src/drivers/http.ts @@ -2,7 +2,7 @@ import type { TransactionOptions } from "../types.ts"; import { type DriverFactory } from "./utils/index.ts"; import { type FetchError, $fetch as _fetch } from "ofetch"; import { joinURL } from "./utils/path.ts"; -import { CASMismatchError } from "./utils/cas.ts"; +import { CASMismatchError, CASUnsupportedError } from "./utils/cas.ts"; export interface HTTPOptions { base: string; @@ -59,11 +59,21 @@ const driver: DriverFactory = (opts) => { }); if (!wantsCAS) return undefined; const etag = parseEtag(res.headers.get("etag")); - return etag === undefined ? undefined : { etag }; + // A CAS-aware server echoes ETag on a successful conditional PUT. Its + // absence means the server (or its mounted driver) ignored the + // precondition headers — fail loudly to prevent silent lost updates, + // which is the whole point of CAS. + if (etag === undefined) { + throw new CASUnsupportedError(DRIVER_NAME); + } + return { etag }; } catch (error: any) { if (error?.response?.status === 412) { throw new CASMismatchError(DRIVER_NAME, key); } + if (error?.response?.status === 501) { + throw new CASUnsupportedError(DRIVER_NAME); + } throw error; } }; diff --git a/src/drivers/mongodb.ts b/src/drivers/mongodb.ts index 75712d0e1..82e90ad81 100644 --- a/src/drivers/mongodb.ts +++ b/src/drivers/mongodb.ts @@ -135,7 +135,9 @@ const driver: DriverFactory = (opts) => { const filter: Record = { key }; if (ifMatch !== undefined && ifMatch !== "*") { filter._etag = ifMatch; - } else if (ifNoneMatch !== undefined) { + } else if (ifNoneMatch !== undefined && ifNoneMatch !== "*") { + // ifMatch is `"*"` (existence already enforced by the `key` lookup) + // or undefined; ifNoneMatch is a concrete etag → require $ne. filter._etag = { $ne: ifNoneMatch }; } const r = await col.updateOne(filter, { diff --git a/src/storage.ts b/src/storage.ts index a86ae7d32..0dca2b4cf 100644 --- a/src/storage.ts +++ b/src/storage.ts @@ -326,8 +326,8 @@ export function createStorage( } return meta; }, - async setMeta(key: string, value: any, opts = {}) { - await this.setItem(key + "$", value, opts); + setMeta(key: string, value: any, opts = {}) { + return this.setItem(key + "$", value, opts); }, removeMeta(key: string, opts = {}) { return this.removeItem(key + "$", opts); diff --git a/src/types.ts b/src/types.ts index 4ef588c65..12a1689c5 100644 --- a/src/types.ts +++ b/src/types.ts @@ -174,7 +174,11 @@ export interface Storage { key: string, opts?: (TransactionOptions & { nativeOnly?: boolean }) | boolean /* legacy: nativeOnly */, ) => MaybePromise; - setMeta: (key: string, value: StorageMeta, opts?: TransactionOptions) => Promise; + setMeta: ( + key: string, + value: StorageMeta, + opts?: TransactionOptions, + ) => Promise; removeMeta: (key: string, opts?: TransactionOptions) => Promise; // Keys getKeys: (base?: string, opts?: GetKeysOptions) => Promise; diff --git a/test/drivers/utils.ts b/test/drivers/utils.ts index 147ea6ee1..f3f800e13 100644 --- a/test/drivers/utils.ts +++ b/test/drivers/utils.ts @@ -259,6 +259,60 @@ export function testDriver(opts: TestOptions): void { const meta = await ctx.storage.getMeta("cas:meta"); expect(meta.etag).toBe((r as { etag: string }).etag); }); + + it.skipIf(opts.casNoMetaEtag)( + "CAS: ifNoneMatch: rejects matching version, accepts mismatched", + async () => { + await ctx.storage.setItem("cas:nm", "v1"); + const meta1 = await ctx.storage.getMeta("cas:nm"); + const etag1 = meta1.etag as string; + expect(etag1).toBeTruthy(); + + // Forbidden etag matches current → mismatch. + await expect( + ctx.storage.setItem("cas:nm", "v2", { ifNoneMatch: etag1 }), + ).rejects.toBeInstanceOf(CASMismatchError); + expect(await ctx.storage.getItem("cas:nm")).toBe("v1"); + + // Forbidden etag does NOT match current → write proceeds. + const r = await ctx.storage.setItem("cas:nm", "v2", { + ifNoneMatch: "definitely-not-current", + }); + expect(r).toMatchObject({ etag: expect.any(String) }); + expect(await ctx.storage.getItem("cas:nm")).toBe("v2"); + }, + ); + + it.skipIf(opts.casNoMetaEtag)( + "CAS: combined ifMatch:* + ifNoneMatch: requires existence and version difference", + async () => { + await ctx.storage.setItem("cas:combo", "v1"); + const etag1 = (await ctx.storage.getMeta("cas:combo")).etag as string; + expect(etag1).toBeTruthy(); + + // Current etag equals the forbidden etag → mismatch (regression for + // the mongodb combined-filter bug where this case overwrote silently). + await expect( + ctx.storage.setItem("cas:combo", "v2", { ifMatch: "*", ifNoneMatch: etag1 }), + ).rejects.toBeInstanceOf(CASMismatchError); + expect(await ctx.storage.getItem("cas:combo")).toBe("v1"); + + // Current etag differs from the forbidden one → write proceeds. + const r = await ctx.storage.setItem("cas:combo", "v2", { + ifMatch: "*", + ifNoneMatch: "definitely-not-current", + }); + expect(r).toMatchObject({ etag: expect.any(String) }); + expect(await ctx.storage.getItem("cas:combo")).toBe("v2"); + }, + ); + + it.skipIf(opts.casNoMetaEtag)("CAS: setMeta propagates etag", async () => { + const r = await ctx.storage.setMeta("cas:setmeta", { tag: "v1" } as any, { + ifNoneMatch: "*", + }); + expect(r).toMatchObject({ etag: expect.any(String) }); + }); } else { it("CAS: throws CASUnsupportedError on ifMatch/ifNoneMatch", async () => { await expect( From 5fc783a22e75c4233a2603601a53a0a441beb431 Mon Sep 17 00:00:00 2001 From: Pooya Parsa Date: Tue, 5 May 2026 15:18:01 +0200 Subject: [PATCH 5/5] up --- src/drivers/utils/node-fs.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/drivers/utils/node-fs.ts b/src/drivers/utils/node-fs.ts index 05d1396d3..e789b0568 100644 --- a/src/drivers/utils/node-fs.ts +++ b/src/drivers/utils/node-fs.ts @@ -31,8 +31,8 @@ export async function writeFileExclusive( ): Promise { await ensuredir(dirname(path)); const tmp = `${path}.${process.pid}.${Date.now().toString(36)}.${Math.random().toString(36).slice(2, 8)}.tmp`; - await fsPromises.writeFile(tmp, data, encoding); try { + await fsPromises.writeFile(tmp, data, encoding); await fsPromises.link(tmp, path); } finally { await fsPromises.unlink(tmp).catch(() => {});