diff --git a/src/plugins/objects/defaults.ts b/src/plugins/objects/defaults.ts index f60345c68..e0c397374 100644 --- a/src/plugins/objects/defaults.ts +++ b/src/plugins/objects/defaults.ts @@ -1,5 +1,5 @@ export const DEFAULTS = { - gcInterval: 1000 * 60 * 5, // 5 minutes + gcInterval: 1000 * 60 * 5, // RTO10a, 5 minutes /** * The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails` object of the `CONNECTED` event. * If the server does not provide this value, the SDK will fall back to this default value. @@ -9,5 +9,5 @@ export const DEFAULTS = { * * Applies both for map entries tombstones and object tombstones. */ - gcGracePeriod: 1000 * 60 * 60 * 24, // 24 hours + gcGracePeriod: 1000 * 60 * 60 * 24, // RTO10b3, 24 hours }; diff --git a/src/plugins/objects/livecounter.ts b/src/plugins/objects/livecounter.ts index 6251840e9..186c8474c 100644 --- a/src/plugins/objects/livecounter.ts +++ b/src/plugins/objects/livecounter.ts @@ -168,6 +168,7 @@ export class LiveCounter extends LiveObject /** * @internal + * @spec RTLC7, RTLC7a */ applyOperation(op: ObjectOperation, msg: ObjectMessage): void { if (op.objectId !== this.getObjectId()) { @@ -181,6 +182,7 @@ export class LiveCounter extends LiveObject const opSerial = msg.serial!; const opSiteCode = msg.siteCode!; if (!this._canApplyOperation(opSerial, opSiteCode)) { + // RTLC7b this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MICRO, @@ -191,17 +193,18 @@ export class LiveCounter extends LiveObject } // should update stored site serial immediately. doesn't matter if we successfully apply the op, // as it's important to mark that the op was processed by the object - this._siteTimeserials[opSiteCode] = opSerial; + this._siteTimeserials[opSiteCode] = opSerial; // RTLC7c if (this.isTombstoned()) { - // this object is tombstoned so the operation cannot be applied + // RTLC7e - this object is tombstoned so the operation cannot be applied return; } let update: LiveCounterUpdate | LiveObjectUpdateNoop; + // RTLC7d switch (op.action) { case ObjectOperationAction.COUNTER_CREATE: - update = this._applyCounterCreate(op); + update = this._applyCounterCreate(op); // RTLC7d1 break; case ObjectOperationAction.COUNTER_INC: @@ -210,15 +213,16 @@ export class LiveCounter extends LiveObject // leave an explicit return here, so that TS knows that update object is always set after the switch statement. return; } else { - update = this._applyCounterInc(op.counterOp); + update = this._applyCounterInc(op.counterOp); // RTLC7d2 } break; case ObjectOperationAction.OBJECT_DELETE: - update = this._applyObjectDelete(msg); + update = this._applyObjectDelete(msg); // RTLC7d4, RTLC7d4a break; default: + // RTLC7d3 throw new this._client.ErrorInfo( `Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`, 92000, @@ -271,27 +275,26 @@ export class LiveCounter extends LiveObject this._siteTimeserials = objectState.siteTimeserials ?? {}; // RTLC6a if (this.isTombstoned()) { - // this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing - return { noop: true }; + // RTLC6e - this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing + return { noop: true }; // RTLC6e1 } const previousDataRef = this._dataRef; if (objectState.tombstone) { // tombstone this object and ignore the data from the object state message - this.tombstone(objectMessage); + this.tombstone(objectMessage); // RTLC6f } else { // override data for this object with data from the object state this._createOperationIsMerged = false; // RTLC6b this._dataRef = { data: objectState.counter?.count ?? 0 }; // RTLC6c - // RTLC6d if (!this._client.Utils.isNil(objectState.createOp)) { - this._mergeInitialDataFromCreateOperation(objectState.createOp); + this._mergeInitialDataFromCreateOperation(objectState.createOp); // RTLC6d } } // if object got tombstoned, the update object will include all data that got cleared. // otherwise it is a diff between previous value and new value from object state. - return this._updateFromDataDiff(previousDataRef, this._dataRef); + return this._updateFromDataDiff(previousDataRef, this._dataRef); // RTLC6f1 } /** @@ -312,13 +315,14 @@ export class LiveCounter extends LiveObject return { update: { amount: counterDiff } }; } + /** @spec RTLC10 */ protected _mergeInitialDataFromCreateOperation(objectOperation: ObjectOperation): LiveCounterUpdate { // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. // note that it is intentional to SUM the incoming count from the create op. // if we got here, it means that current counter instance is missing the initial value in its data reference, // which we're going to add now. - this._dataRef.data += objectOperation.counter?.count ?? 0; // RTLC6d1 - this._createOperationIsMerged = true; // RTLC6d2 + this._dataRef.data += objectOperation.counter?.count ?? 0; // RTLC10a + this._createOperationIsMerged = true; // RTLC10b return { update: { amount: objectOperation.counter?.count ?? 0 } }; } @@ -331,8 +335,10 @@ export class LiveCounter extends LiveObject ); } + /** @spec RTLC8 */ private _applyCounterCreate(op: ObjectOperation): LiveCounterUpdate | LiveObjectUpdateNoop { if (this._createOperationIsMerged) { + // RTLC8b // There can't be two different create operation for the same object id, because the object id // fully encodes that operation. This means we can safely ignore any new incoming create operations // if we already merged it once. @@ -345,11 +351,12 @@ export class LiveCounter extends LiveObject return { noop: true }; } - return this._mergeInitialDataFromCreateOperation(op); + return this._mergeInitialDataFromCreateOperation(op); // RTLC8c } + /** @spec RTLC9 */ private _applyCounterInc(op: ObjectsCounterOp): LiveCounterUpdate { - this._dataRef.data += op.amount; + this._dataRef.data += op.amount; // RTLC9b return { update: { amount: op.amount } }; } } diff --git a/src/plugins/objects/livemap.ts b/src/plugins/objects/livemap.ts index 477b07ff7..8e42670f3 100644 --- a/src/plugins/objects/livemap.ts +++ b/src/plugins/objects/livemap.ts @@ -40,7 +40,7 @@ export type LiveMapObjectData = ObjectIdObjectData | ValueObjectData; export interface LiveMapEntry { tombstone: boolean; - tombstonedAt: number | undefined; + tombstonedAt: number | undefined; // RTLM3a1 timeserial: string | undefined; data: LiveMapObjectData | undefined; } @@ -300,6 +300,7 @@ export class LiveMap extends LiveObject(key: TKey): T[TKey] | undefined { this._objects.throwIfInvalidAccessApiConfiguration(); // RTLM5b, RTLM5c + // RTLM5e if (this.isTombstoned()) { return undefined as T[TKey]; } @@ -320,9 +321,11 @@ export class LiveMap extends LiveObject extends LiveObject(): IterableIterator<[TKey, T[TKey]]> { - this._objects.throwIfInvalidAccessApiConfiguration(); + this._objects.throwIfInvalidAccessApiConfiguration(); // RTLM11b, RTLM11c + // RTLM11d for (const [key, entry] of this._dataRef.data.entries()) { if (this._isMapEntryTombstoned(entry)) { - // do not return tombstoned entries + // RTLM11d1 - do not return tombstoned entries continue; } // data always exists for non-tombstoned elements const value = this._getResolvedValueFromObjectData(entry.data!) as T[TKey]; - yield [key as TKey, value]; + yield [key as TKey, value]; // RTLM11d2 } } + /** @spec RTLM12, RTLM12a */ *keys(): IterableIterator { for (const [key] of this.entries()) { - yield key; + yield key; // RTLM12b } } + /** @spec RTLM13, RTLM13a */ *values(): IterableIterator { for (const [_, value] of this.entries()) { - yield value; + yield value; // RTLM13b } } @@ -395,6 +402,7 @@ export class LiveMap extends LiveObject, msg: ObjectMessage): void { if (op.objectId !== this.getObjectId()) { @@ -408,6 +416,7 @@ export class LiveMap extends LiveObject extends LiveObject | LiveObjectUpdateNoop; + // RTLM15d switch (op.action) { case ObjectOperationAction.MAP_CREATE: - update = this._applyMapCreate(op); + update = this._applyMapCreate(op); // RTLM15d1 break; case ObjectOperationAction.MAP_SET: @@ -437,7 +447,7 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject= this._objects.gcGracePeriod) { - keysToDelete.push(key); + keysToDelete.push(key); // RTLM19a1 } } @@ -624,6 +636,7 @@ export class LiveMap extends LiveObject): LiveMapUpdate { if (this._client.Utils.isNil(objectOperation.map)) { // if a map object is missing for the MAP_CREATE op, the initial value is implicitly an empty map. @@ -632,7 +645,7 @@ export class LiveMap extends LiveObject = { update: {} }; - // RTLM6d1 + // RTLM17a // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. // we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations. Object.entries(objectOperation.map.entries ?? {}).forEach(([key, entry]) => { @@ -640,10 +653,10 @@ export class LiveMap extends LiveObject | LiveObjectUpdateNoop; if (entry.tombstone === true) { - // RTLM6d1b - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op + // RTLM17a2 - entry in MAP_CREATE op is removed, try to apply MAP_REMOVE op update = this._applyMapRemove({ key }, opSerial, entry.serialTimestamp); } else { - // RTLM6d1a - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op + // RTLM17a1 - entry in MAP_CREATE op is not removed, try to set it via MAP_SET op update = this._applyMapSet({ key, data: entry.data }, opSerial); } @@ -656,7 +669,7 @@ export class LiveMap extends LiveObject extends LiveObject): LiveMapUpdate | LiveObjectUpdateNoop { if (this._createOperationIsMerged) { + // RTLM16b // There can't be two different create operation for the same object id, because the object id // fully encodes that operation. This means we can safely ignore any new incoming create operations // if we already merged it once. @@ -684,6 +699,7 @@ export class LiveMap extends LiveObject extends LiveObject, - opSerial: string | undefined, + op: ObjectsMapOp, // RTLM7d1 + opSerial: string | undefined, // RTLM7d2 ): LiveMapUpdate | LiveObjectUpdateNoop { const { ErrorInfo, Utils } = this._client; @@ -751,14 +767,14 @@ export class LiveMap extends LiveObject extends LiveObject, - opSerial: string | undefined, - opTimestamp: number | undefined, + op: ObjectsMapOp, // RTLM8c1 + opSerial: string | undefined, // RTLM8c2 + opTimestamp: number | undefined, // RTLM8c3 ): LiveMapUpdate | LiveObjectUpdateNoop { const existingEntry = this._dataRef.data.get(op.key); // RTLM8a @@ -791,30 +807,33 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject extends LiveObject extends LiveObject; - protected _createOperationIsMerged: boolean; - private _tombstone: boolean; - private _tombstonedAt: number | undefined; + protected _siteTimeserials: Record; // RTLO3b + protected _createOperationIsMerged: boolean; // RTLO3c + private _tombstone: boolean; // RTLO3d + private _tombstonedAt: number | undefined; // RTLO3e, RTLO3e1 protected constructor( protected _objects: Objects, @@ -60,12 +61,12 @@ export abstract class LiveObject< this._client = this._objects.getClient(); this._subscriptions = new this._client.EventEmitter(this._client.logger); this._lifecycleEvents = new this._client.EventEmitter(this._client.logger); - this._objectId = objectId; + this._objectId = objectId; // RTLO3a1 this._dataRef = this._getZeroValueData(); // use empty map of serials by default, so any future operation can be applied to this object - this._siteTimeserials = {}; - this._createOperationIsMerged = false; - this._tombstone = false; + this._siteTimeserials = {}; // RTLO3b1 + this._createOperationIsMerged = false; // RTLO3c1 + this._tombstone = false; // RTLO3d1 } subscribe(listener: (update: TUpdate) => void): SubscribeResponse { @@ -150,21 +151,25 @@ export abstract class LiveObject< * Clears the object's data, cancels any buffered operations and sets the tombstone flag to `true`. * * @internal + * @spec RTLO4e */ tombstone(objectMessage: ObjectMessage): TUpdate { - this._tombstone = true; + this._tombstone = true; // RTLO4e2 + // RTLO4e3 if (objectMessage.serialTimestamp != null) { - this._tombstonedAt = objectMessage.serialTimestamp; + this._tombstonedAt = objectMessage.serialTimestamp; // RTLO4e3a } else { + // RTLO4e3b1 this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MINOR, 'LiveObject.tombstone()', `object has been tombstoned but no "serialTimestamp" found in the message, using local clock instead; objectId=${this.getObjectId()}`, ); + // RTLO4e3b this._tombstonedAt = Date.now(); // best-effort estimate since no timestamp provided by the server } - const update = this.clearData(); + const update = this.clearData(); // RTLO4e4 this._lifecycleEvents.emit(LiveObjectLifecycleEvent.deleted); return update; @@ -198,22 +203,25 @@ export abstract class LiveObject< * * An operation should be applied if its serial is strictly greater than the serial in the `siteTimeserials` map for the same site. * If `siteTimeserials` map does not contain a serial for the same site, the operation should be applied. + * + * @spec RTLO4a */ protected _canApplyOperation(opSerial: string | undefined, opSiteCode: string | undefined): boolean { if (!opSerial) { - throw new this._client.ErrorInfo(`Invalid serial: ${opSerial}`, 92000, 500); + throw new this._client.ErrorInfo(`Invalid serial: ${opSerial}`, 92000, 500); // RTLO4a3 } if (!opSiteCode) { - throw new this._client.ErrorInfo(`Invalid site code: ${opSiteCode}`, 92000, 500); + throw new this._client.ErrorInfo(`Invalid site code: ${opSiteCode}`, 92000, 500); // RTLO4a3 } - const siteSerial = this._siteTimeserials[opSiteCode]; - return !siteSerial || opSerial > siteSerial; + const siteSerial = this._siteTimeserials[opSiteCode]; // RTLO4a4 + return !siteSerial || opSerial > siteSerial; // RTLO4a5, RTLO4a6 } + /** @spec RTLO5 */ protected _applyObjectDelete(objectMessage: ObjectMessage): TUpdate { - return this.tombstone(objectMessage); + return this.tombstone(objectMessage); // RTLO5b } /** diff --git a/src/plugins/objects/objects.ts b/src/plugins/objects/objects.ts index 9c29bb340..d96d7a7bf 100644 --- a/src/plugins/objects/objects.ts +++ b/src/plugins/objects/objects.ts @@ -51,7 +51,7 @@ export class Objects { private _syncObjectsDataPool: SyncObjectsDataPool; private _currentSyncId: string | undefined; private _currentSyncCursor: string | undefined; - private _bufferedObjectOperations: ObjectMessage[]; + private _bufferedObjectOperations: ObjectMessage[]; // RTO7a // Used by tests static _DEFAULTS = DEFAULTS; @@ -64,12 +64,12 @@ export class Objects { this._eventEmitterPublic = new this._client.EventEmitter(this._client.logger); this._objectsPool = new ObjectsPool(this); this._syncObjectsDataPool = new SyncObjectsDataPool(this); - this._bufferedObjectOperations = []; + this._bufferedObjectOperations = []; // RTO7a1 // use server-provided objectsGCGracePeriod if available, and subscribe to new connectionDetails that can be emitted as part of the RTN24 this.gcGracePeriod = - this._channel.connectionManager.connectionDetails?.objectsGCGracePeriod ?? DEFAULTS.gcGracePeriod; + this._channel.connectionManager.connectionDetails?.objectsGCGracePeriod ?? DEFAULTS.gcGracePeriod; // RTO10b1 this._channel.connectionManager.on('connectiondetails', (details: Record) => { - this.gcGracePeriod = details.objectsGCGracePeriod ?? DEFAULTS.gcGracePeriod; + this.gcGracePeriod = details.objectsGCGracePeriod ?? DEFAULTS.gcGracePeriod; // RTO10b2 }); } @@ -227,7 +227,7 @@ export class Objects { const newSyncSequence = this._currentSyncId !== syncId; if (newSyncSequence) { // RTO5a2 - new sync sequence started - this._startNewSync(syncId, syncCursor); // RTO5a2a + this._startNewSync(syncId, syncCursor); } // RTO5a3 - continue current sync sequence @@ -243,18 +243,19 @@ export class Objects { /** * @internal + * @spec RTO8 */ handleObjectMessages(objectMessages: ObjectMessage[]): void { if (this._state !== ObjectsState.synced) { - // The client receives object messages in realtime over the channel concurrently with the sync sequence. + // RTO7 - The client receives object messages in realtime over the channel concurrently with the sync sequence. // Some of the incoming object messages may have already been applied to the objects described in // the sync sequence, but others may not; therefore we must buffer these messages so that we can apply // them to the objects once the sync is complete. - this._bufferedObjectOperations.push(...objectMessages); + this._bufferedObjectOperations.push(...objectMessages); // RTO8a return; } - this._applyObjectMessages(objectMessages); + this._applyObjectMessages(objectMessages); // RTO8b } /** @@ -283,6 +284,7 @@ export class Objects { // reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes. this._objectsPool.resetToInitialPool(true); // RTO4b1, RTO4b2 this._syncObjectsDataPool.clear(); // RTO4b3 + this._bufferedObjectOperations = []; // RTO4b5 // defer the state change event until the next tick if we started a new sequence just now due to being in initialized state. // this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. this._endSync(fromInitializedState); // RTO4b4 @@ -346,8 +348,8 @@ export class Objects { private _startNewSync(syncId?: string, syncCursor?: string): void { // need to discard all buffered object operation messages on new sync start - this._bufferedObjectOperations = []; - this._syncObjectsDataPool.clear(); + this._bufferedObjectOperations = []; // RTO5a2b + this._syncObjectsDataPool.clear(); // RTO5a2a this._currentSyncId = syncId; this._currentSyncCursor = syncCursor; this._stateChange(ObjectsState.syncing, false); @@ -358,9 +360,9 @@ export class Objects { this._applySync(); // should apply buffered object operations after we applied the sync. // can use regular object messages application logic - this._applyObjectMessages(this._bufferedObjectOperations); + this._applyObjectMessages(this._bufferedObjectOperations); // RTO5c6 - this._bufferedObjectOperations = []; + this._bufferedObjectOperations = []; // RTO5c5 this._syncObjectsDataPool.clear(); // RTO5c4 this._currentSyncId = undefined; // RTO5c3 this._currentSyncCursor = undefined; // RTO5c3 @@ -435,9 +437,12 @@ export class Objects { existingObjectUpdates.forEach(({ object, update }) => object.notifyUpdated(update)); } + /** @spec RTO9 */ private _applyObjectMessages(objectMessages: ObjectMessage[]): void { + // RTO9a for (const objectMessage of objectMessages) { if (!objectMessage.operation) { + // RTO9a1 this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MAJOR, @@ -447,8 +452,7 @@ export class Objects { continue; } - const objectOperation = objectMessage.operation; - + const objectOperation = objectMessage.operation; // RTO9a2 switch (objectOperation.action) { case ObjectOperationAction.MAP_CREATE: case ObjectOperationAction.COUNTER_CREATE: @@ -456,17 +460,19 @@ export class Objects { case ObjectOperationAction.MAP_REMOVE: case ObjectOperationAction.COUNTER_INC: case ObjectOperationAction.OBJECT_DELETE: + // RTO9a2a // we can receive an op for an object id we don't have yet in the pool. instead of buffering such operations, // we can create a zero-value object for the provided object id and apply the operation to that zero-value object. // this also means that all objects are capable of applying the corresponding *_CREATE ops on themselves, // since they need to be able to eventually initialize themselves from that *_CREATE op. // so to simplify operations handling, we always try to create a zero-value object in the pool first, // and then we can always apply the operation on the existing object in the pool. - this._objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId); - this._objectsPool.get(objectOperation.objectId)!.applyOperation(objectOperation, objectMessage); + this._objectsPool.createZeroValueObjectIfNotExists(objectOperation.objectId); // RTO9a2a1 + this._objectsPool.get(objectOperation.objectId)!.applyOperation(objectOperation, objectMessage); // RTO9a2a2, RTO9a2a3 break; default: + // RTO9a2b this._client.Logger.logAction( this._client.logger, this._client.Logger.LOG_MAJOR, diff --git a/src/plugins/objects/objectspool.ts b/src/plugins/objects/objectspool.ts index 740192481..30a0a429d 100644 --- a/src/plugins/objects/objectspool.ts +++ b/src/plugins/objects/objectspool.ts @@ -21,6 +21,7 @@ export class ObjectsPool { this._client = this._objects.getClient(); this._pool = this._createInitialPool(); this._gcInterval = setInterval(() => { + // RTO10 this._onGCInterval(); }, DEFAULTS.gcInterval); // call nodejs's Timeout.unref to not require Node.js event loop to remain active due to this interval. see https://nodejs.org/api/timers.html#timeoutunref @@ -103,18 +104,20 @@ export class ObjectsPool { return pool; } + /** @spec RTO10c */ private _onGCInterval(): void { const toDelete: string[] = []; for (const [objectId, obj] of this._pool.entries()) { + // RTO10c1 // tombstoned objects should be removed from the pool if they have been tombstoned for longer than grace period. // by removing them from the local pool, Objects plugin no longer keeps a reference to those objects, allowing JS's // Garbage Collection to eventually free the memory for those objects, provided the user no longer references them either. if (obj.isTombstoned() && Date.now() - obj.tombstonedAt()! >= this._objects.gcGracePeriod) { - toDelete.push(objectId); + toDelete.push(objectId); // RTO10c1b continue; } - obj.onGCInterval(); + obj.onGCInterval(); // RTO10c1a } toDelete.forEach((x) => this._pool.delete(x));