Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions packages/engines/src/lib/collect-transferables.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Recursively walks a response object and collects Transferable entries
* (ArrayBuffer from typed arrays, raw ArrayBuffer).
*
* Depth-limited to 6 to cover the deepest nesting in this codebase:
* response.data.result[annotId][mode].data.data (AnnotationAppearanceMap)
*/

const MAX_DEPTH = 6;

function walk(value: unknown, seen: Set<ArrayBuffer>, depth: number): void {
if (depth > MAX_DEPTH || value == null || typeof value !== 'object') {
return;
}

if (value instanceof ArrayBuffer) {
seen.add(value);
return;
}

// SharedArrayBuffer is not transferable — skip
if (typeof SharedArrayBuffer !== 'undefined' && value instanceof SharedArrayBuffer) {
return;
}

if (ArrayBuffer.isView(value)) {
const buf = value.buffer;
if (buf instanceof ArrayBuffer) {
seen.add(buf);
}
return;
}

if (Array.isArray(value)) {
for (let i = 0; i < value.length; i++) {
walk(value[i], seen, depth + 1);
}
return;
}

for (const key of Object.keys(value as Record<string, unknown>)) {
walk((value as Record<string, unknown>)[key], seen, depth + 1);
}
}

export function collectTransferables(value: unknown): Transferable[] {
const seen = new Set<ArrayBuffer>();
walk(value, seen, 0);
return Array.from(seen);
}
127 changes: 34 additions & 93 deletions packages/engines/src/lib/orchestrator/pdf-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ import {
ImageDataLike,
IPdfiumExecutor,
AnnotationAppearanceMap,
RenderPriority as Priority,
} from '@embedpdf/models';
import { WorkerTaskQueue, Priority } from './task-queue';
import { WorkerTaskQueue } from './task-queue';
import type { ImageDataConverter } from '../converters/types';

// Re-export for convenience
Expand Down Expand Up @@ -322,7 +323,7 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
execute: () => this.executor.renderPageRaw(doc, page, options),
meta: { docId: doc.id, pageIndex: page.index, operation: 'renderPageRaw' },
},
{ priority: Priority.HIGH },
{ priority: options?.priority ?? Priority.HIGH },
);
}

Expand All @@ -337,7 +338,7 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
execute: () => this.executor.renderPageRect(doc, page, rect, options),
meta: { docId: doc.id, pageIndex: page.index, operation: 'renderPageRectRaw' },
},
{ priority: Priority.HIGH },
{ priority: options?.priority ?? Priority.HIGH },
);
}

Expand Down Expand Up @@ -375,38 +376,18 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
page: PdfPageObject,
options?: PdfRenderPageAnnotationOptions,
): PdfTask<AnnotationAppearanceMap<T>> {
const resultTask = new Task<AnnotationAppearanceMap<T>, PdfErrorReason>();

const renderHandle = this.workerQueue.enqueue(
{
execute: () => this.executor.renderPageAnnotationsRaw(doc, page, options),
meta: { docId: doc.id, pageIndex: page.index, operation: 'renderPageAnnotationsRaw' },
},
{ priority: Priority.MEDIUM },
);

// Wire up abort: when resultTask is aborted, also abort the queue task
const originalAbort = resultTask.abort.bind(resultTask);
resultTask.abort = (reason) => {
renderHandle.abort(reason);
originalAbort(reason);
};

renderHandle.wait(
(rawMap) => {
if (resultTask.state.stage !== 0 /* Pending */) {
return;
}
this.encodeAppearanceMap(rawMap, options, resultTask);
},
(error) => {
if (resultTask.state.stage === 0 /* Pending */) {
resultTask.fail(error);
}
},
);

return resultTask;
return this.workerQueue
.enqueue(
{
execute: () => this.executor.renderPageAnnotationsRaw(doc, page, options),
meta: { docId: doc.id, pageIndex: page.index, operation: 'renderPageAnnotationsRaw' },
},
{ priority: Priority.MEDIUM },
)
.map(
(rawMap) => this.encodeAppearanceMap(rawMap, options),
(err: unknown) => ({ code: PdfErrorCode.Unknown, message: String(err) }),
);
}

renderPageAnnotationsRaw(
Expand All @@ -433,51 +414,24 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
pageIndex?: number,
priority: Priority = Priority.CRITICAL,
): PdfTask<T> {
const resultTask = new Task<T, PdfErrorReason>();

// Step 1: Add HIGH/MEDIUM priority task to render raw bytes
const renderHandle = this.workerQueue.enqueue(
{
execute: () => renderFn(),
meta: { docId, pageIndex, operation: 'render' },
},
{ priority },
);

// Wire up abort: when resultTask is aborted, also abort the queue task
const originalAbort = resultTask.abort.bind(resultTask);
resultTask.abort = (reason) => {
renderHandle.abort(reason); // Cancel the queue task!
originalAbort(reason);
};

renderHandle.wait(
(rawImageData) => {
// Check if resultTask was already aborted before encoding
if (resultTask.state.stage !== 0 /* Pending */) {
return;
}
this.encodeImage(rawImageData, options, resultTask);
},
(error) => {
// Only forward error if resultTask is still pending
if (resultTask.state.stage === 0 /* Pending */) {
resultTask.fail(error);
}
},
);

return resultTask;
return this.workerQueue
.enqueue(
{
execute: () => renderFn(),
meta: { docId, pageIndex, operation: 'render' },
},
{ priority },
)
.map(
(rawImageData) => this.encodeImage(rawImageData, options),
(err: unknown) => ({ code: PdfErrorCode.Unknown, message: String(err) }),
);
}

/**
* Encode image using encoder pool or inline
*/
private encodeImage(
rawImageData: ImageDataLike,
options: any,
resultTask: Task<T, PdfErrorReason>,
): void {
private encodeImage(rawImageData: ImageDataLike, options: any): Promise<T> {
const imageType = options?.imageType ?? 'image/webp';
const quality = options?.quality;

Expand All @@ -488,20 +442,16 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
height: rawImageData.height,
};

this.options
.imageConverter(() => plainImageData, imageType, quality)
.then((result) => resultTask.resolve(result))
.catch((error) => resultTask.reject({ code: PdfErrorCode.Unknown, message: String(error) }));
return this.options.imageConverter(() => plainImageData, imageType, quality);
}

/**
* Encode a full annotation appearance map to the output type T.
*/
private encodeAppearanceMap(
private async encodeAppearanceMap(
rawMap: AnnotationAppearanceMap<ImageDataLike>,
options: PdfRenderPageAnnotationOptions | undefined,
resultTask: Task<AnnotationAppearanceMap<T>, PdfErrorReason>,
): void {
): Promise<AnnotationAppearanceMap<T>> {
const imageType = options?.imageType ?? 'image/webp';
const quality = options?.imageQuality;

Expand Down Expand Up @@ -537,17 +487,8 @@ export class PdfEngine<T = Blob> implements IPdfEngine<T> {
}
}

Promise.all(jobs)
.then(() => {
if (resultTask.state.stage === 0 /* Pending */) {
resultTask.resolve(encodedMap);
}
})
.catch((error) => {
if (resultTask.state.stage === 0 /* Pending */) {
resultTask.reject({ code: PdfErrorCode.Unknown, message: String(error) });
}
});
await Promise.all(jobs);
return encodedMap;
}

// ========== Annotations ==========
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Logger, NoopLogger, Task, TaskError, PdfErrorCode } from '@embedpdf/models';
import { PdfiumNative } from '../pdfium/engine';
import { init } from '@embedpdf/pdfium';
import { collectTransferables } from '../collect-transferables';

const LOG_SOURCE = 'PdfiumNativeRunner';
const LOG_CATEGORY = 'Worker';
Expand Down Expand Up @@ -255,7 +256,7 @@ export class PdfiumNativeRunner {
*/
private respond(response: WorkerResponse): void {
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, 'Sending response:', response.type);
self.postMessage(response);
self.postMessage(response, { transfer: collectTransferables(response) });
}

/**
Expand Down
17 changes: 5 additions & 12 deletions packages/engines/src/lib/orchestrator/task-queue.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
import { Task, TaskError, Logger, NoopLogger } from '@embedpdf/models';
import { Task, Logger, NoopLogger, RenderPriority } from '@embedpdf/models';

const LOG_SOURCE = 'TaskQueue';
const LOG_CATEGORY = 'Queue';

export enum Priority {
CRITICAL = 3,
HIGH = 2,
MEDIUM = 1,
LOW = 0,
}

// ============================================================================
// Type Utilities
// ============================================================================
Expand All @@ -35,14 +28,14 @@ export type ExtractTaskProgress<T> = T extends Task<any, any, infer P> ? P : nev

export interface QueuedTask<T extends Task<any, any, any>> {
id: string;
priority: Priority;
priority: RenderPriority;
meta?: Record<string, unknown>;
executeFactory: () => T; // Factory function - called when it's time to execute!
cancelled?: boolean;
}

export interface EnqueueOptions {
priority?: Priority;
priority?: RenderPriority;
meta?: Record<string, unknown>;
fifo?: boolean;
}
Expand Down Expand Up @@ -152,7 +145,7 @@ export class WorkerTaskQueue {
* const task = queue.enqueue({
* execute: () => this.executor.getMetadata(doc), // Factory - not called yet!
* meta: { operation: 'getMetadata' }
* }, { priority: Priority.LOW });
* }, { priority: RenderPriority.LOW });
*
* The returned task has the SAME type as executor.getMetadata() would return!
*/
Expand All @@ -164,7 +157,7 @@ export class WorkerTaskQueue {
options: EnqueueOptions = {},
): T {
const id = this.generateId();
const priority = options.priority ?? Priority.MEDIUM;
const priority = options.priority ?? RenderPriority.MEDIUM;

// Create a proxy task that we return to the user
// This task bridges to the real task that will be created later
Expand Down
3 changes: 2 additions & 1 deletion packages/engines/src/lib/webworker/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
PdfErrorCode,
TaskReturn,
} from '@embedpdf/models';
import { collectTransferables } from '../collect-transferables';

/**
* Request body that represent method calls of PdfEngine, it contains the
Expand Down Expand Up @@ -472,6 +473,6 @@ export class EngineRunner {
*/
respond(response: Response) {
this.logger.debug(LOG_SOURCE, LOG_CATEGORY, 'runner respond: ', response);
self.postMessage(response);
self.postMessage(response, { transfer: collectTransferables(response) });
}
}
15 changes: 15 additions & 0 deletions packages/models/src/pdf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2919,6 +2919,17 @@ export interface PdfOpenDocumentUrlOptions {
normalizeRotation?: boolean;
}

/**
* Scheduling priority for render operations (higher = more urgent).
* Used by the orchestrator engine's task queue.
*/
export enum RenderPriority {
LOW = 0,
MEDIUM = 1,
HIGH = 2,
CRITICAL = 3,
}

export interface PdfRenderOptions {
/**
* Scale factor
Expand All @@ -2940,6 +2951,10 @@ export interface PdfRenderOptions {
* Image quality (0-1) for jpeg and png
*/
imageQuality?: number;
/**
* Scheduling priority.
*/
priority?: RenderPriority;
}

export interface ConvertToBlobOptions {
Expand Down
Loading