Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions src/collaboration/change-history.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ChangeHistoryService } from './change-history.service';
import { Operation } from './ot-crdt.service';

const makeOp = (revision: number, sessionId = 's1'): Operation => ({
type: 'insert',
position: 0,
content: 'x',
userId: 'u1',
sessionId,
revision,
});

describe('ChangeHistoryService', () => {
let service: ChangeHistoryService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [ChangeHistoryService],
}).compile();
service = module.get(ChangeHistoryService);
});

describe('record', () => {
it('stores an entry', () => {
service.record(makeOp(1));
expect(service.getHistory('s1')).toHaveLength(1);
});

it('stores entries for different sessions independently', () => {
service.record(makeOp(1, 's1'));
service.record(makeOp(1, 's2'));
expect(service.getHistory('s1')).toHaveLength(1);
expect(service.getHistory('s2')).toHaveLength(1);
});
});

describe('getHistory', () => {
beforeEach(() => {
service.record(makeOp(1));
service.record(makeOp(2));
service.record(makeOp(3));
});

it('returns all entries from revision 0', () => {
expect(service.getHistory('s1')).toHaveLength(3);
});

it('filters entries after fromRevision', () => {
expect(service.getHistory('s1', 1)).toHaveLength(2); // rev 2 and 3
});

it('returns empty for unknown session', () => {
expect(service.getHistory('unknown')).toEqual([]);
});
});

describe('getLatest', () => {
it('returns last N entries', () => {
for (let i = 1; i <= 5; i++) service.record(makeOp(i));
const latest = service.getLatest('s1', 3);
expect(latest).toHaveLength(3);
expect(latest[0].revision).toBe(3);
});
});

describe('clear', () => {
it('removes all history for session', () => {
service.record(makeOp(1));
service.clear('s1');
expect(service.getHistory('s1')).toHaveLength(0);
});
});
});
38 changes: 38 additions & 0 deletions src/collaboration/change-history.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Injectable } from '@nestjs/common';
import { Operation } from './ot-crdt.service';

export interface HistoryEntry {
revision: number;
operation: Operation;
appliedAt: Date;
}

@Injectable()
export class ChangeHistoryService {
// sessionId -> ordered history entries
private readonly history = new Map<string, HistoryEntry[]>();

record(operation: Operation): void {
if (!this.history.has(operation.sessionId)) {
this.history.set(operation.sessionId, []);
}
this.history.get(operation.sessionId)!.push({
revision: operation.revision,
operation,
appliedAt: new Date(),
});
}

getHistory(sessionId: string, fromRevision = 0): HistoryEntry[] {
return (this.history.get(sessionId) ?? []).filter((e) => e.revision > fromRevision);
}

getLatest(sessionId: string, limit = 50): HistoryEntry[] {
const entries = this.history.get(sessionId) ?? [];
return entries.slice(-limit);
}

clear(sessionId: string): void {
this.history.delete(sessionId);
}
}
119 changes: 119 additions & 0 deletions src/collaboration/collaboration.gateway.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import { Logger } from '@nestjs/common';
import {
WebSocketGateway,
WebSocketServer,
SubscribeMessage,
MessageBody,
ConnectedSocket,
OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { COLLABORATION_EVENTS } from './constants/collaboration-events.constants';
import { JoinSessionDto, CollaborativeOperationDto, SyncRequestDto } from './dto/websocket.dto';
import { OtCrdtService, Operation } from './ot-crdt.service';
import { PresenceService } from './presence.service';
import { ChangeHistoryService } from './change-history.service';

@WebSocketGateway({ namespace: '/collaboration', cors: { origin: '*' } })
export class CollaborationGateway implements OnGatewayDisconnect {
@WebSocketServer()
server: Server;

private readonly logger = new Logger(CollaborationGateway.name);
// socketId -> { sessionId, userId }
private readonly socketMap = new Map<string, { sessionId: string; userId: string }>();

constructor(
private readonly otCrdt: OtCrdtService,
private readonly presence: PresenceService,
private readonly history: ChangeHistoryService,
) {}

handleDisconnect(client: Socket): void {
const info = this.socketMap.get(client.id);
if (info) {
this.presence.leave(info.sessionId, info.userId);
this.socketMap.delete(client.id);
this.server.to(info.sessionId).emit(COLLABORATION_EVENTS.USER_JOINED, {
userId: info.userId,
event: 'left',
presence: this.presence.getPresence(info.sessionId),
});
}
}

@SubscribeMessage(COLLABORATION_EVENTS.JOIN_SESSION)
handleJoin(@MessageBody() dto: JoinSessionDto, @ConnectedSocket() client: Socket) {
client.join(dto.sessionId);
this.socketMap.set(client.id, { sessionId: dto.sessionId, userId: dto.userId });
const presenceInfo = this.presence.join(dto.sessionId, dto.userId);

this.server.to(dto.sessionId).emit(COLLABORATION_EVENTS.USER_JOINED, {
userId: dto.userId,
event: 'joined',
presence: this.presence.getPresence(dto.sessionId),
});

return {
event: COLLABORATION_EVENTS.SESSION_STATE,
data: {
sessionId: dto.sessionId,
revision: this.otCrdt.currentRevision(dto.sessionId),
presence: this.presence.getPresence(dto.sessionId),
presenceInfo,
},
};
}

@SubscribeMessage(COLLABORATION_EVENTS.COLLABORATIVE_OPERATION)
handleOperation(
@MessageBody() dto: CollaborativeOperationDto,
@ConnectedSocket() client: Socket,
) {
const incomingOp = dto.operation as Operation;
const revision = this.otCrdt.nextRevision(dto.sessionId);
const op: Operation = { ...incomingOp, sessionId: dto.sessionId, userId: dto.userId, revision };

// Transform against any concurrent ops at the same revision
const concurrent = this.history
.getHistory(dto.sessionId, revision - 1)
.filter((e) => e.revision === revision && e.operation.userId !== dto.userId);

let finalOp = op;
for (const entry of concurrent) {
const result = this.otCrdt.transform(finalOp, entry.operation);
finalOp = result.operation;
}

this.history.record(finalOp);

// Broadcast to all other clients in the session
client.to(dto.sessionId).emit(COLLABORATION_EVENTS.OPERATION_APPLIED, {
operation: finalOp,
revision,
});

return {
event: COLLABORATION_EVENTS.OPERATION_APPLIED,
data: { operation: finalOp, revision },
};
}

@SubscribeMessage(COLLABORATION_EVENTS.REQUEST_SYNC)
handleSync(@MessageBody() dto: SyncRequestDto) {
const revision = this.otCrdt.currentRevision(dto.sessionId);
const history = this.history.getLatest(dto.sessionId);

return {
event: COLLABORATION_EVENTS.FULL_SYNC,
data: { sessionId: dto.sessionId, revision, history },
};
}

@SubscribeMessage(COLLABORATION_EVENTS.RESOLVE_CONFLICT)
handleConflict(@MessageBody() body: { op1: Operation; op2: Operation; sessionId: string }) {
const resolved = this.otCrdt.resolveConflict(body.op1, body.op2);
this.server.to(body.sessionId).emit(COLLABORATION_EVENTS.CONFLICT_RESOLVED, { resolved });
return { event: COLLABORATION_EVENTS.CONFLICT_RESOLVED, data: { resolved } };
}
}
11 changes: 11 additions & 0 deletions src/collaboration/collaboration.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Module } from '@nestjs/common';
import { OtCrdtService } from './ot-crdt.service';
import { PresenceService } from './presence.service';
import { ChangeHistoryService } from './change-history.service';
import { CollaborationGateway } from './collaboration.gateway';

@Module({
providers: [OtCrdtService, PresenceService, ChangeHistoryService, CollaborationGateway],
exports: [OtCrdtService, PresenceService, ChangeHistoryService],
})
export class CollaborationModule {}
95 changes: 95 additions & 0 deletions src/collaboration/ot-crdt.service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { Test, TestingModule } from '@nestjs/testing';
import { OtCrdtService, Operation } from './ot-crdt.service';

const makeOp = (overrides: Partial<Operation> = {}): Operation => ({
type: 'insert',
position: 0,
content: 'a',
userId: 'u1',
sessionId: 's1',
revision: 1,
...overrides,
});

describe('OtCrdtService', () => {
let service: OtCrdtService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [OtCrdtService],
}).compile();
service = module.get(OtCrdtService);
});

describe('nextRevision / currentRevision', () => {
it('starts at 0', () => expect(service.currentRevision('s1')).toBe(0));
it('increments on each call', () => {
expect(service.nextRevision('s1')).toBe(1);
expect(service.nextRevision('s1')).toBe(2);
expect(service.currentRevision('s1')).toBe(2);
});
it('tracks sessions independently', () => {
service.nextRevision('s1');
expect(service.currentRevision('s2')).toBe(0);
});
});

describe('transform', () => {
it('returns untransformed when sessions differ', () => {
const op = makeOp({ sessionId: 's1' });
const against = makeOp({ sessionId: 's2' });
const result = service.transform(op, against);
expect(result.transformed).toBe(false);
expect(result.operation).toEqual(op);
});

it('insert vs insert: shifts position when against is before', () => {
const op = makeOp({ type: 'insert', position: 5, content: 'x' });
const against = makeOp({ type: 'insert', position: 3, content: 'ab' });
const result = service.transform(op, against);
expect(result.operation.position).toBe(7); // 5 + 2
});

it('insert vs insert: no shift when against is after', () => {
const op = makeOp({ type: 'insert', position: 3, content: 'x' });
const against = makeOp({ type: 'insert', position: 5, content: 'ab' });
const result = service.transform(op, against);
expect(result.operation.position).toBe(3);
});

it('insert vs delete: shifts position back', () => {
const op = makeOp({ type: 'insert', position: 5 });
const against = makeOp({ type: 'delete', position: 2, length: 2 });
const result = service.transform(op, against);
expect(result.operation.position).toBe(3); // 5 - 2
});

it('delete vs insert: shifts position forward', () => {
const op = makeOp({ type: 'delete', position: 5, length: 1 });
const against = makeOp({ type: 'insert', position: 3, content: 'ab' });
const result = service.transform(op, against);
expect(result.operation.position).toBe(7);
});

it('delete vs delete same position: zeroes length', () => {
const op = makeOp({ type: 'delete', position: 3, length: 2 });
const against = makeOp({ type: 'delete', position: 3, length: 2 });
const result = service.transform(op, against);
expect(result.operation.length).toBe(0);
});
});

describe('resolveConflict', () => {
it('picks higher revision', () => {
const op1 = makeOp({ revision: 3 });
const op2 = makeOp({ revision: 5 });
expect(service.resolveConflict(op1, op2)).toEqual(op2);
});

it('tie-breaks by userId lexicographic order', () => {
const op1 = makeOp({ revision: 2, userId: 'alice' });
const op2 = makeOp({ revision: 2, userId: 'bob' });
expect(service.resolveConflict(op1, op2)).toEqual(op1); // 'alice' <= 'bob'
});
});
});
Loading
Loading