Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 74 additions & 5 deletions src/drivers/azure-app-configuration.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type DriverFactory, createRequiredError } from "./utils/index.ts";
import { CASMismatchError } from "./utils/cas.ts";
import { AppConfigurationClient } from "@azure/app-configuration";
import { DefaultAzureCredential } from "@azure/identity";

Expand Down Expand Up @@ -60,8 +61,70 @@ const driver: DriverFactory<AzureAppConfigurationOptions, AppConfigurationClient
return client;
};

const setWithCAS = async (
key: string,
value: string,
tOptions: { ifMatch?: string; ifNoneMatch?: string },
): Promise<{ etag: string | undefined }> => {
const k = p(key);
const label = opts.label;
const c = getClient();
const { ifMatch, ifNoneMatch } = tOptions;
try {
// Create-only: ifNoneMatch:"*"
if (ifNoneMatch === "*" && ifMatch === undefined) {
const result = await c.addConfigurationSetting({ key: k, value, label });
return { etag: result.etag };
}
// Swap by etag: ifMatch:<etag> (no ifNoneMatch, or harmless ifNoneMatch:"*")
if (
ifMatch !== undefined &&
ifMatch !== "*" &&
(ifNoneMatch === undefined || ifNoneMatch === "*")
) {
const result = await c.setConfigurationSetting(
{ key: k, value, label, etag: ifMatch },
{ onlyIfUnchanged: true },
);
return { etag: result.etag };
}
// Remaining cases (ifMatch:"*", ifNoneMatch:<etag>, or combinations)
// are emulated via read-then-conditional-set.
const current = await c.getConfigurationSetting({ key: k, label }).catch(() => null);
const exists = !!current;
const curEtag = current?.etag;
let mismatch = false;
if (ifNoneMatch !== undefined) {
mismatch =
ifNoneMatch === "*" ? exists : exists && curEtag === ifNoneMatch;
}
if (!mismatch && ifMatch !== undefined) {
mismatch = ifMatch === "*" ? !exists : !exists || curEtag !== ifMatch;
}
if (mismatch) {
throw new CASMismatchError(DRIVER_NAME, key);
}
if (exists) {
const result = await c.setConfigurationSetting(
{ key: k, value, label, etag: curEtag },
{ onlyIfUnchanged: true },
);
return { etag: result.etag };
}
const result = await c.addConfigurationSetting({ key: k, value, label });
return { etag: result.etag };
} catch (err: any) {
if (CASMismatchError.is(err)) throw err;
if (err?.statusCode === 412 || err?.statusCode === 409) {
throw new CASMismatchError(DRIVER_NAME, key);
}
throw err;
}
};

return {
name: DRIVER_NAME,
flags: { cas: true },
options: opts,
getInstance: getClient,
async hasItem(key) {
Expand All @@ -86,7 +149,10 @@ const driver: DriverFactory<AzureAppConfigurationOptions, AppConfigurationClient
return null;
}
},
async setItem(key, value) {
async setItem(key, value, tOptions) {
if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) {
return setWithCAS(key, value, tOptions);
}
await getClient().setConfigurationSetting({
key: p(key),
value,
Expand Down Expand Up @@ -114,10 +180,13 @@ const driver: DriverFactory<AzureAppConfigurationOptions, AppConfigurationClient
return keys;
},
async getMeta(key) {
const setting = await getClient().getConfigurationSetting({
key: p(key),
label: opts.label,
});
const setting = await getClient()
.getConfigurationSetting({
key: p(key),
label: opts.label,
})
.catch(() => null);
if (!setting) return null;
return {
mtime: setting.lastModified,
etag: setting.etag,
Expand Down
123 changes: 121 additions & 2 deletions src/drivers/azure-cosmos.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createRequiredError, type DriverFactory } from "./utils/index.ts";
import { CASMismatchError } from "./utils/cas.ts";
import { Container, CosmosClient } from "@azure/cosmos";
import { DefaultAzureCredential } from "@azure/identity";

Expand Down Expand Up @@ -43,8 +44,16 @@ export interface AzureCosmosItem {
* The unstorage mtime metadata of the item.
*/
modified: string | Date;

/**
* Cosmos-managed etag (read-only on the server side).
*/
_etag?: string;
}

const isStatus = (err: unknown, status: number): boolean =>
!!err && typeof err === "object" && (err as { code?: number | string }).code === status;

const driver: DriverFactory<AzureCosmosOptions, Promise<Container>> = (opts) => {
let client: Container;
const getCosmosClient = async () => {
Expand Down Expand Up @@ -83,8 +92,113 @@ const driver: DriverFactory<AzureCosmosOptions, Promise<Container>> = (opts) =>
return client;
};

const setWithCAS = async (
key: string,
value: string,
tOptions: { ifMatch?: string; ifNoneMatch?: string },
): Promise<{ etag: string }> => {
const container = await getCosmosClient();
const modified = new Date();
const body: AzureCosmosItem = { id: key, value, modified };
const { ifMatch, ifNoneMatch } = tOptions;

// ifNoneMatch:* — create-only via items.create (409 Conflict on collision).
if (ifNoneMatch === "*" && ifMatch === undefined) {
try {
const res = await container.items.create<AzureCosmosItem>(body, {
consistencyLevel: "Session",
});
return { etag: res.resource?._etag ?? res.etag };
} catch (err) {
if (isStatus(err, 409)) throw new CASMismatchError(DRIVER_NAME, key);
throw err;
}
}

// ifMatch:<etag> — replace with IfMatch precondition (412 on mismatch, 404 if absent).
if (ifMatch !== undefined && ifMatch !== "*" && ifNoneMatch === undefined) {
try {
const res = await container.item(key).replace<AzureCosmosItem>(body, {
accessCondition: { type: "IfMatch", condition: ifMatch },
consistencyLevel: "Session",
});
return { etag: res.resource?._etag ?? res.etag };
} catch (err) {
if (isStatus(err, 412) || isStatus(err, 404)) {
throw new CASMismatchError(DRIVER_NAME, key);
}
throw err;
}
}

// ifMatch:* — require existence; replace without etag pinning (404 if absent).
if (ifMatch === "*" && ifNoneMatch === undefined) {
try {
const res = await container.item(key).replace<AzureCosmosItem>(body, {
consistencyLevel: "Session",
});
return { etag: res.resource?._etag ?? res.etag };
} catch (err) {
if (isStatus(err, 404)) throw new CASMismatchError(DRIVER_NAME, key);
throw err;
}
}

// Remaining shapes (ifNoneMatch:<etag>, combined): read-then-conditional-replace.
// Cosmos accessCondition only supports a single header per request, so combined
// preconditions and "ifNoneMatch:<etag>" are evaluated client-side, then the
// write is pinned to the observed etag for atomicity.
const existing = await container
.item(key)
.read<AzureCosmosItem>()
.catch((err) => {
if (isStatus(err, 404)) {
return { resource: undefined as AzureCosmosItem | undefined, etag: "" };
}
throw err;
});
const exists = !!existing.resource;
const curEtag = existing.resource?._etag;

if (ifNoneMatch !== undefined) {
const mismatch =
ifNoneMatch === "*" ? exists : exists && curEtag === ifNoneMatch;
if (mismatch) throw new CASMismatchError(DRIVER_NAME, key);
}
if (ifMatch !== undefined) {
const mismatch =
ifMatch === "*" ? !exists : !exists || curEtag !== ifMatch;
if (mismatch) throw new CASMismatchError(DRIVER_NAME, key);
}

if (!exists) {
try {
const res = await container.items.create<AzureCosmosItem>(body, {
consistencyLevel: "Session",
});
return { etag: res.resource?._etag ?? res.etag };
} catch (err) {
if (isStatus(err, 409)) throw new CASMismatchError(DRIVER_NAME, key);
throw err;
}
}
try {
const res = await container.item(key).replace<AzureCosmosItem>(body, {
accessCondition: { type: "IfMatch", condition: curEtag! },
consistencyLevel: "Session",
});
return { etag: res.resource?._etag ?? res.etag };
} catch (err) {
if (isStatus(err, 412) || isStatus(err, 404)) {
throw new CASMismatchError(DRIVER_NAME, key);
}
throw err;
}
};

return {
name: DRIVER_NAME,
flags: { cas: true },
options: opts,
getInstance: getCosmosClient,
async hasItem(key) {
Expand All @@ -95,7 +209,10 @@ const driver: DriverFactory<AzureCosmosOptions, Promise<Container>> = (opts) =>
const item = await (await getCosmosClient()).item(key).read<AzureCosmosItem>();
return item.resource ? item.resource.value : null;
},
async setItem(key, value) {
async setItem(key, value, tOptions) {
if (tOptions?.ifMatch !== undefined || tOptions?.ifNoneMatch !== undefined) {
return setWithCAS(key, value, tOptions);
}
const modified = new Date();
await (
await getCosmosClient()
Expand All @@ -119,8 +236,10 @@ const driver: DriverFactory<AzureCosmosOptions, Promise<Container>> = (opts) =>
},
async getMeta(key) {
const item = await (await getCosmosClient()).item(key).read<AzureCosmosItem>();
if (!item.resource) return null;
return {
mtime: item.resource?.modified ? new Date(item.resource.modified) : undefined,
mtime: item.resource.modified ? new Date(item.resource.modified) : undefined,
etag: item.resource._etag,
};
},
async clear() {
Expand Down
51 changes: 46 additions & 5 deletions src/drivers/azure-storage-blob.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createError, type DriverFactory } from "./utils/index.ts";
import { CASMismatchError } from "./utils/cas.ts";
import {
BlobServiceClient,
ContainerClient,
Expand Down Expand Up @@ -97,8 +98,38 @@ const driver: DriverFactory<AzureStorageBlobOptions, ContainerClient> = (opts) =
return containerClient;
};

const uploadWithCAS = async (
key: string,
value: any,
length: number,
topts: { ifMatch?: string; ifNoneMatch?: string } | undefined,
): Promise<{ etag: string } | undefined> => {
const wantsCAS =
topts?.ifMatch !== undefined || topts?.ifNoneMatch !== undefined;
const conditions: { ifMatch?: string; ifNoneMatch?: string } = {};
if (topts?.ifMatch !== undefined) {
conditions.ifMatch = topts.ifMatch === "*" ? "*" : `"${topts.ifMatch}"`;
}
if (topts?.ifNoneMatch !== undefined) {
conditions.ifNoneMatch =
topts.ifNoneMatch === "*" ? "*" : `"${topts.ifNoneMatch}"`;
}
try {
const res = await getContainerClient()
.getBlockBlobClient(key)
.upload(value, length, wantsCAS ? { conditions } : undefined);
return wantsCAS ? { etag: stripQuotes(res.etag) } : undefined;
} catch (err: any) {
if (wantsCAS && (err?.statusCode === 412 || err?.statusCode === 409)) {
throw new CASMismatchError(DRIVER_NAME, key);
}
throw err;
}
};

return {
name: DRIVER_NAME,
flags: { cas: true },
options: opts,
getInstance: getContainerClient,
async hasItem(key) {
Expand Down Expand Up @@ -128,11 +159,11 @@ const driver: DriverFactory<AzureStorageBlobOptions, ContainerClient> = (opts) =
return null;
}
},
async setItem(key, value) {
await getContainerClient().getBlockBlobClient(key).upload(value, Buffer.byteLength(value));
async setItem(key, value, topts?: { ifMatch?: string; ifNoneMatch?: string }) {
return uploadWithCAS(key, value, Buffer.byteLength(value), topts);
},
async setItemRaw(key, value) {
await getContainerClient().getBlockBlobClient(key).upload(value, Buffer.byteLength(value));
async setItemRaw(key, value, topts?: { ifMatch?: string; ifNoneMatch?: string }) {
return uploadWithCAS(key, value, Buffer.byteLength(value), topts);
},
async removeItem(key) {
await getContainerClient()
Expand All @@ -149,11 +180,16 @@ const driver: DriverFactory<AzureStorageBlobOptions, ContainerClient> = (opts) =
return keys;
},
async getMeta(key) {
const blobProperties = await getContainerClient().getBlockBlobClient(key).getProperties();
const blobProperties = await getContainerClient()
.getBlockBlobClient(key)
.getProperties()
.catch(() => null);
if (!blobProperties) return null;
return {
mtime: blobProperties.lastModified,
atime: blobProperties.lastAccessed,
cr: blobProperties.createdOn,
etag: stripQuotes(blobProperties.etag),
...blobProperties.metadata,
};
},
Expand All @@ -175,6 +211,11 @@ const driver: DriverFactory<AzureStorageBlobOptions, ContainerClient> = (opts) =

const isBrowser = typeof window !== "undefined";

function stripQuotes(value: string | null | undefined): string {
if (!value) return "";
return value.startsWith('"') && value.endsWith('"') ? value.slice(1, -1) : value;
}

// Helper function to read a Node.js readable stream into a Buffer. (https://github.com/Azure/azure-sdk-for-js/tree/main/sdk/storage/storage-blob)
async function streamToBuffer(readableStream: NodeJS.ReadableStream): Promise<Buffer> {
return new Promise((resolve, reject) => {
Expand Down
Loading
Loading