diff --git a/src/lib/graphql/fetchAllRequestIdentifierMetadata.ts b/src/lib/graphql/fetchAllRequestIdentifierMetadata.ts index f9a7395f..0bbc2227 100644 --- a/src/lib/graphql/fetchAllRequestIdentifierMetadata.ts +++ b/src/lib/graphql/fetchAllRequestIdentifierMetadata.ts @@ -11,7 +11,7 @@ export interface RequestIdentifierMetadata { isVerifiedAtLeastOnce: boolean; } -const PAGE_SIZE = 50; +const PAGE_SIZE = 2000; /** * Fetch all request identifier metadata for a particular request @@ -41,22 +41,29 @@ export async function fetchAllRequestIdentifierMetadata( const resolvedRequestIds = requestIds ?? (requestId ? [requestId] : undefined); const requestIdentifiers: RequestIdentifierMetadata[] = []; - let offset = 0; + let cursor: string | undefined; // Paginate let shouldContinue = false; do { const { - requestIdentifiers: { nodes }, + requestIdentifiers: { nodes, pageInfo }, } = await makeGraphQLRequest<{ /** Request Identifiers */ requestIdentifiers: { /** List */ nodes: RequestIdentifierMetadata[]; + /** Pagination info */ + pageInfo: { + /** Cursor for the last item */ + endCursor: string | null; + /** Whether more pages exist */ + hasNextPage: boolean; + }; }; }>(client, REQUEST_IDENTIFIERS, { first: PAGE_SIZE, - offset, + after: cursor, requestIds: resolvedRequestIds, updatedAtBefore: updatedAtBefore ? updatedAtBefore.toISOString() @@ -64,8 +71,8 @@ export async function fetchAllRequestIdentifierMetadata( updatedAtAfter: updatedAtAfter ? updatedAtAfter.toISOString() : undefined, }); requestIdentifiers.push(...nodes); - offset += PAGE_SIZE; - shouldContinue = nodes.length === PAGE_SIZE; + cursor = pageInfo.endCursor ?? 
undefined; + shouldContinue = pageInfo.hasNextPage; } while (shouldContinue); return requestIdentifiers; diff --git a/src/lib/graphql/fetchAllRequestIdentifiers.ts b/src/lib/graphql/fetchAllRequestIdentifiers.ts index 40d0c9ae..cd58e30a 100644 --- a/src/lib/graphql/fetchAllRequestIdentifiers.ts +++ b/src/lib/graphql/fetchAllRequestIdentifiers.ts @@ -24,10 +24,29 @@ const RequestIdentifier = t.type({ /** Type override */ export type RequestIdentifier = t.TypeOf<typeof RequestIdentifier>; -const PAGE_SIZE = 50; +const PAGE_SIZE = 100; + +const PageInfo = t.type({ + endCursor: t.union([t.string, t.null]), + hasNextPage: t.boolean, +}); export const RequestIdentifiersResponse = t.type({ identifiers: t.array(RequestIdentifier), + pageInfo: PageInfo, +}); + +const BatchRequestIdentifier = t.type({ + id: t.string, + name: t.string, + value: t.string, + type: valuesOf(IdentifierType), + requestId: t.string, +}); + +const BatchRequestIdentifiersResponse = t.type({ + identifiers: t.array(BatchRequestIdentifier), + pageInfo: PageInfo, }); /** @@ -84,24 +103,31 @@ export async function fetchAllRequestIdentifiers( }, ): Promise<RequestIdentifier[]> { const requestIdentifiers: RequestIdentifier[] = []; - let offset = 0; - let shouldContinue = false; + let endCursor: string | undefined; + let shouldContinue = true; if (!skipSombraCheck) { await validateSombraVersion(client); } - do { + while (shouldContinue) { let response: unknown; try { response = await sombra! .post<{ /** Decrypted identifiers */ identifiers: RequestIdentifier[]; + /** Pagination info */ + pageInfo: { + /** Cursor for the last item */ + endCursor: string | null; + /** Whether more pages exist */ + hasNextPage: boolean; + }; }>('v1/request-identifiers', { json: { first: PAGE_SIZE, - offset, + after: endCursor ?? 
undefined, requestId, }, }) @@ -114,16 +140,88 @@ - const { identifiers: nodes } = decodeCodec( + const { identifiers: nodes, pageInfo } = decodeCodec( RequestIdentifiersResponse, response, ); requestIdentifiers.push(...nodes); - offset += PAGE_SIZE; - shouldContinue = nodes.length === PAGE_SIZE; - } while (shouldContinue); + endCursor = pageInfo.endCursor ?? undefined; + shouldContinue = pageInfo.hasNextPage; + } return requestIdentifiers; } + +/** + * Fetch request identifiers for multiple requests in a single paginated call. + * Returns a Map keyed by requestId so callers can look up identifiers per request. + * + * @param sombra - Sombra client + * @param options - Options + * @returns Map of requestId to its identifiers + */ +export async function fetchRequestIdentifiersBatch( + sombra: Got, + { + requestIds, + }: { + /** IDs of requests to fetch identifiers for */ + requestIds: string[]; + }, +): Promise<Map<string, RequestIdentifier[]>> { + const result = new Map<string, RequestIdentifier[]>(); + + if (requestIds.length === 0) { + return result; + } + + // Ensure every requested ID has an entry even if Sombra returns nothing for it + for (const id of requestIds) { + result.set(id, []); + } + + let cursor: string | undefined; + let shouldContinue = true; + + while (shouldContinue) { + let response: unknown; + try { + response = await sombra + .post('v1/request-identifiers', { + json: { + first: PAGE_SIZE, + after: cursor ?? undefined, + requestIds, + }, + }) + .json(); + } catch (err) { + throw new Error( + `Failed to fetch request identifiers: ${ + err?.response?.body || err?.message + }`, + ); + } + + const { identifiers: nodes, pageInfo } = decodeCodec( + BatchRequestIdentifiersResponse, + response, + ); + + for (const { requestId, ...identifier } of nodes) { + const list = result.get(requestId); + if (list) { + list.push(identifier); + } else { + result.set(requestId, [identifier]); + } + } + + cursor = pageInfo.endCursor ?? 
undefined; + shouldContinue = pageInfo.hasNextPage; + } + + return result; +} diff --git a/src/lib/graphql/gqls/RequestIdentifier.ts b/src/lib/graphql/gqls/RequestIdentifier.ts index 466a28fc..4ed3970f 100644 --- a/src/lib/graphql/gqls/RequestIdentifier.ts +++ b/src/lib/graphql/gqls/RequestIdentifier.ts @@ -15,19 +15,19 @@ export const REMOVE_REQUEST_IDENTIFIERS = gql` export const REQUEST_IDENTIFIERS = gql` query TranscendCliRequestIdentifiers( $first: Int! - $offset: Int! + $after: String $requestIds: [ID!] $updatedAtBefore: Date $updatedAtAfter: Date ) { requestIdentifiers( - input: { - requestIds: $requestIds + input: { requestIds: $requestIds } + filterBy: { updatedAtBefore: $updatedAtBefore updatedAtAfter: $updatedAtAfter } first: $first - offset: $offset + after: $after useMaster: false orderBy: [ { field: createdAt, direction: ASC } @@ -40,6 +40,10 @@ export const REQUEST_IDENTIFIERS = gql` isVerifiedAtLeastOnce } totalCount + pageInfo { + endCursor + hasNextPage + } } } `; diff --git a/src/lib/manual-enrichment/pullManualEnrichmentIdentifiersToCsv.ts b/src/lib/manual-enrichment/pullManualEnrichmentIdentifiersToCsv.ts index 1076994d..17dbf772 100644 --- a/src/lib/manual-enrichment/pullManualEnrichmentIdentifiersToCsv.ts +++ b/src/lib/manual-enrichment/pullManualEnrichmentIdentifiersToCsv.ts @@ -11,7 +11,7 @@ import { buildTranscendGraphQLClient, createSombraGotInstance, fetchAllRequestEnrichers, - fetchAllRequestIdentifiers, + fetchRequestIdentifiersBatch, fetchAllRequests, validateSombraVersion, } from '../graphql'; @@ -71,45 +71,37 @@ export async function pullManualEnrichmentIdentifiersToCsv({ await validateSombraVersion(client); - // Requests to save - const savedRequests: PrivacyRequestWithIdentifiers[] = []; - - // Filter down requests to what is needed - await map( + // Fetch enrichers for all requests in parallel + const requestsWithEnrichers = await map( allRequests, - async (request) => { - // Fetch enrichers - const requestEnrichers = await 
fetchAllRequestEnrichers(client, { + async (request) => ({ + request, + requestEnrichers: await fetchAllRequestEnrichers(client, { requestId: request.id, - }); - - // Check if manual enrichment exists for that request - const hasManualEnrichment = requestEnrichers.filter( - ({ status }) => status === 'ACTION_REQUIRED', - ); + }), + }), + { concurrency }, + ); - // Save request to queue - if (hasManualEnrichment) { - const requestIdentifiers = await fetchAllRequestIdentifiers( - client, - sombra, - { - requestId: request.id, - skipSombraCheck: true, - }, - ); - savedRequests.push({ - ...request, - requestIdentifiers, - requestEnrichers, - }); - } - }, - { - concurrency, - }, + // Filter to requests that have manual enrichment + const manualEnrichmentRequests = requestsWithEnrichers.filter( + ({ requestEnrichers }) => + requestEnrichers.filter(({ status }) => status === 'ACTION_REQUIRED') + .length > 0, ); + // Batch-fetch identifiers for all qualifying requests at once + const identifiersByRequest = await fetchRequestIdentifiersBatch(sombra, { + requestIds: manualEnrichmentRequests.map(({ request }) => request.id), + }); + + const savedRequests: PrivacyRequestWithIdentifiers[] = + manualEnrichmentRequests.map(({ request, requestEnrichers }) => ({ + ...request, + requestIdentifiers: identifiersByRequest.get(request.id) ?? 
[], + requestEnrichers, + })); + const data = savedRequests.map( ({ attributeValues, diff --git a/src/lib/requests/bulkRestartRequests.ts b/src/lib/requests/bulkRestartRequests.ts index ff5bce0e..f8c0b613 100644 --- a/src/lib/requests/bulkRestartRequests.ts +++ b/src/lib/requests/bulkRestartRequests.ts @@ -8,9 +8,10 @@ import { difference } from 'lodash-es'; import { join, resolve } from 'node:path'; import { DEFAULT_TRANSCEND_API } from '../../constants'; import { + RequestIdentifier, buildTranscendGraphQLClient, createSombraGotInstance, - fetchAllRequestIdentifiers, + fetchRequestIdentifiersBatch, fetchAllRequests, validateSombraVersion, } from '../graphql'; @@ -163,8 +164,12 @@ } } + let identifiersByRequest: Map<string, RequestIdentifier[]> | undefined; if (copyIdentifiers) { await validateSombraVersion(client); + identifiersByRequest = await fetchRequestIdentifiersBatch(sombra, { + requestIds: requests.map((r) => r.id), + }); } // Map over the requests @@ -174,12 +179,8 @@ requests, async (request, ind) => { try { - // Pull the request identifiers const requestIdentifiers = copyIdentifiers - ? await fetchAllRequestIdentifiers(client, sombra, { - requestId: request.id, - skipSombraCheck: true, - }) + ? identifiersByRequest!.get(request.id) ?? 
[] : []; // Make the GraphQL request to restart the request diff --git a/src/lib/requests/index.ts b/src/lib/requests/index.ts index f027453f..a7fdecf5 100644 --- a/src/lib/requests/index.ts +++ b/src/lib/requests/index.ts @@ -31,3 +31,4 @@ export * from './pullPrivacyRequests'; export * from './streamPrivacyRequestsToCsv'; export * from './skipRequestDataSilos'; export * from './removeUnverifiedRequestIdentifiers'; +export * from './splitDateRange'; diff --git a/src/lib/requests/pullPrivacyRequests.ts b/src/lib/requests/pullPrivacyRequests.ts index 78b2b4a4..01ede4c4 100644 --- a/src/lib/requests/pullPrivacyRequests.ts +++ b/src/lib/requests/pullPrivacyRequests.ts @@ -4,10 +4,9 @@ import colors from 'colors'; import { DEFAULT_TRANSCEND_API } from '../../constants'; import { - RequestIdentifier, buildTranscendGraphQLClient, createSombraGotInstance, - fetchAllRequestIdentifiers, + fetchRequestIdentifiersBatch, fetchAllRequests, validateSombraVersion, } from '../graphql'; @@ -17,33 +16,7 @@ import { CsvRow, ExportedPrivacyRequest, } from './formatRequestForCsv'; - -/** - * Split a date range into N evenly-spaced chunks. - * - * @param after - Start of the date range - * @param before - End of the date range - * @param chunks - Number of chunks to split into - * @returns Array of date range bounds - */ -function splitDateRange( - after: Date, - before: Date, - chunks: number, -): { - /** Chunk start */ createdAtAfter: Date; - /** Chunk end */ createdAtBefore: Date; -}[] { - const /** Range start ms */ start = after.getTime(); - const /** Range end ms */ end = before.getTime(); - const /** Ms per chunk */ chunkSize = (end - start) / chunks; - return Array.from({ length: chunks }, (_, i) => ({ - createdAtAfter: new Date(start + chunkSize * i), - createdAtBefore: new Date( - i === chunks - 1 ? 
end : start + chunkSize * (i + 1), - ), - })); -} +import { splitDateRange } from './splitDateRange'; /** * Pull down a list of privacy requests @@ -57,7 +30,6 @@ export async function pullPrivacyRequests({ actions = [], statuses = [], identifierSearch, - pageLimit = 100, concurrency = 1, transcendUrl = DEFAULT_TRANSCEND_API, createdAtBefore, @@ -162,31 +134,16 @@ export async function pullPrivacyRequests({ } // Fetch the request identifiers for those requests - const requestsWithRequestIdentifiers = skipRequestIdentifiers - ? requests.map((request) => ({ - ...request, - requestIdentifiers: [] as RequestIdentifier[], - })) - : await map( - requests, - async (request) => { - const requestIdentifiers = await fetchAllRequestIdentifiers( - client, - sombra, - { - requestId: request.id, - skipSombraCheck: true, - }, - ); - return { - ...request, - requestIdentifiers, - }; - }, - { - concurrency: pageLimit, - }, - ); + const identifiersByRequest = skipRequestIdentifiers + ? new Map() + : await fetchRequestIdentifiersBatch(sombra, { + requestIds: requests.map((r) => r.id), + }); + + const requestsWithRequestIdentifiers = requests.map((request) => ({ + ...request, + requestIdentifiers: identifiersByRequest.get(request.id) ?? [], + })); logger.info( colors.magenta(`Pulled ${requestsWithRequestIdentifiers.length} requests`), diff --git a/src/lib/requests/splitDateRange.ts b/src/lib/requests/splitDateRange.ts new file mode 100644 index 00000000..0606ef59 --- /dev/null +++ b/src/lib/requests/splitDateRange.ts @@ -0,0 +1,30 @@ +export interface ChunkedDateRange { + /** Chunk start */ + createdAtAfter: Date; + /** Chunk end */ + createdAtBefore: Date; +} + +/** + * Split a date range into N evenly-spaced chunks. 
+ * + * @param after - Start of the date range + * @param before - End of the date range + * @param chunks - Number of chunks to split into + * @returns Array of date range bounds + */ +export function splitDateRange( + after: Date, + before: Date, + chunks: number, +): ChunkedDateRange[] { + const startMs = after.getTime(); + const endMs = before.getTime(); + const chunkSizeMs = (endMs - startMs) / chunks; + return Array.from({ length: chunks }, (_, i) => ({ + createdAtAfter: new Date(startMs + chunkSizeMs * i), + createdAtBefore: new Date( + i === chunks - 1 ? endMs : startMs + chunkSizeMs * (i + 1), + ), + })); +} diff --git a/src/lib/requests/streamPrivacyRequestsToCsv.ts b/src/lib/requests/streamPrivacyRequestsToCsv.ts index d11d1642..410e02ab 100644 --- a/src/lib/requests/streamPrivacyRequestsToCsv.ts +++ b/src/lib/requests/streamPrivacyRequestsToCsv.ts @@ -8,7 +8,7 @@ import { DEFAULT_TRANSCEND_API } from '../../constants'; import { buildTranscendGraphQLClient, createSombraGotInstance, - fetchAllRequestIdentifiers, + fetchRequestIdentifiersBatch, fetchAllRequests, fetchRequestsTotalCount, validateSombraVersion, @@ -19,37 +19,7 @@ import { formatRequestForCsv, ExportedPrivacyRequest, } from './formatRequestForCsv'; - -interface ChunkedDateRange { - /** Chunk start */ - createdAtAfter: Date; - /** Chunk end */ - createdAtBefore: Date; -} - -/** - * Split a date range into N evenly-spaced chunks. - * - * @param after - Start of the date range - * @param before - End of the date range - * @param chunks - Number of chunks to split into - * @returns Array of date range bounds - */ -function splitDateRange( - after: Date, - before: Date, - chunks: number, -): ChunkedDateRange[] { - const startMs = after.getTime(); - const endMs = before.getTime(); - const chunkSize = (endMs - startMs) / chunks; - return Array.from({ length: chunks }, (_, i) => ({ - createdAtAfter: new Date(startMs + chunkSize * i), - createdAtBefore: new Date( - i === chunks - 1 ? 
endMs : startMs + chunkSize * (i + 1), - ), - })); -} +import { splitDateRange } from './splitDateRange'; /** * Stream privacy requests directly to CSV files, one file per date-range chunk. @@ -66,7 +36,6 @@ export async function streamPrivacyRequestsToCsv({ statuses = [], identifierSearch, concurrency = 1, - pageLimit = 100, transcendUrl = DEFAULT_TRANSCEND_API, createdAtBefore, createdAtAfter, @@ -224,20 +193,15 @@ export async function streamPrivacyRequestsToCsv({ if (nodes.length === 0) return; // Optionally enrich each request with its identifiers - const enriched: ExportedPrivacyRequest[] = skipRequestIdentifiers - ? nodes.map((n) => ({ ...n, requestIdentifiers: [] })) - : await map( - nodes, - async (n) => ({ - ...n, - requestIdentifiers: await fetchAllRequestIdentifiers( - client, - sombra!, - { requestId: n.id, skipSombraCheck: true }, - ), - }), - { concurrency: pageLimit }, - ); + const identifiersByRequest = skipRequestIdentifiers + ? new Map() + : await fetchRequestIdentifiersBatch(sombra!, { + requestIds: nodes.map((n) => n.id), + }); + const enriched: ExportedPrivacyRequest[] = nodes.map((n) => ({ + ...n, + requestIdentifiers: identifiersByRequest.get(n.id) ?? [], + })); const rows: Record[] = enriched.map(formatRequestForCsv);