diff --git a/src/commands/channels/annotations/get.ts b/src/commands/channels/annotations/get.ts index 293cb9af8..7a3833570 100644 --- a/src/commands/channels/annotations/get.ts +++ b/src/commands/channels/annotations/get.ts @@ -26,7 +26,8 @@ export default class ChannelsAnnotationsGet extends AblyBaseCommand { }), }; - static override description = "Get annotations for a channel message"; + static override description = + "List individual annotation events for a channel message (paginated event stream, not the rolled-up summary)"; static override examples = [ '$ ably channels annotations get my-channel "01234567890:0"', diff --git a/src/commands/channels/get-message.ts b/src/commands/channels/get-message.ts new file mode 100644 index 000000000..ae9955e04 --- /dev/null +++ b/src/commands/channels/get-message.ts @@ -0,0 +1,134 @@ +import { Args, Flags } from "@oclif/core"; +import * as Ably from "ably"; + +import { AblyBaseCommand } from "../../base-command.js"; +import { productApiFlags } from "../../flags.js"; +import { + formatMessageTimestamp, + formatMessagesOutput, + formatResource, +} from "../../utils/output.js"; +import type { MessageDisplayFields } from "../../utils/output.js"; + +export default class ChannelsGetMessage extends AblyBaseCommand { + static override args = { + channelName: Args.string({ + description: "The channel name", + required: true, + }), + messageSerial: Args.string({ + description: "The serial of the message to retrieve", + required: true, + }), + }; + + static override description = + "Get the latest version of a message on an Ably channel"; + + static override examples = [ + '$ ably channels get-message my-channel "01234567890:0"', + '$ ably channels get-message my-channel "01234567890:0" --json', + '$ ably channels get-message my-channel "01234567890:0" --pretty-json', + '$ ably channels get-message my-channel "01234567890:0" --cipher YOUR_CIPHER_KEY', + ]; + + static override flags = { + ...productApiFlags, + cipher: Flags.string({ + description: + "Decryption key for encrypted messages (base64-encoded or hex-encoded, supports AES-128-CBC and AES-256-CBC)", + }), + }; + + async run(): Promise { + const { args, flags } = await this.parse(ChannelsGetMessage); + const channelName = args.channelName; + const serial = args.messageSerial; + + try { + const rest = await this.createAblyRestClient(flags); + if (!rest) return; + + const channelOptions: Ably.ChannelOptions = {}; + if (flags.cipher) { + channelOptions.cipher = { key: flags.cipher }; + } + + const channel = rest.channels.get(channelName, channelOptions); + + this.logProgress( + `Fetching message ${formatResource(serial)} on channel ${formatResource(channelName)}`, + flags, + ); + + const message = await channel.getMessage(serial); + + const tracePayload = { + id: message.id, + timestamp: formatMessageTimestamp(message.timestamp), + channel: channelName, + event: message.name || undefined, + clientId: message.clientId, + connectionId: message.connectionId, + data: message.data as unknown, + encoding: message.encoding, + extras: message.extras as unknown, + action: + message.action === undefined ? undefined : String(message.action), + serial: message.serial, + version: message.version, + annotations: message.annotations, + }; + this.logCliEvent( + flags, + "channelGetMessage", + "messageRetrieved", + `Retrieved message ${message.serial ?? serial} on channel ${channelName}`, + tracePayload, + ); + + if (this.shouldOutputJson(flags)) { + this.logJsonResult( + { + message: { + ...message, + // Stringify action for predictable JSON typing across commands + // (matches `channels subscribe`'s explicit normalisation). + action: + message.action === undefined + ? undefined + : String(message.action), + // Nullish-aware: a legitimate epoch-zero timestamp must not be + // dropped to undefined. + timestamp: + message.timestamp == null + ? undefined + : new Date(message.timestamp).toISOString(), + }, + }, + flags, + ); + } else { + const display: MessageDisplayFields = { + action: + message.action === undefined ? undefined : String(message.action), + channel: channelName, + clientId: message.clientId, + data: message.data, + event: message.name || undefined, + id: message.id, + serial: message.serial, + timestamp: message.timestamp ?? Date.now(), + version: message.version, + annotations: message.annotations, + }; + this.log(formatMessagesOutput([display])); + } + } catch (error) { + this.fail(error, flags, "channelGetMessage", { + channel: channelName, + serial, + }); + } + } +} diff --git a/src/utils/output.ts b/src/utils/output.ts index f825209fd..9d1a309ac 100644 --- a/src/utils/output.ts +++ b/src/utils/output.ts @@ -202,16 +202,19 @@ export function formatMessagesOutput(messages: MessageDisplayFields[]): string { } if (msg.annotations && Object.keys(msg.annotations.summary).length > 0) { - lines.push(`${formatLabel("Annotations")}`); + lines.push( + `${formatLabel("Annotations")}`, + ` ${formatLabel("Summary")}`, + ); for (const [annotationType, value] of Object.entries( msg.annotations.summary, )) { const formattedValue = formatMessageData(value) .split("\n") - .map((line) => ` ${line}`) + .map((line) => ` ${line}`) .join("\n"); - lines.push(` ${formatLabel(annotationType)}`, formattedValue); + lines.push(` ${formatLabel(annotationType)}`, formattedValue); } } diff --git a/test/e2e/channels/channel-message-ops-e2e.test.ts b/test/e2e/channels/channel-message-ops-e2e.test.ts index 09b0bcfb2..4ea565ba8 100644 --- a/test/e2e/channels/channel-message-ops-e2e.test.ts +++ b/test/e2e/channels/channel-message-ops-e2e.test.ts @@ -113,6 +113,164 @@ describe.skipIf(SHOULD_SKIP_E2E || SHOULD_SKIP_MUTABLE_TESTS)( }, ); + it( + "should retrieve a message via channels get-message", + { timeout: 60000 }, + async () => { + setupTestFailureHandler( + "should retrieve a message via channels get-message", + ); + + // Use a fresh channel/serial so we don't see updates from other tests + const getChannel = getMutableChannelName("msg-get"); + const serial = await publishAndGetSerial(getChannel, "fresh-message"); + + const result = await runCommand( + ["channels", "get-message", getChannel, serial, "--json"], + { + env: { ABLY_API_KEY: E2E_API_KEY || "" }, + timeoutMs: 30000, + }, + ); + + expect(result.exitCode).toBe(0); + + const records = parseNdjsonLines(result.stdout); + const parsed = records.find((r) => r.type === "result") ?? records[0]; + expect(parsed.success).toBe(true); + expect(parsed.message).toBeDefined(); + const message = parsed.message as Record; + expect(message.serial).toBe(serial); + expect(message.data).toBe("fresh-message"); + // Timestamp must be ISO 8601 (history-style normalisation) + expect(message.timestamp).toMatch( + /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/, + ); + }, + ); + + it( + "should return the latest version after an update via channels get-message", + { timeout: 60000 }, + async () => { + setupTestFailureHandler( + "should return the latest version after an update via channels get-message", + ); + + // Publish, update, then verify get-message returns the updated payload + const updateChannel = getMutableChannelName("msg-get-after-update"); + const serial = await publishAndGetSerial(updateChannel, "original"); + + const updateResult = await runCommand( + [ + "channels", + "update", + updateChannel, + serial, + "edited-text", + "--json", + ], + { + env: { ABLY_API_KEY: E2E_API_KEY || "" }, + timeoutMs: 30000, + }, + ); + expect(updateResult.exitCode).toBe(0); + + // Retry get-message — update is eventually consistent + let latestMessage: Record | undefined; + for (let attempt = 0; attempt < 10; attempt++) { + const getResult = await runCommand( + ["channels", "get-message", updateChannel, serial, "--json"], + { + env: { ABLY_API_KEY: E2E_API_KEY || "" }, + timeoutMs: 30000, + }, + ); + if (getResult.exitCode === 0) { + const records = parseNdjsonLines(getResult.stdout); + const parsed = + records.find((r) => r.type === "result") ?? records[0]; + latestMessage = parsed.message as + | Record + | undefined; + if (latestMessage?.data === "edited-text") break; + } + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + + expect(latestMessage).toBeDefined(); + expect(latestMessage!.data).toBe("edited-text"); + // The action must reflect that this is an update, not the original create + expect(latestMessage!.action).toBe("message.update"); + // The version block must be populated and differ from the message serial + expect(latestMessage!.version).toBeDefined(); + const version = latestMessage!.version as Record; + expect(version.serial).toBeDefined(); + expect(version.serial).not.toBe(serial); + }, + ); + + it( + "should render human-readable output without --json", + { timeout: 60000 }, + async () => { + setupTestFailureHandler( + "should render human-readable output without --json", + ); + + const humanChannel = getMutableChannelName("msg-get-human"); + const serial = await publishAndGetSerial(humanChannel, "human-text"); + + const result = await runCommand( + ["channels", "get-message", humanChannel, serial], + { + env: { ABLY_API_KEY: E2E_API_KEY || "" }, + timeoutMs: 30000, + }, + ); + + expect(result.exitCode).toBe(0); + // Field labels rendered by formatMessagesOutput must appear + expect(result.stdout).toContain("Channel"); + expect(result.stdout).toContain("Serial"); + expect(result.stdout).toContain(serial); + expect(result.stdout).toContain("Data"); + expect(result.stdout).toContain("human-text"); + }, + ); + + it( + "should fail with a non-zero exit code for an unknown serial", + { timeout: 60000 }, + async () => { + setupTestFailureHandler( + "should fail with a non-zero exit code for an unknown serial", + ); + + const result = await runCommand( + [ + "channels", + "get-message", + channelName, + "0000000000-000@deadbeef:000", + "--json", + ], + { + env: { ABLY_API_KEY: E2E_API_KEY || "" }, + timeoutMs: 30000, + }, + ); + + expect(result.exitCode).not.toBe(0); + + const records = parseNdjsonLines(result.stdout); + const errorRecord = records.find((r) => r.type === "error"); + expect(errorRecord).toBeDefined(); + expect(errorRecord!.success).toBe(false); + }, + ); + it( "should delete a message via channels delete", { timeout: 60000 }, diff --git a/test/helpers/mock-ably-rest.ts b/test/helpers/mock-ably-rest.ts index 9759f969e..f3e7a0275 100644 --- a/test/helpers/mock-ably-rest.ts +++ b/test/helpers/mock-ably-rest.ts @@ -34,6 +34,7 @@ export interface MockRestChannel { publish: Mock; history: Mock; status: Mock; + getMessage: Mock; updateMessage: Mock; deleteMessage: Mock; appendMessage: Mock; @@ -142,6 +143,29 @@ function createMockRestChannel(name: string): MockRestChannel { name, publish: vi.fn().mockResolvedValue({ serials: ["mock-serial-001"] }), history: vi.fn().mockResolvedValue(createMockPaginatedResult([])), + getMessage: vi.fn().mockResolvedValue({ + id: "mock-message-id", + name: "mock-event", + data: { hello: "world" }, + encoding: "json", + extras: { headers: { foo: "bar" } }, + serial: "mock-serial-001", + timestamp: 1_700_000_000_000, + clientId: "mock-client", + connectionId: "mock-connection", + action: "message.update", + version: { + serial: "mock-serial-001@v2", + timestamp: 1_700_000_001_000, + clientId: "mock-editor", + description: "Fixed typo", + }, + annotations: { + summary: { + "reaction:distinct.v1": { unique: 3 }, + }, + }, + }), updateMessage: vi .fn() .mockResolvedValue({ versionSerial: "mock-version-serial-update" }), diff --git a/test/unit/commands/channels/get-message.test.ts b/test/unit/commands/channels/get-message.test.ts new file mode 100644 index 000000000..e5e039eef --- /dev/null +++ b/test/unit/commands/channels/get-message.test.ts @@ -0,0 +1,288 @@ +import { describe, it, expect, beforeEach } from "vitest"; +import { runCommand } from "@oclif/test"; +import { getMockAblyRest } from "../../../helpers/mock-ably-rest.js"; +import { parseNdjsonLines } from "../../../helpers/ndjson.js"; +import { + standardHelpTests, + standardArgValidationTests, + standardFlagTests, +} from "../../../helpers/standard-tests.js"; + +const COMMAND = "channels:get-message"; + +describe("channels:get-message command", () => { + beforeEach(() => { + getMockAblyRest(); + }); + + standardHelpTests(COMMAND, import.meta.url); + standardArgValidationTests(COMMAND, import.meta.url, { + requiredArgs: ["test-channel", "serial-001"], + }); + standardFlagTests(COMMAND, import.meta.url, ["--json", "--cipher"]); + + describe("functionality", () => { + it("calls channel.getMessage with the supplied serial", async () => { + const mock = getMockAblyRest(); + const channel = mock.channels._getChannel("test-channel"); + + await runCommand( + [COMMAND, "test-channel", "serial-001"], + import.meta.url, + ); + + expect(mock.channels.get).toHaveBeenCalledWith("test-channel", {}); + expect(channel.getMessage).toHaveBeenCalledExactlyOnceWith("serial-001"); + }); + + it("passes cipher.key to channels.get when --cipher is provided", async () => { + const mock = getMockAblyRest(); + + await runCommand( + [COMMAND, "test-channel", "serial-001", "--cipher", "my-secret-key"], + import.meta.url, + ); + + expect(mock.channels.get).toHaveBeenCalledWith("test-channel", { + cipher: { key: "my-secret-key" }, + }); + }); + }); + + describe("JSON output", () => { + it("emits a `result` envelope with type=result, command, and success=true", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001", "--json"], + import.meta.url, + ); + + const records = parseNdjsonLines(stdout); + const result = records.find((r) => r.type === "result"); + + expect(result).toBeDefined(); + expect(result).toMatchObject({ + type: "result", + command: "channels:get-message", + success: true, + }); + }); + + it("nests the SDK message under the `message` domain key with every populated field", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001", "--json"], + import.meta.url, + ); + + const records = parseNdjsonLines(stdout); + const result = records.find((r) => r.type === "result") as + | { message: Record } + | undefined; + + expect(result).toBeDefined(); + expect(result!.message).toMatchObject({ + id: "mock-message-id", + name: "mock-event", + data: { hello: "world" }, + encoding: "json", + extras: { headers: { foo: "bar" } }, + serial: "mock-serial-001", + clientId: "mock-client", + connectionId: "mock-connection", + action: "message.update", + }); + }); + + it("normalises `timestamp` to ISO 8601 in JSON", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001", "--json"], + import.meta.url, + ); + + const records = parseNdjsonLines(stdout); + const result = records.find((r) => r.type === "result") as + | { message: { timestamp: string } } + | undefined; + + expect(result!.message.timestamp).toBe("2023-11-14T22:13:20.000Z"); + }); + + it("preserves a legitimate epoch-zero timestamp (does not drop to undefined)", async () => { + const mock = getMockAblyRest(); + const channel = mock.channels._getChannel("test-channel"); + channel.getMessage.mockResolvedValue({ + id: "epoch-id", + serial: "epoch-serial", + timestamp: 0, + action: "message.create", + data: "epoch-data", + }); + + const { stdout } = await runCommand( + [COMMAND, "test-channel", "epoch-serial", "--json"], + import.meta.url, + ); + + const records = parseNdjsonLines(stdout); + const result = records.find((r) => r.type === "result") as + | { message: { timestamp: string } } + | undefined; + + expect(result!.message.timestamp).toBe("1970-01-01T00:00:00.000Z"); + }); + + it("stringifies `action` in JSON for predictable typing", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001", "--json"], + import.meta.url, + ); + + const records = parseNdjsonLines(stdout); + const result = records.find((r) => r.type === "result") as + | { message: { action: string } } + | undefined; + + expect(typeof result!.message.action).toBe("string"); + expect(result!.message.action).toBe("message.update"); + }); + + it("preserves nested `version` block verbatim", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001", "--json"], + import.meta.url, + ); + + const records = parseNdjsonLines(stdout); + const result = records.find((r) => r.type === "result") as + | { message: { version: Record } } + | undefined; + + expect(result!.message.version).toEqual({ + serial: "mock-serial-001@v2", + timestamp: 1_700_000_001_000, + clientId: "mock-editor", + description: "Fixed typo", + }); + }); + + it("preserves nested `annotations.summary` verbatim", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001", "--json"], + import.meta.url, + ); + + const records = parseNdjsonLines(stdout); + const result = records.find((r) => r.type === "result") as + | { message: { annotations: { summary: Record } } } + | undefined; + + expect(result!.message.annotations.summary).toEqual({ + "reaction:distinct.v1": { unique: 3 }, + }); + }); + + it("emits a trailing { type: 'status', status: 'completed' } line", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001", "--json"], + import.meta.url, + ); + + const records = parseNdjsonLines(stdout); + const last = records.at(-1); + expect(last).toMatchObject({ type: "status", status: "completed" }); + }); + }); + + describe("human-readable output", () => { + it("renders header line `[timestamp]` only — no ordinal index for a single record", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001"], + import.meta.url, + ); + + // The default mock timestamp 1_700_000_000_000 = 2023-11-14T22:13:20.000Z + const firstLine = stdout.split("\n")[0]; + expect(firstLine).toContain("2023-11-14T22:13:20.000Z"); + // No ordinal index like `[1]` should precede the timestamp on the header. + expect(firstLine).not.toMatch(/^\s*\[\d+]\s+\[/); + }); + + it("renders ID, Channel, Event, Action, Client ID, Serial, and Data labels", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001"], + import.meta.url, + ); + + expect(stdout).toContain("ID: mock-message-id"); + expect(stdout).toContain("test-channel"); + expect(stdout).toContain("mock-event"); + expect(stdout).toContain("message.update"); + expect(stdout).toContain("mock-client"); + expect(stdout).toContain("Serial: mock-serial-001"); + expect(stdout).toContain("Data"); + expect(stdout).toContain("hello"); + }); + + it("renders nested Version block when version.serial differs from message.serial", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001"], + import.meta.url, + ); + + expect(stdout).toContain("Version"); + expect(stdout).toContain("mock-serial-001@v2"); + expect(stdout).toContain("mock-editor"); + }); + + it("renders Annotations block when annotations.summary is populated", async () => { + const { stdout } = await runCommand( + [COMMAND, "test-channel", "serial-001"], + import.meta.url, + ); + + expect(stdout).toContain("Annotations"); + expect(stdout).toContain("Summary"); + expect(stdout).toContain("reaction:distinct.v1"); + expect(stdout.indexOf("Annotations")).toBeLessThan( + stdout.indexOf("Summary"), + ); + expect(stdout.indexOf("Summary")).toBeLessThan( + stdout.indexOf("reaction:distinct.v1"), + ); + }); + + it("renders message.delete action when retrieving a deleted message", async () => { + const mock = getMockAblyRest(); + const channel = mock.channels._getChannel("test-channel"); + channel.getMessage.mockResolvedValue({ + id: "deleted-id", + serial: "del-serial-001", + timestamp: 1_700_000_000_000, + action: "message.delete", + data: null, + }); + + const { stdout } = await runCommand( + [COMMAND, "test-channel", "del-serial-001"], + import.meta.url, + ); + + expect(stdout).toContain("message.delete"); + }); + }); + + describe("error handling", () => { + it("surfaces Ably errors via this.fail", async () => { + const mock = getMockAblyRest(); + const channel = mock.channels._getChannel("test-channel"); + channel.getMessage.mockRejectedValue(new Error("Message not found")); + + const { error } = await runCommand( + [COMMAND, "test-channel", "serial-001"], + import.meta.url, + ); + + expect(error).toBeDefined(); + expect(error?.message).toContain("Message not found"); + }); + }); +});