Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,19 @@ DEBUG_RPC=false
# Enable CORS for development
CORS_ENABLED=true
CORS_ORIGIN=http://localhost:3000

# ============================================
# Webhook Delivery
# ============================================

# Secret used to sign outbound webhook deliveries
WEBHOOK_SIGNING_SECRET=webhook-secret

# Secret used to verify inbound webhook dispatch requests
WEBHOOK_INGEST_SECRET=webhook-secret

# Retry configuration for BullMQ webhook jobs
WEBHOOK_MAX_ATTEMPTS=5
WEBHOOK_BACKOFF_DELAY_MS=1000
WEBHOOK_REQUEST_TIMEOUT_MS=10000
WEBHOOK_WORKER_CONCURRENCY=25
4,240 changes: 1,672 additions & 2,568 deletions backend/package-lock.json

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
"start": "node dist/src/index.js",
"start:prod": "node dist/src/index.js",
"dev": "tsx watch src/index.ts",


"test": "node --experimental-vm-modules ./node_modules/jest/bin/jest.js --config ./jest.config.js",
"collaboration": "tsx src/collaborationServer.ts"
},
"keywords": [],
Expand Down Expand Up @@ -47,9 +46,12 @@
"@types/bcryptjs": "^2.4.6",
"@types/cors": "^2.8.19",
"@types/express": "^5.0.6",
"@types/jest": "^30.0.0",
"@types/node": "^25.5.0",
"@types/qrcode": "^1.5.5",
"@types/ws": "^8.18.1",
"jest": "^30.4.2",
"ts-jest": "^29.4.11",
"ts-node": "^10.9.2",
"tsx": "^4.21.0",
"typescript": "^5.9.3"
Expand Down
5 changes: 5 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { requestLogger } from './middleware/requestLogger.js';
import { requireWorkspaceMiddleware } from './middleware/WorkspaceContext.js';
import freelanceRoute from './routes/freelance.js';
import routes from './routes/index.js';
import { startWebhookWorker, stopWebhookWorker } from './services/webhooks/index.js';
import { validateEnvironment } from './utils/checkEnv.js';
import logger from './utils/logger.js';
import { pubClient, redisConnection, subClient } from './utils/redis.js';
Expand Down Expand Up @@ -45,6 +46,8 @@ if (process.env.NODE_ENV !== 'test') {
logger.warn('CacheWarmer failed to start:', err);
});

startWebhookWorker();

logger.info('Distributed caching layer initialized');
}

Expand Down Expand Up @@ -116,6 +119,7 @@ if (process.env.NODE_ENV !== 'test') {
// Stop cache components
blockHeaderListener.stop();
cacheWarmer.stop();
await stopWebhookWorker();
await distributedCacheManager.gracefulShutdown();

// Clean up connections
Expand All @@ -135,6 +139,7 @@ if (process.env.NODE_ENV !== 'test') {
// Stop cache components
blockHeaderListener.stop();
cacheWarmer.stop();
await stopWebhookWorker();
await distributedCacheManager.gracefulShutdown();

// Clean up connections
Expand Down
2 changes: 2 additions & 0 deletions backend/src/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import exportRouter from './export.routes.js';
import generatorRouter from './generator/generator.routes.js';
import learningRoutes from './learning/learning.routes.js';
import healthRouter from './health.routes.js';
import webhooksRouter from './webhooks.js';
import securityRouter from './security.routes.js';
import studentsRouter from './students.js';

Expand All @@ -29,6 +30,7 @@ router.use('/learning', learningRoutes);
router.use('/security', securityRouter);
router.use('/generator', generatorRouter);
router.use('/export', exportRouter);
router.use('/webhooks', webhooksRouter);
router.use('/user', userRouter);

export default router;
79 changes: 65 additions & 14 deletions backend/src/routes/webhooks.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,78 @@
import { Request, Response, Router } from 'express';
import { verifySignature } from '../utils/signature.js';
import { enqueueWebhook } from '../services/queue.service.js';
import logger from '../utils/logger.js';
import {
canonicalizeWebhookPayload,
enqueueWebhookDeliveries,
verifyWebhookSignature,
} from '../services/webhooks/index.js';
import type {
WebhookDestination,
WebhookEventPayload,
} from '../services/webhooks/index.js';

const router = Router();
const WEBHOOK_SECRET = process.env.WEBHOOK_SECRET || 'webhook-secret';

router.post('/ingest', async (req: Request, res: Response) => {
const signature = req.headers['x-webhook-signature'] as string;
const payload = JSON.stringify(req.body);
const getIngestSecret = (): string => {
return process.env.WEBHOOK_INGEST_SECRET || process.env.WEBHOOK_SIGNING_SECRET || 'webhook-secret';
};

if (!signature || !verifySignature(payload, signature, WEBHOOK_SECRET)) {
logger.warn('Invalid webhook signature');
return res.status(401).json({ error: 'Invalid signature' });
const extractBody = (body: unknown): {
event: WebhookEventPayload;
destinations: WebhookDestination[];
metadata?: Record<string, unknown>;
} => {
if (!body || typeof body !== 'object') {
throw new Error('Request body must be an object');
}

const candidate = body as {
event?: WebhookEventPayload;
destinations?: WebhookDestination[];
metadata?: Record<string, unknown>;
};

if (!candidate.event || !candidate.destinations || candidate.destinations.length === 0) {
throw new Error('Request body must include an event and at least one destination');
}

return {
event: candidate.event,
destinations: candidate.destinations,
metadata: candidate.metadata,
};
};

router.get('/health', async (_req: Request, res: Response) => {
res.status(200).json({
status: 'ok',
mode: 'webhook-dispatch',
});
});

router.post(['/ingest', '/dispatch'], async (req: Request, res: Response) => {
try {
// Immediately enqueue and return 200 OK
await enqueueWebhook(req.body);
res.status(200).json({ status: 'accepted' });
const timestamp = req.header('x-webhook-timestamp') || '';
const signature = req.header('x-webhook-signature') || '';
const canonicalBody = canonicalizeWebhookPayload(req.body);
const secret = getIngestSecret();

if (!verifyWebhookSignature(canonicalBody, signature, secret, timestamp)) {
logger.warn('Rejected webhook dispatch request with invalid signature');
return res.status(401).json({ error: 'Invalid webhook signature' });
}

const { event, destinations, metadata } = extractBody(req.body);
const results = await enqueueWebhookDeliveries(event, destinations, metadata);

return res.status(202).json({
status: 'accepted',
deliveries: results,
});
} catch (error) {
logger.error('Failed to enqueue webhook:', error);
res.status(500).json({ error: 'Internal server error' });
logger.error('Failed to accept webhook dispatch request:', error);
return res.status(400).json({
error: error instanceof Error ? error.message : 'Invalid webhook dispatch request',
});
}
});

Expand Down
68 changes: 40 additions & 28 deletions backend/src/services/queue.service.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,48 @@
import redis from '../utils/redis.js';
import logger from '../utils/logger.js';
import {
enqueueWebhookDelivery,
enqueueWebhookDeliveries,
webhookDeadLetterQueue,
webhookDeliveryQueue,
WEBHOOK_DEAD_LETTER_QUEUE_NAME,
WEBHOOK_DELIVERY_QUEUE_NAME,
} from './webhooks/index.js';
import type {
DeadLetterWebhookJob,
WebhookDeliveryJobData,
WebhookDeliveryRequest,
} from './webhooks/index.js';

const WEBHOOK_QUEUE = 'webhooks:queue';
const WEBHOOK_DLQ = 'webhooks:dlq';
export const enqueueWebhook = async (payload: WebhookDeliveryRequest): Promise<void> => {
await enqueueWebhookDelivery(payload);
};

export const enqueueWebhook = async (payload: any): Promise<void> => {
await redis.lpush(
WEBHOOK_QUEUE,
JSON.stringify({
...payload,
enqueuedAt: Date.now(),
retries: 0,
})
);
export const enqueueWebhooks = async (
event: WebhookDeliveryRequest['event'],
destinations: WebhookDeliveryRequest['destination'][]
): Promise<Array<{ deliveryId: string; queue: string }>> => {
return enqueueWebhookDeliveries(event, destinations);
};

export const dequeueWebhook = async (): Promise<any | null> => {
const data = await redis.brpop(WEBHOOK_QUEUE, 0); // Block until data is available
if (data) {
return JSON.parse(data[1]);
}
return null;
export const enqueueDLQ = async (payload: WebhookDeliveryJobData, error: string): Promise<void> => {
const deadLetterJob: DeadLetterWebhookJob = {
...payload,
failedAt: new Date().toISOString(),
error,
};

await webhookDeadLetterQueue.add(payload.event.type, deadLetterJob);
};

export const enqueueDLQ = async (payload: any, error: string): Promise<void> => {
await redis.lpush(
WEBHOOK_DLQ,
JSON.stringify({
...payload,
failedAt: Date.now(),
error,
})
export const dequeueWebhook = async (): Promise<null> => {
throw new Error(
`Manual dequeue is not supported by ${WEBHOOK_DELIVERY_QUEUE_NAME}; use BullMQ workers instead`
);
logger.error(`Webhook moved to DLQ: ${error}`);
};

export {
webhookDeadLetterQueue,
webhookDeliveryQueue,
WEBHOOK_DEAD_LETTER_QUEUE_NAME,
WEBHOOK_DELIVERY_QUEUE_NAME,
};

52 changes: 9 additions & 43 deletions backend/src/services/webhookWorker.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,12 @@
import { dequeueWebhook, enqueueWebhook, enqueueDLQ } from './queue.service.js';
import logger from '../utils/logger.js';
import {
PermanentWebhookDeliveryError,
deliverWebhook,
startWebhookWorker,
stopWebhookWorker,
} from './webhooks/index.js';

const MAX_RETRIES = 5;
export { PermanentWebhookDeliveryError, deliverWebhook, startWebhookWorker, stopWebhookWorker };

const processWebhookLogic = async (payload: any) => {
// Implement actual business logic here
// e.g., update frontend state, database, etc.
logger.info(`Processing webhook: ${JSON.stringify(payload)}`);

// Simulate some logic
if (payload.shouldFail) {
throw new Error('Simulated processing failure');
}
};

export const startWorker = async () => {
logger.info('Webhook worker started');

while (true) {
try {
const webhook = await dequeueWebhook();
if (!webhook) continue;

try {
await processWebhookLogic(webhook);
} catch (error) {
if (webhook.retries < MAX_RETRIES) {
webhook.retries += 1;
const backoff = Math.pow(2, webhook.retries) * 1000;
logger.warn(`Retrying webhook in ${backoff}ms (Attempt ${webhook.retries})`);

setTimeout(async () => {
await enqueueWebhook(webhook);
}, backoff);
} else {
await enqueueDLQ(webhook, error instanceof Error ? error.message : 'Unknown error');
}
}
} catch (error) {
logger.error('Worker loop error:', error);
// Wait a bit before continuing to avoid tight loop on error
await new Promise((resolve) => setTimeout(resolve, 5000));
}
}
export const startWorker = async (): Promise<void> => {
startWebhookWorker();
};
68 changes: 68 additions & 0 deletions backend/src/services/webhooks/dispatcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { randomUUID } from 'crypto';
import type { JobsOptions } from 'bullmq';
import logger from '../../utils/logger.js';
import {
webhookDeliveryQueue,
WEBHOOK_DELIVERY_QUEUE_NAME,
} from './queue.js';
import type {
WebhookDeliveryJobData,
WebhookDeliveryRequest,
WebhookDestination,
WebhookEventPayload,
} from './types.js';

export const buildWebhookDeliveryJob = (request: WebhookDeliveryRequest): WebhookDeliveryJobData => {
return {
deliveryId: randomUUID(),
destination: request.destination,
event: request.event,
metadata: request.metadata,
};
};

export const buildWebhookJobOptions = (overrides: Partial<JobsOptions> = {}): JobsOptions => {
return {
priority: 10,
...overrides,
};
};

export const enqueueWebhookDelivery = async (
request: WebhookDeliveryRequest,
overrides: Partial<JobsOptions> = {}
): Promise<{ deliveryId: string; queue: string }> => {
return enqueueWebhookDeliveryToQueue(webhookDeliveryQueue, request, overrides);
};

export const enqueueWebhookDeliveryToQueue = async (
queue: Pick<typeof webhookDeliveryQueue, 'add'>,
request: WebhookDeliveryRequest,
overrides: Partial<JobsOptions> = {}
): Promise<{ deliveryId: string; queue: string }> => {
const job = buildWebhookDeliveryJob(request);
await queue.add(job.event.type, job, buildWebhookJobOptions(overrides));

logger.info(`Queued webhook delivery ${job.deliveryId} for ${job.destination.url}`);

return {
deliveryId: job.deliveryId,
queue: WEBHOOK_DELIVERY_QUEUE_NAME,
};
};

export const enqueueWebhookDeliveries = async (
event: WebhookEventPayload,
destinations: WebhookDestination[],
metadata: Record<string, unknown> = {}
): Promise<Array<{ deliveryId: string; queue: string }>> => {
const jobs = destinations.map((destination) =>
enqueueWebhookDelivery({
destination,
event,
metadata,
})
);

return Promise.all(jobs);
};
6 changes: 6 additions & 0 deletions backend/src/services/webhooks/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export * from './dispatcher.js';
export * from './queue.js';
export * from './signature.js';
export * from './types.js';
export * from './worker.js';

Loading