From 6ebcadef2512783df3e0f1ac6dbdb388836d6d60 Mon Sep 17 00:00:00 2001 From: Nuel-ship-it Date: Wed, 27 May 2026 19:02:41 +0000 Subject: [PATCH] feat: add real-time collaboration features (#634) - Add OtCrdtService: operational transformation (insert/delete/retain), conflict resolution with revision-based + userId tie-break - Add PresenceService: join/leave/cursor tracking per session - Add ChangeHistoryService: record, query by revision, getLatest, clear - Add CollaborationGateway: WebSocket /collaboration namespace handling join-session, collaborative-operation, request-sync, resolve-conflict - Add CollaborationModule and wire into AppModule - Add 26 unit tests across 3 spec files Closes #634 --- src/app.module.ts | 7 +- .../change-history.service.spec.ts | 74 +++++++++++ src/collaboration/change-history.service.ts | 38 ++++++ src/collaboration/collaboration.gateway.ts | 119 ++++++++++++++++++ src/collaboration/collaboration.module.ts | 11 ++ .../collaboration-events.constants.ts | 32 ++--- src/collaboration/ot-crdt.service.spec.ts | 95 ++++++++++++++ src/collaboration/ot-crdt.service.ts | 83 ++++++++++++ src/collaboration/presence.service.spec.ts | 66 ++++++++++ src/collaboration/presence.service.ts | 48 +++++++ 10 files changed, 552 insertions(+), 21 deletions(-) create mode 100644 src/collaboration/change-history.service.spec.ts create mode 100644 src/collaboration/change-history.service.ts create mode 100644 src/collaboration/collaboration.gateway.ts create mode 100644 src/collaboration/collaboration.module.ts create mode 100644 src/collaboration/ot-crdt.service.spec.ts create mode 100644 src/collaboration/ot-crdt.service.ts create mode 100644 src/collaboration/presence.service.spec.ts create mode 100644 src/collaboration/presence.service.ts diff --git a/src/app.module.ts b/src/app.module.ts index 0d09de1d..e984ac4c 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -3,13 +3,10 @@ import { AppController } from './app.controller'; import { SearchModule } from './search/search.module'; import { DebuggingModule } from './debugging/debugging.module'; import { DataPipelineModule } from './data-pipeline/data-pipeline.module'; +import { CollaborationModule } from './collaboration/collaboration.module'; @Module({ - imports: [ - SearchModule, - DebuggingModule, - DataPipelineModule, - ], + imports: [SearchModule, DebuggingModule, DataPipelineModule, CollaborationModule], controllers: [AppController], providers: [], }) diff --git a/src/collaboration/change-history.service.spec.ts b/src/collaboration/change-history.service.spec.ts new file mode 100644 index 00000000..49561dca --- /dev/null +++ b/src/collaboration/change-history.service.spec.ts @@ -0,0 +1,74 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ChangeHistoryService } from './change-history.service'; +import { Operation } from './ot-crdt.service'; + +const makeOp = (revision: number, sessionId = 's1'): Operation => ({ + type: 'insert', + position: 0, + content: 'x', + userId: 'u1', + sessionId, + revision, +}); + +describe('ChangeHistoryService', () => { + let service: ChangeHistoryService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [ChangeHistoryService], + }).compile(); + service = module.get(ChangeHistoryService); + }); + + describe('record', () => { + it('stores an entry', () => { + service.record(makeOp(1)); + expect(service.getHistory('s1')).toHaveLength(1); + }); + + it('stores entries for different sessions independently', () => { + service.record(makeOp(1, 's1')); + service.record(makeOp(1, 's2')); + expect(service.getHistory('s1')).toHaveLength(1); + expect(service.getHistory('s2')).toHaveLength(1); + }); + }); + + describe('getHistory', () => { + beforeEach(() => { + service.record(makeOp(1)); + service.record(makeOp(2)); + service.record(makeOp(3)); + }); + + it('returns all entries from revision 0', () => { + expect(service.getHistory('s1')).toHaveLength(3); + }); + + it('filters entries after fromRevision', () => { + expect(service.getHistory('s1', 1)).toHaveLength(2); // rev 2 and 3 + }); + + it('returns empty for unknown session', () => { + expect(service.getHistory('unknown')).toEqual([]); + }); + }); + + describe('getLatest', () => { + it('returns last N entries', () => { + for (let i = 1; i <= 5; i++) service.record(makeOp(i)); + const latest = service.getLatest('s1', 3); + expect(latest).toHaveLength(3); + expect(latest[0].revision).toBe(3); + }); + }); + + describe('clear', () => { + it('removes all history for session', () => { + service.record(makeOp(1)); + service.clear('s1'); + expect(service.getHistory('s1')).toHaveLength(0); + }); + }); +}); diff --git a/src/collaboration/change-history.service.ts b/src/collaboration/change-history.service.ts new file mode 100644 index 00000000..ed0432d9 --- /dev/null +++ b/src/collaboration/change-history.service.ts @@ -0,0 +1,38 @@ +import { Injectable } from '@nestjs/common'; +import { Operation } from './ot-crdt.service'; + +export interface HistoryEntry { + revision: number; + operation: Operation; + appliedAt: Date; +} + +@Injectable() +export class ChangeHistoryService { + // sessionId -> ordered history entries + private readonly history = new Map(); + + record(operation: Operation): void { + if (!this.history.has(operation.sessionId)) { + this.history.set(operation.sessionId, []); + } + this.history.get(operation.sessionId)!.push({ + revision: operation.revision, + operation, + appliedAt: new Date(), + }); + } + + getHistory(sessionId: string, fromRevision = 0): HistoryEntry[] { + return (this.history.get(sessionId) ?? []).filter((e) => e.revision > fromRevision); + } + + getLatest(sessionId: string, limit = 50): HistoryEntry[] { + const entries = this.history.get(sessionId) ?? []; + return entries.slice(-limit); + } + + clear(sessionId: string): void { + this.history.delete(sessionId); + } +} diff --git a/src/collaboration/collaboration.gateway.ts b/src/collaboration/collaboration.gateway.ts new file mode 100644 index 00000000..1ec4effc --- /dev/null +++ b/src/collaboration/collaboration.gateway.ts @@ -0,0 +1,119 @@ +import { Logger } from '@nestjs/common'; +import { + WebSocketGateway, + WebSocketServer, + SubscribeMessage, + MessageBody, + ConnectedSocket, + OnGatewayDisconnect, +} from '@nestjs/websockets'; +import { Server, Socket } from 'socket.io'; +import { COLLABORATION_EVENTS } from './constants/collaboration-events.constants'; +import { JoinSessionDto, CollaborativeOperationDto, SyncRequestDto } from './dto/websocket.dto'; +import { OtCrdtService, Operation } from './ot-crdt.service'; +import { PresenceService } from './presence.service'; +import { ChangeHistoryService } from './change-history.service'; + +@WebSocketGateway({ namespace: '/collaboration', cors: { origin: '*' } }) +export class CollaborationGateway implements OnGatewayDisconnect { + @WebSocketServer() + server: Server; + + private readonly logger = new Logger(CollaborationGateway.name); + // socketId -> { sessionId, userId } + private readonly socketMap = new Map(); + + constructor( + private readonly otCrdt: OtCrdtService, + private readonly presence: PresenceService, + private readonly history: ChangeHistoryService, + ) {} + + handleDisconnect(client: Socket): void { + const info = this.socketMap.get(client.id); + if (info) { + this.presence.leave(info.sessionId, info.userId); + this.socketMap.delete(client.id); + this.server.to(info.sessionId).emit(COLLABORATION_EVENTS.USER_JOINED, { + userId: info.userId, + event: 'left', + presence: this.presence.getPresence(info.sessionId), + }); + } + } + + @SubscribeMessage(COLLABORATION_EVENTS.JOIN_SESSION) + handleJoin(@MessageBody() dto: JoinSessionDto, @ConnectedSocket() client: Socket) { + client.join(dto.sessionId); + this.socketMap.set(client.id, { sessionId: dto.sessionId, userId: dto.userId }); + const presenceInfo = this.presence.join(dto.sessionId, dto.userId); + + this.server.to(dto.sessionId).emit(COLLABORATION_EVENTS.USER_JOINED, { + userId: dto.userId, + event: 'joined', + presence: this.presence.getPresence(dto.sessionId), + }); + + return { + event: COLLABORATION_EVENTS.SESSION_STATE, + data: { + sessionId: dto.sessionId, + revision: this.otCrdt.currentRevision(dto.sessionId), + presence: this.presence.getPresence(dto.sessionId), + presenceInfo, + }, + }; + } + + @SubscribeMessage(COLLABORATION_EVENTS.COLLABORATIVE_OPERATION) + handleOperation( + @MessageBody() dto: CollaborativeOperationDto, + @ConnectedSocket() client: Socket, + ) { + const incomingOp = dto.operation as Operation; + const revision = this.otCrdt.nextRevision(dto.sessionId); + const op: Operation = { ...incomingOp, sessionId: dto.sessionId, userId: dto.userId, revision }; + + // Transform against any concurrent ops at the same revision + const concurrent = this.history + .getHistory(dto.sessionId, revision - 1) + .filter((e) => e.revision === revision && e.operation.userId !== dto.userId); + + let finalOp = op; + for (const entry of concurrent) { + const result = this.otCrdt.transform(finalOp, entry.operation); + finalOp = result.operation; + } + + this.history.record(finalOp); + + // Broadcast to all other clients in the session + client.to(dto.sessionId).emit(COLLABORATION_EVENTS.OPERATION_APPLIED, { + operation: finalOp, + revision, + }); + + return { + event: COLLABORATION_EVENTS.OPERATION_APPLIED, + data: { operation: finalOp, revision }, + }; + } + + @SubscribeMessage(COLLABORATION_EVENTS.REQUEST_SYNC) + handleSync(@MessageBody() dto: SyncRequestDto) { + const revision = this.otCrdt.currentRevision(dto.sessionId); + const history = this.history.getLatest(dto.sessionId); + + return { + event: COLLABORATION_EVENTS.FULL_SYNC, + data: { sessionId: dto.sessionId, revision, history }, + }; + } + + @SubscribeMessage(COLLABORATION_EVENTS.RESOLVE_CONFLICT) + handleConflict(@MessageBody() body: { op1: Operation; op2: Operation; sessionId: string }) { + const resolved = this.otCrdt.resolveConflict(body.op1, body.op2); + this.server.to(body.sessionId).emit(COLLABORATION_EVENTS.CONFLICT_RESOLVED, { resolved }); + return { event: COLLABORATION_EVENTS.CONFLICT_RESOLVED, data: { resolved } }; + } +} diff --git a/src/collaboration/collaboration.module.ts b/src/collaboration/collaboration.module.ts new file mode 100644 index 00000000..5c377a20 --- /dev/null +++ b/src/collaboration/collaboration.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { OtCrdtService } from './ot-crdt.service'; +import { PresenceService } from './presence.service'; +import { ChangeHistoryService } from './change-history.service'; +import { CollaborationGateway } from './collaboration.gateway'; + +@Module({ + providers: [OtCrdtService, PresenceService, ChangeHistoryService, CollaborationGateway], + exports: [OtCrdtService, PresenceService, ChangeHistoryService], +}) +export class CollaborationModule {} diff --git a/src/collaboration/constants/collaboration-events.constants.ts b/src/collaboration/constants/collaboration-events.constants.ts index ed4a1ce4..99388ab1 100644 --- a/src/collaboration/constants/collaboration-events.constants.ts +++ b/src/collaboration/constants/collaboration-events.constants.ts @@ -1,22 +1,22 @@ export const COLLABORATION_EVENTS = { - // Inbound WebSocket messages - JOIN_SESSION: 'join-session', - COLLABORATIVE_OPERATION: 'collaborative-operation', - REQUEST_SYNC: 'request-sync', - RESOLVE_CONFLICT: 'resolve-conflict', - // Outbound WebSocket messages - USER_JOINED: 'user-joined', - SESSION_STATE: 'session-state', - OPERATION_APPLIED: 'operation-applied', - FULL_SYNC: 'full-sync', - CONFLICT_RESOLVED: 'conflict-resolved', + // Inbound WebSocket messages + JOIN_SESSION: 'join-session', + COLLABORATIVE_OPERATION: 'collaborative-operation', + REQUEST_SYNC: 'request-sync', + RESOLVE_CONFLICT: 'resolve-conflict', + // Outbound WebSocket messages + USER_JOINED: 'user-joined', + SESSION_STATE: 'session-state', + OPERATION_APPLIED: 'operation-applied', + FULL_SYNC: 'full-sync', + CONFLICT_RESOLVED: 'conflict-resolved', } as const; export const NOTIFICATION_GATEWAY_EVENTS = { - SUBSCRIBE: 'subscribe', - NOTIFICATION: 'notification', - BROADCAST_NOTIFICATION: 'broadcast_notification', - SUBSCRIBE_NOTIFICATIONS: 'subscribe_notifications', + SUBSCRIBE: 'subscribe', + NOTIFICATION: 'notification', + BROADCAST_NOTIFICATION: 'broadcast_notification', + SUBSCRIBE_NOTIFICATIONS: 'subscribe_notifications', } as const; export const MESSAGING_GATEWAY_EVENTS = { - SEND_MESSAGE: 'send_message', + SEND_MESSAGE: 'send_message', } as const; diff --git a/src/collaboration/ot-crdt.service.spec.ts b/src/collaboration/ot-crdt.service.spec.ts new file mode 100644 index 00000000..ffa110a7 --- /dev/null +++ b/src/collaboration/ot-crdt.service.spec.ts @@ -0,0 +1,95 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { OtCrdtService, Operation } from './ot-crdt.service'; + +const makeOp = (overrides: Partial = {}): Operation => ({ + type: 'insert', + position: 0, + content: 'a', + userId: 'u1', + sessionId: 's1', + revision: 1, + ...overrides, +}); + +describe('OtCrdtService', () => { + let service: OtCrdtService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [OtCrdtService], + }).compile(); + service = module.get(OtCrdtService); + }); + + describe('nextRevision / currentRevision', () => { + it('starts at 0', () => expect(service.currentRevision('s1')).toBe(0)); + it('increments on each call', () => { + expect(service.nextRevision('s1')).toBe(1); + expect(service.nextRevision('s1')).toBe(2); + expect(service.currentRevision('s1')).toBe(2); + }); + it('tracks sessions independently', () => { + service.nextRevision('s1'); + expect(service.currentRevision('s2')).toBe(0); + }); + }); + + describe('transform', () => { + it('returns untransformed when sessions differ', () => { + const op = makeOp({ sessionId: 's1' }); + const against = makeOp({ sessionId: 's2' }); + const result = service.transform(op, against); + expect(result.transformed).toBe(false); + expect(result.operation).toEqual(op); + }); + + it('insert vs insert: shifts position when against is before', () => { + const op = makeOp({ type: 'insert', position: 5, content: 'x' }); + const against = makeOp({ type: 'insert', position: 3, content: 'ab' }); + const result = service.transform(op, against); + expect(result.operation.position).toBe(7); // 5 + 2 + }); + + it('insert vs insert: no shift when against is after', () => { + const op = makeOp({ type: 'insert', position: 3, content: 'x' }); + const against = makeOp({ type: 'insert', position: 5, content: 'ab' }); + const result = service.transform(op, against); + expect(result.operation.position).toBe(3); + }); + + it('insert vs delete: shifts position back', () => { + const op = makeOp({ type: 'insert', position: 5 }); + const against = makeOp({ type: 'delete', position: 2, length: 2 }); + const result = service.transform(op, against); + expect(result.operation.position).toBe(3); // 5 - 2 + }); + + it('delete vs insert: shifts position forward', () => { + const op = makeOp({ type: 'delete', position: 5, length: 1 }); + const against = makeOp({ type: 'insert', position: 3, content: 'ab' }); + const result = service.transform(op, against); + expect(result.operation.position).toBe(7); + }); + + it('delete vs delete same position: zeroes length', () => { + const op = makeOp({ type: 'delete', position: 3, length: 2 }); + const against = makeOp({ type: 'delete', position: 3, length: 2 }); + const result = service.transform(op, against); + expect(result.operation.length).toBe(0); + }); + }); + + describe('resolveConflict', () => { + it('picks higher revision', () => { + const op1 = makeOp({ revision: 3 }); + const op2 = makeOp({ revision: 5 }); + expect(service.resolveConflict(op1, op2)).toEqual(op2); + }); + + it('tie-breaks by userId lexicographic order', () => { + const op1 = makeOp({ revision: 2, userId: 'alice' }); + const op2 = makeOp({ revision: 2, userId: 'bob' }); + expect(service.resolveConflict(op1, op2)).toEqual(op1); // 'alice' <= 'bob' + }); + }); +}); diff --git a/src/collaboration/ot-crdt.service.ts b/src/collaboration/ot-crdt.service.ts new file mode 100644 index 00000000..45e7532c --- /dev/null +++ b/src/collaboration/ot-crdt.service.ts @@ -0,0 +1,83 @@ +import { Injectable, Logger } from '@nestjs/common'; + +export type OperationType = 'insert' | 'delete' | 'retain'; + +export interface Operation { + type: OperationType; + position: number; + content?: string; + length?: number; + userId: string; + sessionId: string; + revision: number; +} + +export interface TransformResult { + operation: Operation; + transformed: boolean; +} + +@Injectable() +export class OtCrdtService { + private readonly logger = new Logger(OtCrdtService.name); + // revision counter per session + private readonly revisions = new Map(); + + /** + * Operational Transformation: transform op against a concurrent op + * so both can be applied in any order and converge to the same state. + */ + transform(op: Operation, against: Operation): TransformResult { + if (op.sessionId !== against.sessionId) { + return { operation: op, transformed: false }; + } + + const transformed = { ...op }; + + if (op.type === 'insert' && against.type === 'insert') { + if (against.position <= op.position) { + transformed.position += against.content?.length ?? 0; + } + } else if (op.type === 'insert' && against.type === 'delete') { + if (against.position < op.position) { + transformed.position = Math.max(against.position, op.position - (against.length ?? 0)); + } + } else if (op.type === 'delete' && against.type === 'insert') { + if (against.position <= op.position) { + transformed.position += against.content?.length ?? 0; + } + } else if (op.type === 'delete' && against.type === 'delete') { + if (against.position < op.position) { + transformed.position = Math.max(against.position, op.position - (against.length ?? 0)); + } else if (against.position === op.position) { + // same position delete — idempotent, skip + transformed.length = 0; + } + } + + return { operation: transformed, transformed: true }; + } + + /** + * Resolve conflict between two concurrent operations. + * Last-writer-wins by userId lexicographic order for determinism. + */ + resolveConflict(op1: Operation, op2: Operation): Operation { + this.logger.debug(`Resolving conflict: rev=${op1.revision} vs rev=${op2.revision}`); + if (op1.revision !== op2.revision) { + return op1.revision > op2.revision ? op1 : op2; + } + // same revision — deterministic tie-break + return op1.userId <= op2.userId ? op1 : op2; + } + + nextRevision(sessionId: string): number { + const rev = (this.revisions.get(sessionId) ?? 0) + 1; + this.revisions.set(sessionId, rev); + return rev; + } + + currentRevision(sessionId: string): number { + return this.revisions.get(sessionId) ?? 0; + } +} diff --git a/src/collaboration/presence.service.spec.ts b/src/collaboration/presence.service.spec.ts new file mode 100644 index 00000000..aaad8439 --- /dev/null +++ b/src/collaboration/presence.service.spec.ts @@ -0,0 +1,66 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { PresenceService } from './presence.service'; + +describe('PresenceService', () => { + let service: PresenceService; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [PresenceService], + }).compile(); + service = module.get(PresenceService); + }); + + describe('join', () => { + it('returns presence info', () => { + const info = service.join('s1', 'u1'); + expect(info.userId).toBe('u1'); + expect(info.sessionId).toBe('s1'); + expect(info.joinedAt).toBeInstanceOf(Date); + }); + + it('marks user as present', () => { + service.join('s1', 'u1'); + expect(service.isPresent('s1', 'u1')).toBe(true); + }); + }); + + describe('leave', () => { + it('removes user from session', () => { + service.join('s1', 'u1'); + service.leave('s1', 'u1'); + expect(service.isPresent('s1', 'u1')).toBe(false); + }); + + it('cleans up empty session', () => { + service.join('s1', 'u1'); + service.leave('s1', 'u1'); + expect(service.getPresence('s1')).toHaveLength(0); + }); + }); + + describe('updateCursor', () => { + it('updates cursor position', () => { + service.join('s1', 'u1'); + service.updateCursor('s1', 'u1', 42); + const presence = service.getPresence('s1'); + expect(presence[0].cursorPosition).toBe(42); + }); + + it('is a no-op for unknown user', () => { + expect(() => service.updateCursor('s1', 'unknown', 5)).not.toThrow(); + }); + }); + + describe('getPresence', () => { + it('returns all users in session', () => { + service.join('s1', 'u1'); + service.join('s1', 'u2'); + expect(service.getPresence('s1')).toHaveLength(2); + }); + + it('returns empty array for unknown session', () => { + expect(service.getPresence('unknown')).toEqual([]); + }); + }); +}); diff --git a/src/collaboration/presence.service.ts b/src/collaboration/presence.service.ts new file mode 100644 index 00000000..306ad010 --- /dev/null +++ b/src/collaboration/presence.service.ts @@ -0,0 +1,48 @@ +import { Injectable } from '@nestjs/common'; + +export interface PresenceInfo { + userId: string; + sessionId: string; + joinedAt: Date; + lastSeenAt: Date; + cursorPosition?: number; +} + +@Injectable() +export class PresenceService { + // sessionId -> userId -> PresenceInfo + private readonly sessions = new Map>(); + + join(sessionId: string, userId: string): PresenceInfo { + if (!this.sessions.has(sessionId)) { + this.sessions.set(sessionId, new Map()); + } + const now = new Date(); + const info: PresenceInfo = { userId, sessionId, joinedAt: now, lastSeenAt: now }; + this.sessions.get(sessionId)!.set(userId, info); + return info; + } + + leave(sessionId: string, userId: string): void { + this.sessions.get(sessionId)?.delete(userId); + if (this.sessions.get(sessionId)?.size === 0) { + this.sessions.delete(sessionId); + } + } + + updateCursor(sessionId: string, userId: string, cursorPosition: number): void { + const info = this.sessions.get(sessionId)?.get(userId); + if (info) { + info.cursorPosition = cursorPosition; + info.lastSeenAt = new Date(); + } + } + + getPresence(sessionId: string): PresenceInfo[] { + return Array.from(this.sessions.get(sessionId)?.values() ?? []); + } + + isPresent(sessionId: string, userId: string): boolean { + return this.sessions.get(sessionId)?.has(userId) ?? false; + } +}