Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/cci/identity/identity.definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,24 @@ export type GetPostsGenerator<T> = MergedAsyncGenerator<T> & {
export type RelResolvingGetPostsGenerator<T> = GetPostsGenerator<RelResolvingPostInfo<T>>;
export type RecursiveRelResolvingGetPostsGenerator<T> = GetPostsGenerator<RecursiveRelResolvingPostInfo<T>>;

export interface GetPublicSubscriptionsOptions {
/**
* If true, the generator will not exit when all existing subscriptions have
* been yielded. Instead, it will keep running indefinitely, yielding values
* as new subscriptions become available.
* TODO provide a way to terminate the generator
*/
subscribe?: boolean;
}

export type GetPublicSubscriptionsGenerator<T> = MergedAsyncGenerator<T> & {
/**
* A Promise that will resolve once all existing subscriptions have been
* yielded. This mirrors GetPostsGenerator.existingYielded for subscribe mode.
*/
existingYielded?: Promise<void>;
};

export interface GetRecursiveEmitterOptions {
depth?: number,
event?: string,
Expand Down
57 changes: 43 additions & 14 deletions src/cci/identity/identity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import { Veritum } from '../veritum/veritum';
import { GetVeritumOptions, VeritumRetrievalInterface } from '../veritum/veritumRetriever';
import { resolveRels, resolveRelsRecursive, ResolveRelsRecursiveResult, ResolveRelsResult } from '../veritum/veritumRetrievalUtil';

import { DEFAULT_IDMUC_APPLICATION_STRING, DEFAULT_IDMUC_CONTEXT_STRING, DEFAULT_IDMUC_ENCRYPTION_CONTEXT_STRING, DEFAULT_IDMUC_ENCRYPTION_KEY_INDEX, DEFAULT_MIN_MUC_REBUILD_DELAY, DEFAULT_SUBSCRIPTION_RECURSION_DEPTH, GetPostsGenerator, GetPostsOptions, GetRecursiveEmitterOptions, IdentityEvents, IdentityLoadOptions, IdentityOptions, IDMUC_MASTERINDEX, PostFormatEventMap, PostInfo, RecursiveRelResolvingGetPostsGenerator, RelResolvingGetPostsGenerator } from './identity.definitions';
import { DEFAULT_IDMUC_APPLICATION_STRING, DEFAULT_IDMUC_CONTEXT_STRING, DEFAULT_IDMUC_ENCRYPTION_CONTEXT_STRING, DEFAULT_IDMUC_ENCRYPTION_KEY_INDEX, DEFAULT_MIN_MUC_REBUILD_DELAY, DEFAULT_SUBSCRIPTION_RECURSION_DEPTH, GetPostsGenerator, GetPostsOptions, GetPublicSubscriptionsGenerator, GetPublicSubscriptionsOptions, GetRecursiveEmitterOptions, IdentityEvents, IdentityLoadOptions, IdentityOptions, IDMUC_MASTERINDEX, PostFormatEventMap, PostInfo, RecursiveRelResolvingGetPostsGenerator, RelResolvingGetPostsGenerator } from './identity.definitions';
import { deriveIdentityMasterKey, deriveIdentityRootCubeKeypair, validateIdentityRoot } from './identityHelpers';
import { IdentityPersistence } from './identityPersistence';
import { AvatarScheme, Avatar, DEFAULT_AVATARSCHEME } from './avatar';
Expand Down Expand Up @@ -569,8 +569,9 @@ export class Identity extends EventEmitter<IdentityEvents> implements CubeEmitte
removePublicSubscription(remoteIdentity: CubeKey | string | Identity) {
const key: string = Identity.KeyStringOf(remoteIdentity);
if (key) {
this._publicSubscriptions.delete(key);
this.emit("subscriptionRemoved", key);
if (this._publicSubscriptions.delete(key)) {
this.emit("subscriptionRemoved", key);
}
} else {
logger.warn("Identity: Ignoring unsubscription request to something that does not at all look like a CubeKey");
}
Expand Down Expand Up @@ -796,21 +797,49 @@ export class Identity extends EventEmitter<IdentityEvents> implements CubeEmitte
return this._publicSubscriptions.has(Identity.KeyStringOf(other));
}

/** @yields CubeInfo objects for all of this user's public subscriptions. */
async *getPublicSubscriptionCubeInfos(): AsyncGenerator<CubeInfo, void, undefined> {
const promises: Promise<CubeInfo>[] = [];
for (const sub of this.getPublicSubscriptionStrings()) {
promises.push(this.cubeRetriever.getCubeInfo(sub));
private getPublicSubscriptions<T>(
retrieve: (key: string) => Promise<T | undefined>,
options: GetPublicSubscriptionsOptions = {},
): GetPublicSubscriptionsGenerator<T> {
const existingGenerator = resolveAndYield(
Array.from(this.getPublicSubscriptionStrings(), retrieve),
);
const generators: AsyncGenerator<T>[] = [existingGenerator];

if (options.subscribe) {
const subscriptionKeyGenerator = eventsToGenerator<[string]>([{
emitter: this,
event: 'subscriptionAdded',
}]);

const subscriptionGenerator = (async function* (): AsyncGenerator<T> {
for await (const key of subscriptionKeyGenerator) {
const retrieved = await retrieve(key);
if (retrieved !== undefined) yield retrieved;
}
})();
generators.push(subscriptionGenerator);
}
yield *resolveAndYield(promises);

const ret: GetPublicSubscriptionsGenerator<T> = mergeAsyncGenerators(...generators);
ret.existingYielded = ret.completions[0].then();
return ret;
}

/** @yields CubeInfo objects for all of this user's public subscriptions. */
getPublicSubscriptionCubeInfos(options?: GetPublicSubscriptionsOptions): GetPublicSubscriptionsGenerator<CubeInfo> {
return this.getPublicSubscriptions(
(sub: string) => this.cubeRetriever.getCubeInfo(sub),
options,
);
}

/** @yields Identity objects for all of this user's public subscriptions */
async *getPublicSubscriptionIdentities(): AsyncGenerator<Identity> {
for (const sub of this.getPublicSubscriptionStrings()) {
const retrieved: Identity = await this.identityStore.retrieveIdentity(sub);
if (retrieved !== undefined) yield retrieved;
}
getPublicSubscriptionIdentities(options?: GetPublicSubscriptionsOptions): GetPublicSubscriptionsGenerator<Identity> {
return this.getPublicSubscriptions(
(sub: string) => this.identityStore.retrieveIdentity(sub),
options,
);
}
// Alias to satisfy the RecursiveEmitterConstituent interface
getSubemitters = this.getPublicSubscriptionIdentities;
Expand Down
30 changes: 30 additions & 0 deletions test/cci/identity/identity.base.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,36 @@ describe('Identity: base model tests', () => {
expect(id.hasPublicSubscription(subbedKey)).toBeTruthy();
});


it('emits subscriptionAdded only for newly added public subscriptions', async () => {
const id = new Identity(
undefined, Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 41), idTestOptions);
const subbedKey: CubeKey = Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 42) as CubeKey;
const emitted: string[] = [];
id.on('subscriptionAdded', (key: string) => emitted.push(key));

id.addPublicSubscription(subbedKey);
id.addPublicSubscription(subbedKey);

expect(emitted).toEqual([subbedKey.toString('hex')]);
});

it('emits subscriptionRemoved only for actually removed public subscriptions', async () => {
const id = new Identity(
undefined, Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 41), idTestOptions);
const subbedKey: CubeKey = Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 42) as CubeKey;
const neverSubbedKey: CubeKey = Buffer.alloc(NetConstants.CUBE_KEY_SIZE, 43) as CubeKey;
const emitted: string[] = [];
id.on('subscriptionRemoved', (key: string) => emitted.push(key));

id.addPublicSubscription(subbedKey);
id.removePublicSubscription(neverSubbedKey);
id.removePublicSubscription(subbedKey);
id.removePublicSubscription(subbedKey);

expect(emitted).toEqual([subbedKey.toString('hex')]);
});

it('correctly identifies authors as subscribed or not subscribed', async () => {
const subject: Identity = await Identity.Create(
cubeStore, "subscriptor", "clavis mea", idTestOptions);
Expand Down
94 changes: 94 additions & 0 deletions test/cci/identity/identity.cubeInfoGenerators.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ import { testCubeStoreParams } from '../testcci.definitions';
import sodium from 'libsodium-wrappers-sumo'
import { vi, describe, expect, it, test, beforeAll, beforeEach, afterAll, afterEach } from 'vitest';

function timeout(ms = 1000): Promise<never> {
return new Promise((_resolve, reject) =>
setTimeout(() => reject(new Error('Generator did not yield in time')), ms));
}

function nextWithTimeout<T>(gen: AsyncGenerator<T>, ms = 1000): Promise<IteratorResult<T>> {
return Promise.race([gen.next(), timeout(ms)]);
}

describe('Identity: CubeInfo generators', () => {
// This test suite handles Identity's impelementation of the CubeEmitter interface,
// in particular the getAllCubeInfos() generator and related, more specialised
Expand Down Expand Up @@ -183,6 +192,60 @@ describe('Identity: CubeInfo generators', () => {
expect(subInfos[0].key).toEqual(sub2.getKeyIfAvailable());
expect(subInfos[0].getCube().getFirstField(FieldType.PAYLOAD).valueString).toBe("Subscriptio secunda");
});


it('supports subscribe mode and yields new subscription CubeInfos as they arrive', async () => {
const sub1: Cube = Cube.Create({
cubeType: CubeType.PIC,
requiredDifficulty: 0,
fields: VerityField.Payload("Subscriptio prima"),
});
await cubeStore.addCube(sub1);
id.addPublicSubscription(sub1.getKeyIfAvailable());

const subInfos = id.getPublicSubscriptionCubeInfos({ subscribe: true });
const first = await nextWithTimeout(subInfos);
expect(first.done).toBe(false);
expect(first.value.key).toEqual(sub1.getKeyIfAvailable());

const secondPromise = subInfos.next();
await expect(subInfos.existingYielded).resolves.toBeUndefined();

const sub2: Cube = Cube.Create({
cubeType: CubeType.PIC,
requiredDifficulty: 0,
fields: VerityField.Payload("Subscriptio secunda"),
});
await cubeStore.addCube(sub2);
id.addPublicSubscription(sub2.getKeyIfAvailable());

const second = await Promise.race([secondPromise, timeout()]);
expect(second.done).toBe(false);
expect(second.value.key).toEqual(sub2.getKeyIfAvailable());
expect(second.value.getCube().getFirstField(FieldType.PAYLOAD).valueString).toBe("Subscriptio secunda");

subInfos.cancel();
});

it('keeps an initially empty subscription CubeInfo generator open in subscribe mode', async () => {
const subInfos = id.getPublicSubscriptionCubeInfos({ subscribe: true });
const nextPromise = subInfos.next();
await expect(subInfos.existingYielded).resolves.toBeUndefined();

const sub: Cube = Cube.Create({
cubeType: CubeType.PIC,
requiredDifficulty: 0,
fields: VerityField.Payload("Subscriptio sera"),
});
await cubeStore.addCube(sub);
id.addPublicSubscription(sub.getKeyIfAvailable());

const next = await Promise.race([nextPromise, timeout()]);
expect(next.done).toBe(false);
expect(next.value.key).toEqual(sub.getKeyIfAvailable());

subInfos.cancel();
});
});

describe('getPublicSubscriptionIdentities()', () => {
Expand Down Expand Up @@ -236,6 +299,37 @@ describe('Identity: CubeInfo generators', () => {
expect(subs[0].key).toEqual(sub2.key);
expect(subs[0].name).toBe("Subscriptio secunda");
});


it('supports subscribe mode and yields new subscription Identities as they arrive', async () => {
const sub1MasterKey: Buffer = Buffer.alloc(sodium.crypto_sign_SEEDBYTES, 43);
const sub1: Identity = await Identity.Construct(cubeStore, sub1MasterKey, idTestOptions);
sub1.name = "Subscriptio prima";
await sub1.store();
id.addPublicSubscription(sub1.key);

const subs = id.getPublicSubscriptionIdentities({ subscribe: true });
const first = await nextWithTimeout(subs);
expect(first.done).toBe(false);
expect(first.value.key).toEqual(sub1.key);
expect(first.value.name).toBe("Subscriptio prima");

const secondPromise = subs.next();
await expect(subs.existingYielded).resolves.toBeUndefined();

const sub2MasterKey: Buffer = Buffer.alloc(sodium.crypto_sign_SEEDBYTES, 44);
const sub2: Identity = await Identity.Construct(cubeStore, sub2MasterKey, idTestOptions);
sub2.name = "Subscriptio secunda";
await sub2.store();
id.addPublicSubscription(sub2.key);

const second = await Promise.race([secondPromise, timeout()]);
expect(second.done).toBe(false);
expect(second.value.key).toEqual(sub2.key);
expect(second.value.name).toBe("Subscriptio secunda");

subs.cancel();
});
});

describe('getAllCubeInfos()', () => {
Expand Down
Loading