diff --git a/package.json b/package.json index f485d2e5..e1b26688 100644 --- a/package.json +++ b/package.json @@ -27,6 +27,9 @@ "**/__tests__/**/*.test.ts", "**/tests/**/*.test.ts" ], + "transform": { + "^.+\\.tsx?$": ["ts-jest", { "tsconfig": "tsconfig.test.json" }] + }, "moduleFileExtensions": [ "ts", "js", @@ -59,6 +62,7 @@ "supertest": "^7.2.2", "ts-jest": "^29.4.9", "ts-node-dev": "^2.0.0", + "fast-check": "^3.22.0", "typedoc": "^0.28.19", "typescript": "^5.4.2" } diff --git a/prisma/schema.prisma b/prisma/schema.prisma index f1a2f1fe..abb3e5cc 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -38,6 +38,46 @@ model TokenTransfer { @@schema("wraith") } +// ─── Host-function invocation log ──────────────────────────────────────────── +// One row per contract event — includes token events and all other contracts. +// Allows downstream consumers to interpret events from arbitrary contracts. +model HostFnLog { + id Int @id @default(autoincrement()) + + // The contract that emitted the event (C...) + contractId String + + // topics[0] decoded as a symbol string (e.g. "transfer", "swap", "deposit") + functionName String + + // topics[1..n] serialised via scValToNative — BigInt values become strings + args Json + + // Event value serialised via scValToNative; null if the value is scvVoid + result Json? + + // Gas consumed by the invocation — populated externally when tx metadata is + // available; null otherwise + gasUsed BigInt? + + // Ledger sequence number where the event was emitted + ledger Int + + // UTC close time of the ledger + ledgerClosedAt DateTime + + // Transaction hash (SHA-256 hex, no 0x prefix) + txHash String + + // Stellar RPC paging token — unique per event, used for deduplication + eventId String @unique + + createdAt DateTime @default(now()) + + @@index([contractId]) + @@index([contractId, functionName]) + @@index([ledger]) + @@index([txHash]) // ─── NFT Transfers ──────────────────────────────────────────────────────────── model NftTransfer { id Int @id @default(autoincrement()) diff --git a/src/api.ts b/src/api.ts index cc3dea74..24c38372 100644 --- a/src/api.ts +++ b/src/api.ts @@ -1,6 +1,8 @@ import express, { Request, Response, NextFunction } from "express"; import cors from "cors"; import rateLimit from "express-rate-limit"; +import { queryTransfers, queryAllTransfers, queryByTxHash, querySummary, getLastIndexedLedger, prisma } from "./db"; +import { queryHostFnLogs } from "./indexer/host-fn-log"; import { queryTransfers, queryAllTransfers, queryByTxHash, querySummary, queryNftTransfers, getNftOwner, getNftMetadata, getLastIndexedLedger, prisma } from "./db"; import { getLatestLedger } from "./rpc"; import { getIndexerStats } from "./indexer"; @@ -553,6 +555,40 @@ export function createApp(): express.Application { } ); + // ── GET /host-fn/:contractId ───────────────────────────────────────────────── + /** + * Query raw host-function invocation logs for a contract. + * + * Every contract event indexed by Wraith is stored here — not just SEP-41 + * token events — so downstream consumers can interpret arbitrary contracts. + * + * Query params: + * functionName — filter by function name (e.g. "swap") + * limit — max rows (default 50, hard cap 200) + * offset — pagination offset (default 0) + */ + app.get( + "/host-fn/:contractId", + async (req: Request, res: Response, next: NextFunction) => { + try { + const { contractId } = req.params; + const functionName = req.query.functionName as string | undefined; + const limit = parseIntParam(req.query.limit, 50); + const offset = parseIntParam(req.query.offset, 0); + + const { total, logs } = await queryHostFnLogs({ + contractId, + functionName, + limit, + offset, + }); + + res.json({ + contractId, + total, + limit: Math.min(limit, 200), + offset, + logs, // ── GET /nfts/transfers ────────────────────────────────────────────────────── /** * Query CAP-46 NFT transfer events. @@ -638,6 +674,7 @@ export function createApp(): express.Application { } catch (err) { next(err); } + }, } ); diff --git a/src/indexer.ts b/src/indexer.ts index cf67eaf2..39f7f9e9 100644 --- a/src/indexer.ts +++ b/src/indexer.ts @@ -12,6 +12,8 @@ import { pruneOldTransfers, } from "./db"; import { emitTransfer } from "./events"; +import { parseHostFnEvent, upsertHostFnLogs, type HostFnRecord } from "./indexer/host-fn-log"; +import { pollParallel } from "./indexer/parallel"; import { isNftTransferEvent, parseNftEvents, fetchNftMetadata } from "./ingester/nft"; import { createSourceSwitcherWithConfig } from "./indexer/sources"; @@ -68,6 +70,10 @@ export function resolveSacContractIds(): string[] { } // ─── Config ─────────────────────────────────────────────────────────────────── +const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS ?? "6000", 10); +const BATCH_SIZE = parseInt(process.env.EVENTS_BATCH_SIZE ?? "10000", 10); +const INGEST_WORKERS = parseInt(process.env.INGEST_WORKERS ?? "1", 10); +const SAC_CONTRACT_IDS = resolveSacContractIds(); const POLL_INTERVAL_MS = parseInt(process.env.POLL_INTERVAL_MS ?? "6000", 10); const BATCH_SIZE = parseInt(process.env.EVENTS_BATCH_SIZE ?? "10000", 10); const SAC_CONTRACT_IDS = resolveSacContractIds(); @@ -129,6 +135,10 @@ async function pollOnce( return highestLedger; } + // Parse token transfer events + const records = parseEvents(events); + + // Persist token transfers // Split events by type: NFT (4 topics) vs fungible (3 topics) const fungibleEvents = events.filter((e) => !isNftTransferEvent(e)); const nftRawEvents = events.filter((e) => isNftTransferEvent(e)); @@ -150,6 +160,14 @@ async function pollOnce( records.forEach(emitTransfer); } + // Log every event as a raw host-fn invocation for downstream consumers (#84) + const hostFnRecords = events + .map(raw => { try { return parseHostFnEvent(raw); } catch { return null; } }) + .filter((r): r is HostFnRecord => r !== null); + if (hostFnRecords.length > 0) { + await upsertHostFnLogs(hostFnRecords).catch(err => + console.error("[indexer] host-fn log error:", err), + ); // ── NFT path ───────────────────────────────────────────────────────────────── const nftParsed = parseNftEvents(nftRawEvents); const nftRecords = nftParsed.map((p) => p.record); @@ -234,7 +252,20 @@ export async function startIndexer(): Promise { continue; } - currentLedger = await pollOnce(currentLedger, target); + if (INGEST_WORKERS > 1 && SAC_CONTRACT_IDS.length > 1) { + // Parallel path: shard contracts across N workers for higher throughput (#83) + const { totalInserted, highestLedger } = await pollParallel( + SAC_CONTRACT_IDS, + currentLedger, + target, + BATCH_SIZE, + INGEST_WORKERS, + ); + totalIndexed += totalInserted; + currentLedger = highestLedger; + } else { + currentLedger = await pollOnce(currentLedger, target); + } // Periodic data retention cleanup pollCycleCount++; diff --git a/src/indexer/host-fn-log.ts b/src/indexer/host-fn-log.ts new file mode 100644 index 00000000..a2e8f957 --- /dev/null +++ b/src/indexer/host-fn-log.ts @@ -0,0 +1,170 @@ +/** + * InvokeHostFn decoder and log store (#84). + * + * Every contract event flowing through the indexer is also recorded as a + * raw host-function invocation so downstream consumers can interpret events + * from arbitrary contracts — not just SAC token transfers. + * + * Storage layout: + * functionName — topics[0] decoded as a symbol string + * args — topics[1..n] serialised to JSON via scValToNative + * result — value field serialised to JSON via scValToNative + * gasUsed — nullable; populated externally if transaction metadata + * is available (requires a separate getTransaction call) + */ + +import * as StellarSdk from "@stellar/stellar-sdk"; +import { Prisma } from "@prisma/client"; +import { prisma } from "../db"; +import type { RawEvent } from "../rpc"; + +// ─── Types ──────────────────────────────────────────────────────────────────── + +export interface HostFnRecord { + contractId: string; + functionName: string; + args: unknown; + result: unknown; + gasUsed: bigint | null; + ledger: number; + ledgerClosedAt: Date; + txHash: string; + eventId: string; +} + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +/** + * Convert any value returned by scValToNative to something JSON.stringify can + * handle. BigInt values (i128/u128/i64/u64) are turned into decimal strings. + */ +function toJsonSafe(val: unknown): unknown { + if (typeof val === "bigint") return val.toString(); + if (Array.isArray(val)) return val.map(toJsonSafe); + if (val !== null && typeof val === "object") { + return Object.fromEntries( + Object.entries(val as Record).map(([k, v]) => [k, toJsonSafe(v)]), + ); + } + return val; +} + +function scValToSafeJson(scVal: StellarSdk.xdr.ScVal): unknown { + try { + return toJsonSafe(StellarSdk.scValToNative(scVal)); + } catch { + // Fall back to base64 XDR so no data is lost on unknown future ScVal types + return { xdr: scVal.toXDR("base64") }; + } +} + +// ─── Parser ─────────────────────────────────────────────────────────────────── + +/** + * Convert any raw contract event into a HostFnRecord. + * Returns null if the event cannot be decoded (e.g. empty topic list). + */ +export function parseHostFnEvent(raw: RawEvent): HostFnRecord | null { + const { topic, value, contractId, ledger, ledgerClosedAt, txHash, id: eventId } = raw; + + if (!topic || topic.length === 0) return null; + + let functionName: string; + try { + const native = StellarSdk.scValToNative(topic[0]); + if (typeof native !== "string") return null; + functionName = native; + } catch { + return null; + } + + const args = topic.slice(1).map(scValToSafeJson); + const result = scValToSafeJson(value); + + return { + contractId, + functionName, + args, + result, + gasUsed: null, + ledger, + ledgerClosedAt: new Date(ledgerClosedAt), + txHash, + eventId, + }; +} + +// ─── DB helpers ─────────────────────────────────────────────────────────────── + +/** + * Idempotently persist a batch of host-fn log records. + * Conflicts on `eventId` are silently ignored — safe to replay ledger ranges. + */ +export async function upsertHostFnLogs(records: HostFnRecord[]): Promise { + if (records.length === 0) return 0; + + const result = await prisma.hostFnLog.createMany({ + data: records.map(r => ({ + contractId: r.contractId, + functionName: r.functionName, + args: r.args as Prisma.InputJsonValue, + result: r.result != null ? (r.result as Prisma.InputJsonValue) : Prisma.JsonNull, + gasUsed: r.gasUsed, + ledger: r.ledger, + ledgerClosedAt: r.ledgerClosedAt, + txHash: r.txHash, + eventId: r.eventId, + })), + skipDuplicates: true, + }); + + return result.count; +} + +export type HostFnQueryParams = { + contractId: string; + functionName?: string; + limit?: number; + offset?: number; +}; + +/** + * Query host-function invocations for a given contract. + * Results are ordered newest-ledger-first. + */ +export async function queryHostFnLogs( + params: HostFnQueryParams, +): Promise<{ total: number; logs: HostFnRecord[] }> { + const { contractId, functionName, limit = 50, offset = 0 } = params; + const cap = Math.min(limit, 200); + + const where = { + contractId, + ...(functionName ? { functionName } : {}), + }; + + const [total, rows] = await prisma.$transaction([ + prisma.hostFnLog.count({ where }), + prisma.hostFnLog.findMany({ + where, + orderBy: [{ ledger: "desc" }, { id: "desc" }], + take: cap, + skip: offset, + }), + ]); + + return { + total, + logs: rows.map(r => ({ + contractId: r.contractId, + functionName: r.functionName, + args: r.args, + result: r.result, + gasUsed: r.gasUsed, + ledger: r.ledger, + ledgerClosedAt: r.ledgerClosedAt, + txHash: r.txHash, + eventId: r.eventId, + })), + }; +} diff --git a/src/indexer/parallel.ts b/src/indexer/parallel.ts new file mode 100644 index 00000000..e3a52a71 --- /dev/null +++ b/src/indexer/parallel.ts @@ -0,0 +1,106 @@ +/** + * Parallel partition ingest (#83). + * + * Shards the watched contract IDs by (sum-of-char-codes % N) so each worker + * owns a stable, non-overlapping subset of contracts. All workers issue their + * own RPC calls and DB writes concurrently via Promise.all, giving roughly N× + * throughput on multi-contract deployments. + * + * Ordering guarantee: events are ordered within each partition because every + * worker processes its own ledger range sequentially with the same fromLedger / + * toLedger window. Cross-partition ordering is not guaranteed and is not + * required by the data model (eventId is the canonical ordering key). + */ + +import { fetchEventsSafe } from "../rpc"; +import { parseEvents } from "../decoder"; +import { upsertTransfers, setLastIndexedLedger } from "../db"; +import { emitTransfer } from "../events"; + +export const DEFAULT_WORKERS = 4; + +/** + * Deterministically distribute contract IDs across N buckets. + * The same contractId always maps to the same bucket so ledger state is + * consistent within a partition across poll cycles. + */ +export function partitionByContract(contractIds: string[], n: number): string[][] { + const buckets: string[][] = Array.from({ length: n }, () => []); + for (const id of contractIds) { + let hash = 0; + for (let i = 0; i < id.length; i++) { + hash = (hash + id.charCodeAt(i)) | 0; // keep 32-bit integer + } + buckets[Math.abs(hash) % n].push(id); + } + return buckets.filter(b => b.length > 0); +} + +interface WorkerResult { + inserted: number; + highestLedger: number; +} + +async function runPartitionWorker( + partition: string[], + fromLedger: number, + toLedger: number, + batchSize: number, +): Promise { + const { events, highestLedger } = await fetchEventsSafe( + fromLedger, + toLedger, + partition, + batchSize, + ); + + if (events.length === 0) { + return { inserted: 0, highestLedger }; + } + + const records = parseEvents(events); + const inserted = await upsertTransfers(records); + + if (inserted > 0) { + records.forEach(emitTransfer); + } + + return { inserted, highestLedger }; +} + +/** + * Poll one ledger window across all contract partitions in parallel. + * + * @returns Total rows inserted and the highest ledger seen across all workers. + */ +export async function pollParallel( + contractIds: string[], + fromLedger: number, + toLedger: number, + batchSize: number, + workerCount: number = DEFAULT_WORKERS, +): Promise<{ totalInserted: number; highestLedger: number }> { + const partitions = partitionByContract(contractIds, Math.min(workerCount, contractIds.length || 1)); + + const results = await Promise.all( + partitions.map(partition => + runPartitionWorker(partition, fromLedger, toLedger, batchSize), + ), + ); + + const totalInserted = results.reduce((sum, r) => sum + r.inserted, 0); + const highestLedger = results.reduce( + (max, r) => Math.max(max, r.highestLedger), + fromLedger, + ); + + await setLastIndexedLedger(highestLedger); + + if (totalInserted > 0) { + console.log( + `[parallel] ${partitions.length} workers processed ${totalInserted} new records (ledger ${highestLedger})`, + ); + } + + return { totalInserted, highestLedger }; +} diff --git a/tests/decoder.property.test.ts b/tests/decoder.property.test.ts new file mode 100644 index 00000000..c3642cc4 --- /dev/null +++ b/tests/decoder.property.test.ts @@ -0,0 +1,157 @@ +import fc from "fast-check"; +import { xdr, nativeToScVal } from "@stellar/stellar-sdk"; +import { parseEvent } from "../src/decoder"; +import type { RawEvent } from "../src/rpc"; +import * as fixtures from "../src/__tests__/fixtures/events.json"; + +// i128 bounds +const I128_MAX = 170141183460469231731687303715884105727n; +const I128_MIN = -170141183460469231731687303715884105728n; + +// Edge-case amounts: zero, max-u128 bit pattern as signed i128 (-1), i128 extremes, i64 cast values +const EDGE_AMOUNTS: bigint[] = [ + 0n, + 1n, + -1n, // all bits set = max-u128 interpreted as signed i128 + I128_MAX, + I128_MIN, + 9223372036854775807n, // i64 max — negative i128 cast boundary + -9223372036854775808n, // i64 min — negative i128 cast + 1_000_000_000n, // 100 XLM in stroops +]; + +// Re-use pre-encoded addresses from the fixture to avoid keygen in the hot loop +const aliceScVal = xdr.ScVal.fromXDR(fixtures.transfer.topic[1], "base64"); +const bobScVal = xdr.ScVal.fromXDR(fixtures.transfer.topic[2], "base64"); + +const KNOWN_TYPES = ["transfer", "mint", "burn", "clawback"] as const; + +const baseEvent = { + ledger: 100, + ledgerClosedAt: "2024-01-01T00:00:00Z", + contractId: fixtures.contractId, + txHash: "prop_test_0000000000000000000000000000000000000000000000000000000001", + id: "0000000000000000100-00001", + type: "contract", +}; + +function makeRawEvent(topics: xdr.ScVal[], value: xdr.ScVal): RawEvent { + return { ...baseEvent, topic: topics, value }; +} + +// Produce topics appropriate for a given event type so that the decoder does not throw +// "wrong number of topics" — the property under test is about amount edge-cases, not topic +// structure validation. +function topicsFor(eventType: string): xdr.ScVal[] { + const sym = xdr.ScVal.scvSymbol(eventType); + if (eventType === "transfer" || eventType === "mint") { + return [sym, aliceScVal, bobScVal]; + } + if (eventType === "burn" || eventType === "clawback") { + return [sym, aliceScVal]; + } + return [sym]; +} + +// Arbitrary: i128 ScVal covering edge cases and the full signed range +const arbI128ScVal = fc + .oneof( + fc.constantFrom(...EDGE_AMOUNTS), + fc.bigInt({ min: I128_MIN, max: I128_MAX }), + ) + .map(n => nativeToScVal(n, { type: "i128" })); + +// Arbitrary: arbitrary string symbol ScVal (may or may not be a known event type) +const arbSymbolScVal = fc + .string({ minLength: 0, maxLength: 32 }) + .map(s => xdr.ScVal.scvSymbol(s)); + +describe("decoder property-based tests (#73)", () => { + it("never produces an unhandled exception for 10 000 arbitrary inputs", () => { + // Generate: known or unknown event-type symbol + matching topic count + i128 amount. + // The decoder must either return TransferRecord|null or throw an Error — never an + // unclassified crash (TypeError, RangeError from an unexpected code path, etc.). + const arbEventType = fc.oneof( + fc.constantFrom(...KNOWN_TYPES), + fc.string({ minLength: 1, maxLength: 16 }), + ); + + const arbRaw = fc + .tuple(arbEventType, arbI128ScVal) + .map(([eventType, amountScVal]) => makeRawEvent(topicsFor(eventType), amountScVal)); + + fc.assert( + fc.property(arbRaw, raw => { + try { + const result = parseEvent(raw); + // Must be null or a plain object (TransferRecord) + expect(result === null || (typeof result === "object" && result !== null)).toBe(true); + } catch (err) { + // Thrown values must be proper Error instances — not unhandled crashes + expect(err).toBeInstanceOf(Error); + } + }), + { numRuns: 10_000 }, + ); + }); + + it("accepts every edge-case i128 amount without throwing", () => { + for (const amount of EDGE_AMOUNTS) { + const amountScVal = nativeToScVal(amount, { type: "i128" }); + + for (const eventType of KNOWN_TYPES) { + const raw = makeRawEvent(topicsFor(eventType), amountScVal); + const result = parseEvent(raw); + expect(result).not.toBeNull(); + expect(typeof result!.amount).toBe("string"); + // Decoded value round-trips through BigInt + expect(BigInt(result!.amount)).toBe(amount); + } + } + }); + + it("returns null for arbitrary non-token symbol events without throwing", () => { + fc.assert( + fc.property( + arbSymbolScVal.filter(scv => { + try { + const native = (scv as xdr.ScVal).switch().name; + void native; + } catch { return false; } + return true; + }), + arbI128ScVal, + (typeTopic, amountScVal) => { + // Build a single-topic raw event (unknown event type path) + const raw = makeRawEvent([typeTopic], amountScVal); + try { + const result = parseEvent(raw); + // For truly unknown symbols parseEvent returns null + expect(result === null || typeof result === "object").toBe(true); + } catch (err) { + expect(err).toBeInstanceOf(Error); + } + }, + ), + { numRuns: 2_000 }, + ); + }); + + it("known fixtures still parse correctly", () => { + const cases: Array<{ fixture: { topic: string[]; value: string }; type: string }> = [ + { fixture: fixtures.transfer, type: "transfer" }, + { fixture: fixtures.mint, type: "mint" }, + { fixture: fixtures.burn, type: "burn" }, + { fixture: fixtures.clawback, type: "clawback" }, + ]; + + for (const { fixture, type } of cases) { + const raw = makeRawEvent( + fixture.topic.map((t: string) => xdr.ScVal.fromXDR(t, "base64")), + xdr.ScVal.fromXDR(fixture.value, "base64"), + ); + const result = parseEvent(raw); + expect(result?.eventType).toBe(type); + } + }); +}); diff --git a/tests/integration/e2e.test.ts b/tests/integration/e2e.test.ts new file mode 100644 index 00000000..77ed69d0 --- /dev/null +++ b/tests/integration/e2e.test.ts @@ -0,0 +1,104 @@ +/** + * End-to-end ingest → query test (#74). + * + * Requires a live DATABASE_URL (set via .env or CI docker-compose service). + * The suite is skipped automatically when DATABASE_URL is absent so the unit + * test suite continues to pass without a database. + * + * Flow: + * 1. Insert a pre-recorded transfer fixture directly via Prisma + * (simulating what the indexer writes after decoding an RPC event). + * 2. Query GET /transfers/incoming/:address via the Express app. + * 3. Assert the fixture row appears in the response. + * 4. Clean up the inserted row so the test is idempotent. + */ + +import request from "supertest"; +import { createApp } from "../../src/api"; +import { prisma } from "../../src/db"; +import fixture from "./fixtures/horizon-event.json"; + +const HAS_DB = !!process.env.DATABASE_URL; + +// Skip every test in this suite when no DB is available. +const describeE2E = HAS_DB ? describe : describe.skip; + +describeE2E("E2E: ingest → query (#74)", () => { + beforeAll(async () => { + // Ensure the row does not already exist from a prior interrupted run + await prisma.tokenTransfer.deleteMany({ + where: { eventId: fixture.eventId }, + }); + + // Insert fixture as if the indexer had processed a real horizon event + await prisma.tokenTransfer.create({ + data: { + contractId: fixture.contractId, + eventType: fixture.eventType, + fromAddress: fixture.fromAddress, + toAddress: fixture.toAddress, + amount: fixture.amount, + ledger: fixture.ledger, + ledgerClosedAt: new Date(fixture.ledgerClosedAt), + txHash: fixture.txHash, + eventId: fixture.eventId, + }, + }); + }); + + afterAll(async () => { + await prisma.tokenTransfer.deleteMany({ + where: { eventId: fixture.eventId }, + }); + await prisma.$disconnect(); + }); + + it("GET /transfers/incoming/:address returns the indexed row", async () => { + const app = createApp(); + const res = await request(app) + .get(`/transfers/incoming/${fixture.toAddress}`) + .query({ contractId: fixture.contractId }); + + expect(res.status).toBe(200); + expect(res.body.total).toBeGreaterThanOrEqual(1); + + const row = (res.body.transfers as Array>).find( + t => t["eventId"] === fixture.eventId, + ); + expect(row).toBeDefined(); + expect(row!["eventType"]).toBe("transfer"); + expect(row!["fromAddress"]).toBe(fixture.fromAddress); + expect(row!["toAddress"]).toBe(fixture.toAddress); + expect(row!["amount"]).toBe(fixture.amount); + expect(row!["contractId"]).toBe(fixture.contractId); + expect(row!["txHash"]).toBe(fixture.txHash); + }); + + it("GET /transfers/outgoing/:address returns the indexed row", async () => { + const app = createApp(); + const res = await request(app) + .get(`/transfers/outgoing/${fixture.fromAddress}`) + .query({ contractId: fixture.contractId }); + + expect(res.status).toBe(200); + + const row = (res.body.transfers as Array>).find( + t => t["eventId"] === fixture.eventId, + ); + expect(row).toBeDefined(); + expect(row!["fromAddress"]).toBe(fixture.fromAddress); + }); + + it("GET /transfers/tx/:txHash returns the indexed row", async () => { + const app = createApp(); + const res = await request(app).get(`/transfers/tx/${fixture.txHash}`); + + expect(res.status).toBe(200); + expect(Array.isArray(res.body.transfers)).toBe(true); + expect( + (res.body.transfers as Array>).some( + t => t["eventId"] === fixture.eventId, + ), + ).toBe(true); + }); +}); diff --git a/tests/integration/fixtures/horizon-event.json b/tests/integration/fixtures/horizon-event.json new file mode 100644 index 00000000..9343098f --- /dev/null +++ b/tests/integration/fixtures/horizon-event.json @@ -0,0 +1,12 @@ +{ + "description": "Pre-recorded SEP-41 transfer event fixture for e2e ingest→query tests", + "contractId": "CBC42KFZO33TYVFDOUXFRWXYYXHFGH7W5GM4IJQSXKGFINKL2XPP4XTE", + "eventType": "transfer", + "fromAddress": "GDWCO35QUYQLGO6P7OLW4BZWNMMGGUWNPLRVPLCBVG7YNVDZKUDIW4KN", + "toAddress": "GCXOO7OIJZ2HEOZODLOEISNVO6CBPK4PISRJCZYRFT37H7XGHDLB3C7O", + "amount": "1000000000", + "ledger": 9999001, + "ledgerClosedAt": "2024-06-01T12:00:00.000Z", + "txHash": "e2e0000000000000000000000000000000000000000000000000000000000001", + "eventId": "e2e-0000000009999001000-0000000001" +}