From 8ef3e22f714df5ae6dc15948d163488340c76a93 Mon Sep 17 00:00:00 2001 From: Marcel Wege Date: Tue, 30 Jun 2026 11:54:43 +0200 Subject: [PATCH] feat(conductor): principalRef identity-bridge for channel-binding delivery (P2a) Conductor reminders/approvals are delivered by looking up a channel binding keyed by the human-step principal / role-holder id, but the binding was keyed by the channel-native id (Teams AAD object id) while operators address people by email, so delivery silently missed. This threads an operator-addressable `principalRef` through the routines turn-capture seam and keys the binding by it. - plugin-api: add optional `principalRef?` to captureRoutineTurn's info (additive contract; the only OSS caller is integration.ts, so no compiled plugin breaks). - routines/integration.ts: forward principalRef through onTurnCaptured. - index.ts: key the binding upsert via bindingKeyForTurn (principalRef || userId -- `||` so a blank principalRef falls back instead of writing an empty key). - principalId.ts: canonicalizePrincipalId (trim + lowercase) + bindingKeyForTurn. - channelBindingStore: canonicalize the key on BOTH write (upsert) and read (get/getMany) so casing/whitespace between the channel-supplied key and an operator-typed holder id can never cause a silent `unreachable`; getMany returns results keyed by the caller's original holder id. Inert until a channel sets principalRef (the Teams plugin half, P2b, follows once this merges). Reviewed (Claude + Forge APPROVE-WITH-CHANGES): folded the normalization mismatch (the make-or-break), the empty-string ??->|| gap, the stale identity-contract comment, and the test import + keying coverage. Build + lint + 54 conductor/routines tests green. --- .../plugin-api/src/routinesIntegration.ts | 7 ++ .../src/conductor/channelBindingStore.ts | 29 +++-- middleware/src/conductor/principalId.ts | 23 ++++ middleware/src/index.ts | 10 +- .../src/plugins/routines/integration.ts | 4 +- .../test/routinesIntegrationBinding.test.ts | 101 ++++++++++++++++++ 6 files changed, 161 insertions(+), 13 deletions(-) create mode 100644 middleware/src/conductor/principalId.ts create mode 100644 middleware/test/routinesIntegrationBinding.test.ts diff --git a/middleware/packages/plugin-api/src/routinesIntegration.ts b/middleware/packages/plugin-api/src/routinesIntegration.ts index 0c5619ef..7e0c7e17 100644 --- a/middleware/packages/plugin-api/src/routinesIntegration.ts +++ b/middleware/packages/plugin-api/src/routinesIntegration.ts @@ -52,6 +52,13 @@ export interface RoutinesIntegration { userId: string; channel: string; conversationRef: unknown; + /** + * Operator-addressable principal id used as the Conductor channel-binding key, so proactive + * reminders / approvals reach this user. For Teams this is the lowercased email/UPN — the id an + * operator enters as a role holder or human-step principal. Omitted ⇒ the binding falls back to + * `userId` (the channel-native id, e.g. AAD object id). Does NOT affect routine attribution. + */ + principalRef?: string; /** * Cold-start authorization (default `false`). When `true`, the * `manage_routine` tool permits this user to create routines that diff --git a/middleware/src/conductor/channelBindingStore.ts b/middleware/src/conductor/channelBindingStore.ts index 3a59ac39..ddabae37 100644 --- a/middleware/src/conductor/channelBindingStore.ts +++ b/middleware/src/conductor/channelBindingStore.ts @@ -1,28 +1,33 @@ import type { Pool } from 'pg'; +import { canonicalizePrincipalId } from './principalId.js'; + /** * Maps a user (in Conductor's identity space — the same id used as a role holder / await responder, - * e.g. session.sub / email / channel-native id) to the opaque channel conversation reference needed + * e.g. email / session.sub / channel-native id) to the opaque channel conversation reference needed * to proactively reach them (US5 reminders). Populated per inbound turn from the kernel-side * `captureRoutineTurn` hook; read by the await worker when sending a reminder. * - * IDENTITY CONTRACT: the `userId` key here MUST be the same id a human step's principal resolves to - * (a `user:` ref, or a role holder id). The capture hook writes the channel-native turn user id; - * delivery therefore resolves only when role/user principals are expressed in that same id space — - * otherwise the reminder is flagged `unreachable` (never silently dropped, never a hang). + * IDENTITY CONTRACT: the key is the operator-addressable `principalRef` the channel supplied (Teams: + * the user's email), else the channel-native `userId` (e.g. AAD object id). A human step's principal + * / role holder ids MUST be expressed in that same id space, else the reminder is flagged + * `unreachable` (never silently dropped, never a hang). Keys are canonicalized + * (`canonicalizePrincipalId` — trim + lowercase) on BOTH write and read here, so casing/whitespace + * differences between the channel-supplied key and an operator-typed holder id never cause a miss. */ export class ConductorChannelBindingStore { constructor(private readonly pool: Pool) {} /** Upsert a user's conversation reference for a channel (idempotent per inbound turn). */ async upsert(userId: string, channelType: string, conversationRef: unknown): Promise { - if (!userId || !channelType) return; + const key = canonicalizePrincipalId(userId); + if (!key || !channelType) return; await this.pool.query( `INSERT INTO conductor_channel_bindings (user_id, channel_type, conversation_ref) VALUES ($1, $2, $3::jsonb) ON CONFLICT (user_id, channel_type) DO UPDATE SET conversation_ref = EXCLUDED.conversation_ref, updated_at = now()`, - [userId, channelType, JSON.stringify(conversationRef ?? null)], + [key, channelType, JSON.stringify(conversationRef ?? null)], ); } @@ -30,7 +35,7 @@ export class ConductorChannelBindingStore { async get(userId: string, channelType: string): Promise { const r = await this.pool.query<{ conversation_ref: unknown }>( `SELECT conversation_ref FROM conductor_channel_bindings WHERE user_id = $1 AND channel_type = $2`, - [userId, channelType], + [canonicalizePrincipalId(userId), channelType], ); return r.rows[0]?.conversation_ref ?? null; } @@ -39,12 +44,16 @@ export class ConductorChannelBindingStore { async getMany(userIds: string[], channelType: string): Promise> { const out = new Map(); if (userIds.length === 0) return out; + // Canonical key → the ORIGINAL holder id the caller passed, so callers can `.get(holder)` with + // the id they already hold (e.g. a role-resolved holder) regardless of its casing. + const byCanonical = new Map(); + for (const id of userIds) byCanonical.set(canonicalizePrincipalId(id), id); const r = await this.pool.query<{ user_id: string; conversation_ref: unknown }>( `SELECT user_id, conversation_ref FROM conductor_channel_bindings WHERE channel_type = $2 AND user_id = ANY($1::text[])`, - [userIds, channelType], + [[...byCanonical.keys()], channelType], ); - for (const row of r.rows) out.set(row.user_id, row.conversation_ref); + for (const row of r.rows) out.set(byCanonical.get(row.user_id) ?? row.user_id, row.conversation_ref); return out; } } diff --git a/middleware/src/conductor/principalId.ts b/middleware/src/conductor/principalId.ts new file mode 100644 index 00000000..51f09ee4 --- /dev/null +++ b/middleware/src/conductor/principalId.ts @@ -0,0 +1,23 @@ +// Canonical id space for Conductor principals (US5 reminder/approval delivery). +// +// A reminder reaches a person only if the channel-binding key and the human-step principal / +// role-holder id compare EQUAL. Those ids enter from different sources (a channel plugin's +// `principalRef`, an operator-typed role holder, a `user:` principal) and the SQL match is +// case-sensitive, so every id that crosses that boundary must be canonicalized identically. +// Canonicalization lives at the store/role layer (not the call sites) so it can't be forgotten. +// Email/UPN ids are case-insensitive; AAD-object-id GUIDs are already lowercase — so trimming + +// lowercasing is safe and lossless for both. + +export function canonicalizePrincipalId(id: string): string { + return id.trim().toLowerCase(); +} + +/** + * The Conductor channel-binding key for an inbound turn: the operator-addressable `principalRef` + * (e.g. a Teams user's email) when the channel supplied a non-empty one, else the channel-native + * `userId` (e.g. AAD object id). Uses `||` (not `??`) so a blank `principalRef` from a channel + * falls back to `userId` instead of writing an empty, never-matched binding key. + */ +export function bindingKeyForTurn(info: { userId: string; principalRef?: string }): string { + return info.principalRef || info.userId; +} diff --git a/middleware/src/index.ts b/middleware/src/index.ts index 8f3e2add..2b8d8a56 100644 --- a/middleware/src/index.ts +++ b/middleware/src/index.ts @@ -25,6 +25,7 @@ import { createMemoryBackendRouter } from './routes/memoryBackend.js'; import { createChatRouter } from './routes/chat.js'; import { createOperatorAgentsRouter } from './routes/operatorAgents.js'; import { wireConductor } from './conductor/index.js'; +import { bindingKeyForTurn } from './conductor/principalId.js'; import { createOperatorChannelsRouter } from './routes/operatorChannels.js'; import { createAgentBuilderRouter } from './routes/agentBuilder.js'; import { ScheduleWorker } from './scheduler/scheduleWorker.js'; @@ -1736,7 +1737,14 @@ async function main(): Promise { const bindings = serviceRegistry.get<{ upsert(u: string, c: string, r: unknown): Promise }>( 'conductorChannelBindings', ); - if (bindings) void bindings.upsert(String(info.userId), String(info.channel), info.conversationRef).catch(() => undefined); + // Key the binding by the operator-addressable principalRef (Teams: the user's email) when the + // channel supplied one, so it matches a human-step principal / role holder; otherwise fall back + // to the channel-native userId (e.g. AAD object id). The store canonicalizes the key on write. + if (bindings) { + void bindings + .upsert(bindingKeyForTurn(info), String(info.channel), info.conversationRef) + .catch(() => undefined); + } }), ); console.log( diff --git a/middleware/src/plugins/routines/integration.ts b/middleware/src/plugins/routines/integration.ts index 192a355c..0742a5fb 100644 --- a/middleware/src/plugins/routines/integration.ts +++ b/middleware/src/plugins/routines/integration.ts @@ -26,7 +26,7 @@ export function createRoutinesIntegration( handle: RoutinesHandle, /** Optional per-turn observer — the kernel uses it to persist a Conductor channel binding for * reminders, without coupling routines to Conductor. Best-effort: failures must not break a turn. */ - onTurnCaptured?: (info: { userId: string; channel: string; conversationRef: unknown }) => void, + onTurnCaptured?: (info: { userId: string; principalRef?: string; channel: string; conversationRef: unknown }) => void, ): RoutinesIntegration { return { captureRoutineTurn(info) { @@ -38,7 +38,7 @@ export function createRoutinesIntegration( canTargetOthers: info.canTargetOthers ?? false, }); try { - onTurnCaptured?.({ userId: info.userId, channel: info.channel, conversationRef: info.conversationRef }); + onTurnCaptured?.({ userId: info.userId, principalRef: info.principalRef, channel: info.channel, conversationRef: info.conversationRef }); } catch { // never let a binding-capture error break the inbound turn } diff --git a/middleware/test/routinesIntegrationBinding.test.ts b/middleware/test/routinesIntegrationBinding.test.ts new file mode 100644 index 00000000..cd3f307d --- /dev/null +++ b/middleware/test/routinesIntegrationBinding.test.ts @@ -0,0 +1,101 @@ +import { describe, it } from 'node:test'; +import { strict as assert } from 'node:assert'; + +import { createRoutinesIntegration } from '../src/plugins/routines/integration.js'; +import type { RoutinesHandle } from '../src/plugins/routines/initRoutines.js'; +import { bindingKeyForTurn, canonicalizePrincipalId } from '../src/conductor/principalId.js'; +import { ConductorChannelBindingStore } from '../src/conductor/channelBindingStore.js'; + +// Conductor real-world P2a — the routines turn-capture seam forwards an optional `principalRef` to +// `onTurnCaptured` so the kernel can key the Conductor channel binding by an operator-addressable id +// (Teams: the user's email) instead of the channel-native id (AAD object id). This is what lets a +// reminder/approval addressed to `jane@co` reach Jane's Teams conversation. + +type Captured = { userId: string; principalRef?: string; channel: string; conversationRef: unknown }; + +// captureRoutineTurn only touches the per-turn ALS + the onTurnCaptured observer — the handle's +// runner is never dereferenced on this path, so a bare stub suffices. +const stubHandle = {} as unknown as RoutinesHandle; + +describe('createRoutinesIntegration onTurnCaptured principalRef (P2a)', () => { + it('forwards principalRef so the binding is keyed by the operator-addressable id', () => { + const seen: Captured[] = []; + const integ = createRoutinesIntegration(stubHandle, (info) => seen.push(info)); + integ.captureRoutineTurn({ + tenant: 't1', + userId: 'aad-object-id-123', + principalRef: 'jane@co', + channel: 'teams', + conversationRef: { conversation: { id: 'c1' } }, + }); + assert.equal(seen.length, 1); + assert.equal(seen[0]?.userId, 'aad-object-id-123'); + assert.equal(seen[0]?.principalRef, 'jane@co'); // binding upserts by this (index.ts: principalRef ?? userId) + assert.equal(seen[0]?.channel, 'teams'); + }); + + it('forwards undefined principalRef when the channel omits it (binding falls back to userId)', () => { + const seen: Captured[] = []; + const integ = createRoutinesIntegration(stubHandle, (info) => seen.push(info)); + integ.captureRoutineTurn({ + tenant: 't1', + userId: 'aad-object-id-123', + channel: 'teams', + conversationRef: {}, + }); + assert.equal(seen.length, 1); + assert.equal(seen[0]?.principalRef, undefined); + assert.equal(seen[0]?.userId, 'aad-object-id-123'); + }); + + it('a throwing onTurnCaptured never breaks the inbound turn', () => { + const integ = createRoutinesIntegration(stubHandle, () => { + throw new Error('binding store down'); + }); + assert.doesNotThrow(() => + integ.captureRoutineTurn({ tenant: 't1', userId: 'u', channel: 'teams', conversationRef: {} }), + ); + }); +}); + +describe('principalId helpers (P2a)', () => { + it('canonicalizePrincipalId trims + lowercases so casing never causes a miss', () => { + assert.equal(canonicalizePrincipalId(' Jane@Co.COM '), 'jane@co.com'); + }); + + it('bindingKeyForTurn prefers a non-empty principalRef, falls back to userId (|| not ??)', () => { + assert.equal(bindingKeyForTurn({ userId: 'aad-1', principalRef: 'jane@co' }), 'jane@co'); + assert.equal(bindingKeyForTurn({ userId: 'aad-1' }), 'aad-1'); + assert.equal(bindingKeyForTurn({ userId: 'aad-1', principalRef: '' }), 'aad-1'); // blank ⇒ fallback, not an empty key + }); +}); + +// Minimal pg.Pool stub recording the SQL params, so we can assert the store canonicalizes keys. +function fakePool(rows: Array<{ user_id: string; conversation_ref: unknown }> = []): { + pool: import('pg').Pool; + calls: Array<{ params: unknown[] }>; +} { + const calls: Array<{ params: unknown[] }> = []; + const pool = { + query: async (_sql: string, params: unknown[]) => { + calls.push({ params }); + return { rows }; + }, + } as unknown as import('pg').Pool; + return { pool, calls }; +} + +describe('ConductorChannelBindingStore canonicalization (P2a)', () => { + it('upsert stores the key trimmed + lowercased', async () => { + const { pool, calls } = fakePool(); + await new ConductorChannelBindingStore(pool).upsert(' Jane@Co.COM ', 'teams', { c: 1 }); + assert.equal(calls[0]?.params[0], 'jane@co.com'); + }); + + it('getMany matches case-insensitively and keys the result by the ORIGINAL holder id', async () => { + const { pool, calls } = fakePool([{ user_id: 'jane@co.com', conversation_ref: { ref: 1 } }]); + const refs = await new ConductorChannelBindingStore(pool).getMany(['Jane@Co.com'], 'teams'); + assert.deepEqual(calls[0]?.params[0], ['jane@co.com']); // queried with the canonical key + assert.deepEqual(refs.get('Jane@Co.com'), { ref: 1 }); // caller looks up by the id it passed + }); +});