Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions middleware/packages/plugin-api/src/routinesIntegration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ export interface RoutinesIntegration {
userId: string;
channel: string;
conversationRef: unknown;
/**
* Operator-addressable principal id used as the Conductor channel-binding key, so proactive
* reminders / approvals reach this user. For Teams this is the lowercased email/UPN — the id an
* operator enters as a role holder or human-step principal. Omitted ⇒ the binding falls back to
* `userId` (the channel-native id, e.g. AAD object id). Does NOT affect routine attribution.
*/
principalRef?: string;
/**
* Cold-start authorization (default `false`). When `true`, the
* `manage_routine` tool permits this user to create routines that
Expand Down
29 changes: 19 additions & 10 deletions middleware/src/conductor/channelBindingStore.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,41 @@
import type { Pool } from 'pg';

import { canonicalizePrincipalId } from './principalId.js';

/**
* Maps a user (in Conductor's identity space — the same id used as a role holder / await responder,
* e.g. session.sub / email / channel-native id) to the opaque channel conversation reference needed
* e.g. email / session.sub / channel-native id) to the opaque channel conversation reference needed
* to proactively reach them (US5 reminders). Populated per inbound turn from the kernel-side
* `captureRoutineTurn` hook; read by the await worker when sending a reminder.
*
* IDENTITY CONTRACT: the `userId` key here MUST be the same id a human step's principal resolves to
* (a `user:<id>` ref, or a role holder id). The capture hook writes the channel-native turn user id;
* delivery therefore resolves only when role/user principals are expressed in that same id space —
* otherwise the reminder is flagged `unreachable` (never silently dropped, never a hang).
* IDENTITY CONTRACT: the key is the operator-addressable `principalRef` the channel supplied (Teams:
* the user's email), else the channel-native `userId` (e.g. AAD object id). A human step's principal
* / role holder ids MUST be expressed in that same id space, else the reminder is flagged
* `unreachable` (never silently dropped, never a hang). Keys are canonicalized
* (`canonicalizePrincipalId` — trim + lowercase) on BOTH write and read here, so casing/whitespace
* differences between the channel-supplied key and an operator-typed holder id never cause a miss.
*/
export class ConductorChannelBindingStore {
constructor(private readonly pool: Pool) {}

/** Upsert a user's conversation reference for a channel (idempotent per inbound turn). */
async upsert(userId: string, channelType: string, conversationRef: unknown): Promise<void> {
if (!userId || !channelType) return;
const key = canonicalizePrincipalId(userId);
if (!key || !channelType) return;
await this.pool.query(
`INSERT INTO conductor_channel_bindings (user_id, channel_type, conversation_ref)
VALUES ($1, $2, $3::jsonb)
ON CONFLICT (user_id, channel_type)
DO UPDATE SET conversation_ref = EXCLUDED.conversation_ref, updated_at = now()`,
[userId, channelType, JSON.stringify(conversationRef ?? null)],
[key, channelType, JSON.stringify(conversationRef ?? null)],
);
}

/** The conversation reference to reach `userId` on `channelType`, or null if none is bound. */
async get(userId: string, channelType: string): Promise<unknown | null> {
const r = await this.pool.query<{ conversation_ref: unknown }>(
`SELECT conversation_ref FROM conductor_channel_bindings WHERE user_id = $1 AND channel_type = $2`,
[userId, channelType],
[canonicalizePrincipalId(userId), channelType],
);
return r.rows[0]?.conversation_ref ?? null;
}
Expand All @@ -39,12 +44,16 @@ export class ConductorChannelBindingStore {
async getMany(userIds: string[], channelType: string): Promise<Map<string, unknown>> {
const out = new Map<string, unknown>();
if (userIds.length === 0) return out;
// Canonical key → the ORIGINAL holder id the caller passed, so callers can `.get(holder)` with
// the id they already hold (e.g. a role-resolved holder) regardless of its casing.
const byCanonical = new Map<string, string>();
for (const id of userIds) byCanonical.set(canonicalizePrincipalId(id), id);
const r = await this.pool.query<{ user_id: string; conversation_ref: unknown }>(
`SELECT user_id, conversation_ref FROM conductor_channel_bindings
WHERE channel_type = $2 AND user_id = ANY($1::text[])`,
[userIds, channelType],
[[...byCanonical.keys()], channelType],
);
for (const row of r.rows) out.set(row.user_id, row.conversation_ref);
for (const row of r.rows) out.set(byCanonical.get(row.user_id) ?? row.user_id, row.conversation_ref);
return out;
}
}
23 changes: 23 additions & 0 deletions middleware/src/conductor/principalId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Canonical id space for Conductor principals (US5 reminder/approval delivery).
//
// A reminder reaches a person only if the channel-binding key and the human-step principal /
// role-holder id compare EQUAL. Those ids enter from different sources (a channel plugin's
// `principalRef`, an operator-typed role holder, a `user:` principal) and the SQL match is
// case-sensitive, so every id that crosses that boundary must be canonicalized identically.
// Canonicalization lives at the store/role layer (not the call sites) so it can't be forgotten.
// Email/UPN ids are case-insensitive; AAD-object-id GUIDs are already lowercase — so trimming +
// lowercasing is safe and lossless for both.

export function canonicalizePrincipalId(id: string): string {
return id.trim().toLowerCase();
}

/**
* The Conductor channel-binding key for an inbound turn: the operator-addressable `principalRef`
* (e.g. a Teams user's email) when the channel supplied a non-empty one, else the channel-native
* `userId` (e.g. AAD object id). Uses `||` (not `??`) so a blank `principalRef` from a channel
* falls back to `userId` instead of writing an empty, never-matched binding key.
*/
export function bindingKeyForTurn(info: { userId: string; principalRef?: string }): string {
return info.principalRef || info.userId;
}
10 changes: 9 additions & 1 deletion middleware/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { createMemoryBackendRouter } from './routes/memoryBackend.js';
import { createChatRouter } from './routes/chat.js';
import { createOperatorAgentsRouter } from './routes/operatorAgents.js';
import { wireConductor } from './conductor/index.js';
import { bindingKeyForTurn } from './conductor/principalId.js';
import { createOperatorChannelsRouter } from './routes/operatorChannels.js';
import { createAgentBuilderRouter } from './routes/agentBuilder.js';
import { ScheduleWorker } from './scheduler/scheduleWorker.js';
Expand Down Expand Up @@ -1736,7 +1737,14 @@ async function main(): Promise<void> {
const bindings = serviceRegistry.get<{ upsert(u: string, c: string, r: unknown): Promise<void> }>(
'conductorChannelBindings',
);
if (bindings) void bindings.upsert(String(info.userId), String(info.channel), info.conversationRef).catch(() => undefined);
// Key the binding by the operator-addressable principalRef (Teams: the user's email) when the
// channel supplied one, so it matches a human-step principal / role holder; otherwise fall back
// to the channel-native userId (e.g. AAD object id). The store canonicalizes the key on write.
if (bindings) {
void bindings
.upsert(bindingKeyForTurn(info), String(info.channel), info.conversationRef)
.catch(() => undefined);
}
}),
);
console.log(
Expand Down
4 changes: 2 additions & 2 deletions middleware/src/plugins/routines/integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function createRoutinesIntegration(
handle: RoutinesHandle,
/** Optional per-turn observer — the kernel uses it to persist a Conductor channel binding for
* reminders, without coupling routines to Conductor. Best-effort: failures must not break a turn. */
onTurnCaptured?: (info: { userId: string; channel: string; conversationRef: unknown }) => void,
onTurnCaptured?: (info: { userId: string; principalRef?: string; channel: string; conversationRef: unknown }) => void,
): RoutinesIntegration {
return {
captureRoutineTurn(info) {
Expand All @@ -38,7 +38,7 @@ export function createRoutinesIntegration(
canTargetOthers: info.canTargetOthers ?? false,
});
try {
onTurnCaptured?.({ userId: info.userId, channel: info.channel, conversationRef: info.conversationRef });
onTurnCaptured?.({ userId: info.userId, principalRef: info.principalRef, channel: info.channel, conversationRef: info.conversationRef });
} catch {
// never let a binding-capture error break the inbound turn
}
Expand Down
101 changes: 101 additions & 0 deletions middleware/test/routinesIntegrationBinding.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { describe, it } from 'node:test';
import { strict as assert } from 'node:assert';

import { createRoutinesIntegration } from '../src/plugins/routines/integration.js';
import type { RoutinesHandle } from '../src/plugins/routines/initRoutines.js';
import { bindingKeyForTurn, canonicalizePrincipalId } from '../src/conductor/principalId.js';
import { ConductorChannelBindingStore } from '../src/conductor/channelBindingStore.js';

// Conductor real-world P2a — the routines turn-capture seam forwards an optional `principalRef` to
// `onTurnCaptured` so the kernel can key the Conductor channel binding by an operator-addressable id
// (Teams: the user's email) instead of the channel-native id (AAD object id). This is what lets a
// reminder/approval addressed to `jane@co` reach Jane's Teams conversation.

type Captured = { userId: string; principalRef?: string; channel: string; conversationRef: unknown };

// captureRoutineTurn only touches the per-turn ALS + the onTurnCaptured observer — the handle's
// runner is never dereferenced on this path, so a bare stub suffices.
const stubHandle = {} as unknown as RoutinesHandle;

describe('createRoutinesIntegration onTurnCaptured principalRef (P2a)', () => {
it('forwards principalRef so the binding is keyed by the operator-addressable id', () => {
const seen: Captured[] = [];
const integ = createRoutinesIntegration(stubHandle, (info) => seen.push(info));
integ.captureRoutineTurn({
tenant: 't1',
userId: 'aad-object-id-123',
principalRef: 'jane@co',
channel: 'teams',
conversationRef: { conversation: { id: 'c1' } },
});
assert.equal(seen.length, 1);
assert.equal(seen[0]?.userId, 'aad-object-id-123');
assert.equal(seen[0]?.principalRef, 'jane@co'); // binding upserts by this (index.ts: principalRef ?? userId)
assert.equal(seen[0]?.channel, 'teams');
});

it('forwards undefined principalRef when the channel omits it (binding falls back to userId)', () => {
const seen: Captured[] = [];
const integ = createRoutinesIntegration(stubHandle, (info) => seen.push(info));
integ.captureRoutineTurn({
tenant: 't1',
userId: 'aad-object-id-123',
channel: 'teams',
conversationRef: {},
});
assert.equal(seen.length, 1);
assert.equal(seen[0]?.principalRef, undefined);
assert.equal(seen[0]?.userId, 'aad-object-id-123');
});

it('a throwing onTurnCaptured never breaks the inbound turn', () => {
const integ = createRoutinesIntegration(stubHandle, () => {
throw new Error('binding store down');
});
assert.doesNotThrow(() =>
integ.captureRoutineTurn({ tenant: 't1', userId: 'u', channel: 'teams', conversationRef: {} }),
);
});
});

describe('principalId helpers (P2a)', () => {
it('canonicalizePrincipalId trims + lowercases so casing never causes a miss', () => {
assert.equal(canonicalizePrincipalId(' Jane@Co.COM '), 'jane@co.com');
});

it('bindingKeyForTurn prefers a non-empty principalRef, falls back to userId (|| not ??)', () => {
assert.equal(bindingKeyForTurn({ userId: 'aad-1', principalRef: 'jane@co' }), 'jane@co');
assert.equal(bindingKeyForTurn({ userId: 'aad-1' }), 'aad-1');
assert.equal(bindingKeyForTurn({ userId: 'aad-1', principalRef: '' }), 'aad-1'); // blank ⇒ fallback, not an empty key
});
});

// Minimal pg.Pool stub recording the SQL params, so we can assert the store canonicalizes keys.
function fakePool(rows: Array<{ user_id: string; conversation_ref: unknown }> = []): {
pool: import('pg').Pool;
calls: Array<{ params: unknown[] }>;
} {
const calls: Array<{ params: unknown[] }> = [];
const pool = {
query: async (_sql: string, params: unknown[]) => {
calls.push({ params });
return { rows };
},
} as unknown as import('pg').Pool;
return { pool, calls };
}

describe('ConductorChannelBindingStore canonicalization (P2a)', () => {
it('upsert stores the key trimmed + lowercased', async () => {
const { pool, calls } = fakePool();
await new ConductorChannelBindingStore(pool).upsert(' Jane@Co.COM ', 'teams', { c: 1 });
assert.equal(calls[0]?.params[0], 'jane@co.com');
});

it('getMany matches case-insensitively and keys the result by the ORIGINAL holder id', async () => {
const { pool, calls } = fakePool([{ user_id: 'jane@co.com', conversation_ref: { ref: 1 } }]);
const refs = await new ConductorChannelBindingStore(pool).getMany(['Jane@Co.com'], 'teams');
assert.deepEqual(calls[0]?.params[0], ['jane@co.com']); // queried with the canonical key
assert.deepEqual(refs.get('Jane@Co.com'), { ref: 1 }); // caller looks up by the id it passed
});
});
Loading