From ebaed58964b01b8a21523a7b48a3a51ba34a07a5 Mon Sep 17 00:00:00 2001 From: Victor Hahn Date: Tue, 9 Jun 2026 11:49:23 +0200 Subject: [PATCH] Identity: add subscription getter subscribe mode --- src/cci/identity/identity.definitions.ts | 18 ++++ src/cci/identity/identity.ts | 57 ++++++++--- test/cci/identity/identity.base.test.ts | 30 ++++++ .../identity.cubeInfoGenerators.test.ts | 94 +++++++++++++++++++ 4 files changed, 185 insertions(+), 14 deletions(-) diff --git a/src/cci/identity/identity.definitions.ts b/src/cci/identity/identity.definitions.ts index e50acf56..80da4a9c 100644 --- a/src/cci/identity/identity.definitions.ts +++ b/src/cci/identity/identity.definitions.ts @@ -210,6 +210,24 @@ export type GetPostsGenerator = MergedAsyncGenerator & { export type RelResolvingGetPostsGenerator = GetPostsGenerator>; export type RecursiveRelResolvingGetPostsGenerator = GetPostsGenerator>; +export interface GetPublicSubscriptionsOptions { + /** + * If true, the generator will not exit when all existing subscriptions have + * been yielded. Instead, it will keep running indefinitely, yielding values + * as new subscriptions become available. + * TODO provide a way to terminate the generator + */ + subscribe?: boolean; +} + +export type GetPublicSubscriptionsGenerator = MergedAsyncGenerator & { + /** + * A Promise that will resolve once all existing subscriptions have been + * yielded. This mirrors GetPostsGenerator.existingYielded for subscribe mode. + */ + existingYielded?: Promise; +}; + export interface GetRecursiveEmitterOptions { depth?: number, event?: string, diff --git a/src/cci/identity/identity.ts b/src/cci/identity/identity.ts index 454bb0ab..dfdb3de7 100644 --- a/src/cci/identity/identity.ts +++ b/src/cci/identity/identity.ts @@ -31,7 +31,7 @@ import { Veritum } from '../veritum/veritum'; import { GetVeritumOptions, VeritumRetrievalInterface } from '../veritum/veritumRetriever'; import { resolveRels, resolveRelsRecursive, ResolveRelsRecursiveResult, ResolveRelsResult } from '../veritum/veritumRetrievalUtil'; -import { DEFAULT_IDMUC_APPLICATION_STRING, DEFAULT_IDMUC_CONTEXT_STRING, DEFAULT_IDMUC_ENCRYPTION_CONTEXT_STRING, DEFAULT_IDMUC_ENCRYPTION_KEY_INDEX, DEFAULT_MIN_MUC_REBUILD_DELAY, DEFAULT_SUBSCRIPTION_RECURSION_DEPTH, GetPostsGenerator, GetPostsOptions, GetRecursiveEmitterOptions, IdentityEvents, IdentityLoadOptions, IdentityOptions, IDMUC_MASTERINDEX, PostFormatEventMap, PostInfo, RecursiveRelResolvingGetPostsGenerator, RelResolvingGetPostsGenerator } from './identity.definitions'; +import { DEFAULT_IDMUC_APPLICATION_STRING, DEFAULT_IDMUC_CONTEXT_STRING, DEFAULT_IDMUC_ENCRYPTION_CONTEXT_STRING, DEFAULT_IDMUC_ENCRYPTION_KEY_INDEX, DEFAULT_MIN_MUC_REBUILD_DELAY, DEFAULT_SUBSCRIPTION_RECURSION_DEPTH, GetPostsGenerator, GetPostsOptions, GetPublicSubscriptionsGenerator, GetPublicSubscriptionsOptions, GetRecursiveEmitterOptions, IdentityEvents, IdentityLoadOptions, IdentityOptions, IDMUC_MASTERINDEX, PostFormatEventMap, PostInfo, RecursiveRelResolvingGetPostsGenerator, RelResolvingGetPostsGenerator } from './identity.definitions'; import { deriveIdentityMasterKey, deriveIdentityRootCubeKeypair, validateIdentityRoot } from './identityHelpers'; import { IdentityPersistence } from './identityPersistence'; import { AvatarScheme, Avatar, DEFAULT_AVATARSCHEME } from './avatar'; @@ -569,8 +569,9 @@ export class Identity extends EventEmitter implements CubeEmitte removePublicSubscription(remoteIdentity: CubeKey | string | Identity) { const key: string = Identity.KeyStringOf(remoteIdentity); if (key) { - this._publicSubscriptions.delete(key); - this.emit("subscriptionRemoved", key); + if (this._publicSubscriptions.delete(key)) { + this.emit("subscriptionRemoved", key); + } } else { logger.warn("Identity: Ignoring unsubscription request to something that does not at all look like a CubeKey"); } @@ -796,21 +797,49 @@ export class Identity extends EventEmitter implements CubeEmitte return this._publicSubscriptions.has(Identity.KeyStringOf(other)); } - /** @yields CubeInfo objects for all of this user's public subscriptions. */ - async *getPublicSubscriptionCubeInfos(): AsyncGenerator { - const promises: Promise[] = []; - for (const sub of this.getPublicSubscriptionStrings()) { - promises.push(this.cubeRetriever.getCubeInfo(sub)); + private getPublicSubscriptions( + retrieve: (key: string) => Promise, + options: GetPublicSubscriptionsOptions = {}, + ): GetPublicSubscriptionsGenerator { + const existingGenerator = resolveAndYield( + Array.from(this.getPublicSubscriptionStrings(), retrieve), + ); + const generators: AsyncGenerator[] = [existingGenerator]; + + if (options.subscribe) { + const subscriptionKeyGenerator = eventsToGenerator<[string]>([{ + emitter: this, + event: 'subscriptionAdded', + }]); + + const subscriptionGenerator = (async function* (): AsyncGenerator { + for await (const key of subscriptionKeyGenerator) { + const retrieved = await retrieve(key); + if (retrieved !== undefined) yield retrieved; + } + })(); + generators.push(subscriptionGenerator); } - yield *resolveAndYield(promises); + + const ret: GetPublicSubscriptionsGenerator = mergeAsyncGenerators(...generators); + ret.existingYielded = ret.completions[0].then(); + return ret; + } + + /** @yields CubeInfo objects for all of this user's public subscriptions. */ + getPublicSubscriptionCubeInfos(options?: GetPublicSubscriptionsOptions): GetPublicSubscriptionsGenerator { + return this.getPublicSubscriptions( + (sub: string) => this.cubeRetriever.getCubeInfo(sub), + options, + ); } /** @yields Identity objects for all of this user's public subscriptions */ - async *getPublicSubscriptionIdentities(): AsyncGenerator { - for (const sub of this.getPublicSubscriptionStrings()) { - const retrieved: Identity = await this.identityStore.retrieveIdentity(sub); - if (retrieved !== undefined) yield retrieved; - } + getPublicSubscriptionIdentities(options?: GetPublicSubscriptionsOptions): GetPublicSubscriptionsGenerator { + return this.getPublicSubscriptions( + (sub: string) => this.identityStore.retrieveIdentity(sub), + options, + ); } // Alias to satisfy the RecursiveEmitterConstituent interface getSubemitters = this.getPublicSubscriptionIdentities; diff --git a/test/cci/identity/identity.base.test.ts b/test/cci/identity/identity.base.test.ts index f461d9eb..559c6e4e 100644 --- a/test/cci/identity/identity.base.test.ts +++ b/test/cci/identity/identity.base.test.ts @@ -153,6 +153,36 @@ describe('Identity: base model tests', () => { expect(id.hasPublicSubscription(subbedKey)).toBeTruthy(); }); + + it('emits subscriptionAdded only for newly added public subscriptions', async () => { + const id = new Identity( + undefined, Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 41), idTestOptions); + const subbedKey: CubeKey = Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 42) as CubeKey; + const emitted: string[] = []; + id.on('subscriptionAdded', (key: string) => emitted.push(key)); + + id.addPublicSubscription(subbedKey); + id.addPublicSubscription(subbedKey); + + expect(emitted).toEqual([subbedKey.toString('hex')]); + }); + + it('emits subscriptionRemoved only for actually removed public subscriptions', async () => { + const id = new Identity( + undefined, Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 41), idTestOptions); + const subbedKey: CubeKey = Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 42) as CubeKey; + const neverSubbedKey: CubeKey = Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 43) as CubeKey; + const emitted: string[] = []; + id.on('subscriptionRemoved', (key: string) => emitted.push(key)); + + id.addPublicSubscription(subbedKey); + id.removePublicSubscription(neverSubbedKey); + id.removePublicSubscription(subbedKey); + id.removePublicSubscription(subbedKey); + + expect(emitted).toEqual([subbedKey.toString('hex')]); + }); + it('correctly identifies authors as subscribed or not subscribed', async () => { const subject: Identity = await Identity.Create( cubeStore, "subscriptor", "clavis mea", idTestOptions); diff --git a/test/cci/identity/identity.cubeInfoGenerators.test.ts b/test/cci/identity/identity.cubeInfoGenerators.test.ts index 2aef1706..6e03fc4a 100644 --- a/test/cci/identity/identity.cubeInfoGenerators.test.ts +++ b/test/cci/identity/identity.cubeInfoGenerators.test.ts @@ -15,6 +15,15 @@ import { testCubeStoreParams } from '../testcci.definitions'; import sodium from 'libsodium-wrappers-sumo' import { vi, describe, expect, it, test, beforeAll, beforeEach, afterAll, afterEach } from 'vitest'; +function timeout(ms = 1000): Promise { + return new Promise((_resolve, reject) => + setTimeout(() => reject(new Error('Generator did not yield in time')), ms)); +} + +function nextWithTimeout(gen: AsyncGenerator, ms = 1000): Promise> { + return Promise.race([gen.next(), timeout(ms)]); +} + describe('Identity: CubeInfo generators', () => { // This test suite handles Identity's impelementation of the CubeEmitter interface, // in particular the getAllCubeInfos() generator and related, more specialised @@ -183,6 +192,60 @@ describe('Identity: CubeInfo generators', () => { expect(subInfos[0].key).toEqual(sub2.getKeyIfAvailable()); expect(subInfos[0].getCube().getFirstField(FieldType.PAYLOAD).valueString).toBe("Subscriptio secunda"); }); + + + it('supports subscribe mode and yields new subscription CubeInfos as they arrive', async () => { + const sub1: Cube = Cube.Create({ + cubeType: CubeType.PIC, + requiredDifficulty: 0, + fields: VerityField.Payload("Subscriptio prima"), + }); + await cubeStore.addCube(sub1); + id.addPublicSubscription(sub1.getKeyIfAvailable()); + + const subInfos = id.getPublicSubscriptionCubeInfos({ subscribe: true }); + const first = await nextWithTimeout(subInfos); + expect(first.done).toBe(false); + expect(first.value.key).toEqual(sub1.getKeyIfAvailable()); + + const secondPromise = subInfos.next(); + await expect(subInfos.existingYielded).resolves.toBeUndefined(); + + const sub2: Cube = Cube.Create({ + cubeType: CubeType.PIC, + requiredDifficulty: 0, + fields: VerityField.Payload("Subscriptio secunda"), + }); + await cubeStore.addCube(sub2); + id.addPublicSubscription(sub2.getKeyIfAvailable()); + + const second = await Promise.race([secondPromise, timeout()]); + expect(second.done).toBe(false); + expect(second.value.key).toEqual(sub2.getKeyIfAvailable()); + expect(second.value.getCube().getFirstField(FieldType.PAYLOAD).valueString).toBe("Subscriptio secunda"); + + subInfos.cancel(); + }); + + it('keeps an initially empty subscription CubeInfo generator open in subscribe mode', async () => { + const subInfos = id.getPublicSubscriptionCubeInfos({ subscribe: true }); + const nextPromise = subInfos.next(); + await expect(subInfos.existingYielded).resolves.toBeUndefined(); + + const sub: Cube = Cube.Create({ + cubeType: CubeType.PIC, + requiredDifficulty: 0, + fields: VerityField.Payload("Subscriptio sera"), + }); + await cubeStore.addCube(sub); + id.addPublicSubscription(sub.getKeyIfAvailable()); + + const next = await Promise.race([nextPromise, timeout()]); + expect(next.done).toBe(false); + expect(next.value.key).toEqual(sub.getKeyIfAvailable()); + + subInfos.cancel(); + }); }); describe('getPublicSubscriptionIdentities()', () => { @@ -236,6 +299,37 @@ describe('Identity: CubeInfo generators', () => { expect(subs[0].key).toEqual(sub2.key); expect(subs[0].name).toBe("Subscriptio secunda"); }); + + + it('supports subscribe mode and yields new subscription Identities as they arrive', async () => { + const sub1MasterKey: Buffer = Buffer.alloc(sodium.crypto_sign_SEEDBYTES, 43); + const sub1: Identity = await Identity.Construct(cubeStore, sub1MasterKey, idTestOptions); + sub1.name = "Subscriptio prima"; + await sub1.store(); + id.addPublicSubscription(sub1.key); + + const subs = id.getPublicSubscriptionIdentities({ subscribe: true }); + const first = await nextWithTimeout(subs); + expect(first.done).toBe(false); + expect(first.value.key).toEqual(sub1.key); + expect(first.value.name).toBe("Subscriptio prima"); + + const secondPromise = subs.next(); + await expect(subs.existingYielded).resolves.toBeUndefined(); + + const sub2MasterKey: Buffer = Buffer.alloc(sodium.crypto_sign_SEEDBYTES, 44); + const sub2: Identity = await Identity.Construct(cubeStore, sub2MasterKey, idTestOptions); + sub2.name = "Subscriptio secunda"; + await sub2.store(); + id.addPublicSubscription(sub2.key); + + const second = await Promise.race([secondPromise, timeout()]); + expect(second.done).toBe(false); + expect(second.value.key).toEqual(sub2.key); + expect(second.value.name).toBe("Subscriptio secunda"); + + subs.cancel(); + }); }); describe('getAllCubeInfos()', () => {