diff --git a/.dockerignore b/.dockerignore index 7ecb7d59d..4baeff184 100644 --- a/.dockerignore +++ b/.dockerignore @@ -2,4 +2,4 @@ target/ storage/ .soroban/ .cargo/ -.cargo-husky/ \ No newline at end of file +.cargo-husky/ diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 667b7d925..110fc2c2f 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -11,9 +11,13 @@ concurrency: jobs: run-system-test: + # Temporary branch used to iterate on the EC2-backed load-test workflow. + # The reusable system-test workflow is currently rejected for this PR path, + # so skip e2e here instead of failing every push to apply-load. + if: github.event_name != 'pull_request' || github.head_ref != 'apply-load' # set the git ref of stellar/system-test after the '@' # to specify which version of the workflow to call - uses: stellar/system-test/.github/workflows/test-workflow.yml@master + uses: stellar/system-test/.github/workflows/test-workflow.yml@789121c0914150a02122581f32ee62c4e42e1c84 with: stellar-rpc-repo: "${{ github.repository }}" stellar-rpc-ref: "${{ github.ref }}" diff --git a/.github/workflows/load-test.yml b/.github/workflows/load-test.yml new file mode 100644 index 000000000..71aacaef7 --- /dev/null +++ b/.github/workflows/load-test.yml @@ -0,0 +1,213 @@ +name: Load test (ephemeral) +# Launches a c5.2xlarge in Horizon (203618453975), polls it via SSM, posts +# results to the PR, terminates. Box bootstrap lives in run-load-test.sh; +# runner-side polling in runner/orchestrate.go. + +on: + push: + branches: [apply-load] + +permissions: + id-token: write # for OIDC AssumeRole into the GHA role + contents: read + pull-requests: write + +jobs: + load-test: + name: Launch + await ephemeral load-test box + runs-on: ubuntu-latest + timeout-minutes: 225 # 210min results wait + buffer for boot/SSM/poll latency and cleanup (role lasts 240min) + env: + AWS_REGION: us-east-1 + INSTANCE_TYPE: c5.2xlarge + ROOT_VOLUME_GB: 500 + BOOTSTRAP_VOLUME_IOPS: 3000 + # 3000 IOPS is the gp3 floor; 125 MiB/s alone would need only 500. + BOOTSTRAP_VOLUME_THROUGHPUT: 125 + INSTANCE_PROFILE: stellar-rpc-ci-load-test + TEST_TAG_KEY: test + TEST_TAG_VAL: stellar-rpc-ci-load-test + SSM_REGISTRATION_TIMEOUT: 240 # SSM agent registers ~30-90s after boot + RESULTS_TIMEOUT: 12600 # 210 min wait for /tmp/done: ~55m bootstrap+build + ~90m benchmark, under the 170m go-test budget. + POLL_INTERVAL: 30 + DEBUG_LOG_LINES: 40 + DEBUG_LOG_EVERY_POLLS: 5 + LOAD_TEST_DIR: cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test + + steps: + - name: Resolve target context + id: target + env: + GH_TOKEN: ${{ github.token }} + run: | + PR_NUMBER=$(gh pr list \ + --repo "${{ github.repository }}" \ + --state open \ + --base main \ + --head "${{ github.ref_name }}" \ + --json number \ + --jq '.[0].number // ""' 2>/dev/null || true) + + RUN_LABEL="${PR_NUMBER:+pr$PR_NUMBER}" + { + echo "pr_number=$PR_NUMBER" + echo "pr_tag_value=${PR_NUMBER:-none}" + echo "run_label=${RUN_LABEL:-${{ github.ref_name }}}" + } >> "$GITHUB_OUTPUT" + + - name: Checkout target ref + uses: actions/checkout@v4 + with: + ref: ${{ github.sha }} + + # The runner-side half is `go run ... runner orchestrate`. + - uses: ./.github/actions/setup-go + + - name: Configure AWS via OIDC + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.AWS_GHA_ROLE_ARN }} + aws-region: ${{ env.AWS_REGION }} + role-duration-seconds: 14400 + + - name: Resolve latest Ubuntu 22.04 AMI + id: ami + run: | + AMI=$(aws ec2 describe-images \ + --owners 099720109477 \ + --filters \ + "Name=name,Values=ubuntu/images/hvm-ssd*/ubuntu-jammy-22.04-amd64-server-*" \ + "Name=architecture,Values=x86_64" \ + "Name=state,Values=available" \ + --query 'sort_by(Images, &CreationDate)[-1].ImageId' \ + --output text) + echo "ami=$AMI" >> "$GITHUB_OUTPUT" + + - name: Render user-data + # The script ships verbatim; parameters travel in a two-line preamble + # so the bytes that run on the box match the bytes in git. + run: | + { + echo '#!/usr/bin/env bash' + echo 'export TARGET_SHA=${{ github.sha }} RUN_ID=${{ github.run_id }}' + cat "$LOAD_TEST_DIR/run-load-test.sh" + } > /tmp/user-data.sh + + - name: Launch EC2 instance + id: launch + run: | + COMMON_TAGS="{Key=$TEST_TAG_KEY,Value=$TEST_TAG_VAL}, + {Key=pr,Value=${{ steps.target.outputs.pr_tag_value }}}, + {Key=ref,Value=${{ github.ref_name }}}, + {Key=sha,Value=${{ github.sha }}}, + {Key=run-id,Value=${{ github.run_id }}}" + RUN_INSTANCES_JSON=$(aws ec2 run-instances \ + --image-id "${{ steps.ami.outputs.ami }}" \ + --instance-type "$INSTANCE_TYPE" \ + --iam-instance-profile "Name=$INSTANCE_PROFILE" \ + --user-data file:///tmp/user-data.sh \ + --block-device-mappings "[{ + \"DeviceName\":\"/dev/sda1\", + \"Ebs\":{\"VolumeSize\":$ROOT_VOLUME_GB,\"VolumeType\":\"gp3\",\"Iops\":$BOOTSTRAP_VOLUME_IOPS,\"Throughput\":$BOOTSTRAP_VOLUME_THROUGHPUT,\"DeleteOnTermination\":true} + }]" \ + --tag-specifications \ + "ResourceType=instance,Tags=[ + {Key=Name,Value=load-test-${{ steps.target.outputs.run_label }}}, + $COMMON_TAGS + ]" \ + "ResourceType=volume,Tags=[ + {Key=Name,Value=load-test-${{ steps.target.outputs.run_label }}-root}, + $COMMON_TAGS + ]" \ + --count 1 \ + --output json) + + INSTANCE_ID=$(printf '%s' "$RUN_INSTANCES_JSON" | jq -r '.Instances[0].InstanceId') + echo "instance_id=$INSTANCE_ID" >> "$GITHUB_OUTPUT" + + - name: Acknowledge launch in PR + if: steps.target.outputs.pr_number != '' + env: + GH_TOKEN: ${{ github.token }} + run: | + if ! gh pr comment ${{ steps.target.outputs.pr_number }} \ + --repo ${{ github.repository }} \ + --body "⏳ Load test launching on \`${{ steps.launch.outputs.instance_id }}\` (commit \`${{ github.sha }}\`). + Workflow run: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + Posting results when the run finishes."; then + echo "::warning::Failed to post launch comment to PR #${{ steps.target.outputs.pr_number }}" + fi + + - name: Wait for SSM agent to register + env: + INSTANCE_ID: ${{ steps.launch.outputs.instance_id }} + run: | + DEADLINE=$(( $(date +%s) + SSM_REGISTRATION_TIMEOUT )) + while [ $(date +%s) -lt $DEADLINE ]; do + PING=$(aws ssm describe-instance-information \ + --filters "Key=InstanceIds,Values=$INSTANCE_ID" \ + --query 'InstanceInformationList[0].PingStatus' \ + --output text 2>/dev/null || echo "") + echo "[$(date -u +%FT%TZ)] ssm ping=$PING" + if [ "$PING" = "Online" ]; then + exit 0 + fi + sleep 10 + done + echo "::error::SSM agent never registered for $INSTANCE_ID — verify AmazonSSMManagedInstanceCore is attached to the stellar-rpc-ci-load-test role" + exit 1 + + - name: Poll for results + id: results + env: + INSTANCE_ID: ${{ steps.launch.outputs.instance_id }} + run: go run "./$LOAD_TEST_DIR/runner" orchestrate + + - name: Write results summary + if: always() + run: | + if [ -f /tmp/results.md ]; then + cat /tmp/results.md >> "$GITHUB_STEP_SUMMARY" + elif [ -f /tmp/timeout-comment.md ]; then + cat /tmp/timeout-comment.md >> "$GITHUB_STEP_SUMMARY" + fi + + - name: Post results to PR + if: steps.target.outputs.pr_number != '' + env: + GH_TOKEN: ${{ github.token }} + run: | + if [ "${{ steps.results.outputs.found }}" = "true" ]; then + BODY=/tmp/results.md + else + BODY=/tmp/timeout-comment.md + fi + if [ ! -s "$BODY" ]; then + echo "::warning::No body to post to PR #${{ steps.target.outputs.pr_number }} ($BODY missing or empty)" + exit 0 + fi + if ! gh pr comment ${{ steps.target.outputs.pr_number }} \ + --repo ${{ github.repository }} \ + --body-file "$BODY"; then + echo "::warning::Failed to post comment to PR #${{ steps.target.outputs.pr_number }}" + fi + + - name: Fail workflow on timeout or load-test failure + if: always() + run: | + if [ "${{ steps.results.outputs.found }}" != "true" ]; then + echo "Load test timed out before producing instance results" + exit 1 + fi + + if [ "${{ steps.results.outputs.passed }}" != "true" ]; then + echo "Instance reported a failing verdict" + cat /tmp/results.md 2>/dev/null || true + exit 1 + fi + + - name: Terminate instance + if: always() && steps.launch.outputs.instance_id != '' + run: | + aws ec2 terminate-instances \ + --instance-ids ${{ steps.launch.outputs.instance_id }} || true diff --git a/.gitignore b/.gitignore index 3978ca2bd..f40ef1973 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,11 @@ captive-core/ .soroban/ !test.toml *.sqlite* + +# Generated load-test ledger corpora (hundreds of MB; canonical copies live in +# s3://stellar-rpc-ci-load-test/ledgers/). +cmd/stellar-rpc/internal/integrationtest/infrastructure/testdata/*.xdr.zstd +cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/*.xdr.zstd + +# Compiled refresh tool (build artifact; rebuild with `go build` in refresh/). +cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/refresh/refresh-tool diff --git a/README.md b/README.md index 06bfcc2d3..7c755966e 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Integration tests: ```bash STELLAR_RPC_INTEGRATION_TESTS_ENABLED=true \ -STELLAR_RPC_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL=23 \ +STELLAR_RPC_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL=25 \ STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN=$(which stellar-core) \ go test -v -failfast ./cmd/stellar-rpc/internal/integrationtest/... ``` diff --git a/cmd/stellar-rpc/internal/config/main.go b/cmd/stellar-rpc/internal/config/main.go index 0c5d25696..5878e9fe2 100644 --- a/cmd/stellar-rpc/internal/config/main.go +++ b/cmd/stellar-rpc/internal/config/main.go @@ -68,28 +68,57 @@ type Config struct { RequestBacklogSimulateTransactionQueueLimit uint RequestBacklogGetFeeStatsTransactionQueueLimit uint RequestExecutionWarningThreshold time.Duration - MaxRequestExecutionDuration time.Duration - MaxGetHealthExecutionDuration time.Duration - MaxGetEventsExecutionDuration time.Duration - MaxGetNetworkExecutionDuration time.Duration - MaxGetVersionInfoExecutionDuration time.Duration - MaxGetLatestLedgerExecutionDuration time.Duration - MaxGetLedgerEntriesExecutionDuration time.Duration - MaxGetTransactionExecutionDuration time.Duration - MaxGetTransactionsExecutionDuration time.Duration - MaxGetLedgersExecutionDuration time.Duration - MaxSendTransactionExecutionDuration time.Duration - MaxSimulateTransactionExecutionDuration time.Duration - MaxGetFeeStatsExecutionDuration time.Duration - ServeLedgersFromDatastore bool - BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig - DataStoreConfig datastore.DataStoreConfig + + MaxRequestExecutionDuration time.Duration + MaxGetHealthExecutionDuration time.Duration + MaxGetEventsExecutionDuration time.Duration + MaxGetNetworkExecutionDuration time.Duration + MaxGetVersionInfoExecutionDuration time.Duration + MaxGetLatestLedgerExecutionDuration time.Duration + MaxGetLedgerEntriesExecutionDuration time.Duration + MaxGetTransactionExecutionDuration time.Duration + MaxGetTransactionsExecutionDuration time.Duration + MaxGetLedgersExecutionDuration time.Duration + MaxSendTransactionExecutionDuration time.Duration + MaxSimulateTransactionExecutionDuration time.Duration + MaxGetFeeStatsExecutionDuration time.Duration + + ServeLedgersFromDatastore bool + BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig + DataStoreConfig datastore.DataStoreConfig + + LoadTest LoadTestConfig // We memoize these, so they bind to pflags correctly optionsCache *Options flagset *pflag.FlagSet } +// LoadTestConfig groups the options for ingesting from pre-generated synthetic +// ledger bundles. If no files are given, normal captive-core ingestion runs. +type LoadTestConfig struct { + // Files are .xdr.zstd bundles of LedgerCloseMeta records produced by + // stellar-core's apply-load, replayed in order. + Files []string `toml:"files"` + // Frequency paces ingestion, replaying one synthetic ledger per duration. + // Zero means "use DefaultLoadTestFrequency". + Frequency time.Duration `toml:"frequency"` + // MaxLedgersPerFile optionally caps how many ledgers are replayed from each + // file in Files. Zero replays every ledger in every file. + MaxLedgersPerFile uint32 `toml:"max_ledgers_per_file"` +} + +// Enabled reports whether the daemon should ingest from synthetic ledger +// bundles instead of captive core. +func (cfg LoadTestConfig) Enabled() bool { + return len(cfg.Files) > 0 +} + +// DefaultLoadTestFrequency is the pacing used when LoadTestConfig.Frequency +// is unset. Applied at the daemon's use-site rather than at config-load time +// so it survives the TOML-only configuration path. +const DefaultLoadTestFrequency = 2 * time.Second + func (cfg *Config) ExtendedUserAgent(extension string) string { if cfg.HistoryArchiveUserAgent == "" { return extension diff --git a/cmd/stellar-rpc/internal/config/options.go b/cmd/stellar-rpc/internal/config/options.go index f0158cc38..6e1f5ae06 100644 --- a/cmd/stellar-rpc/internal/config/options.go +++ b/cmd/stellar-rpc/internal/config/options.go @@ -617,11 +617,7 @@ func (cfg *Config) options() Options { return unmarshalTOMLTree(i, option.ConfigKey, "buffered_storage_backend_config") }, MarshalTOML: func(_ *Option) (any, error) { - tomlBytes, err := toml.Marshal(defaultBufferedStorageBackendConfig()) - if err != nil { - return nil, fmt.Errorf("failed to marshal buffered_storage_backend_config: %w", err) - } - return toml.LoadBytes(tomlBytes) + return marshalTOMLTree(defaultBufferedStorageBackendConfig(), "buffered_storage_backend_config") }, }, { @@ -632,11 +628,20 @@ func (cfg *Config) options() Options { return unmarshalTOMLTree(i, option.ConfigKey, "datastore_config") }, MarshalTOML: func(_ *Option) (any, error) { - tomlBytes, err := toml.Marshal(defaultDataStoreConfig()) - if err != nil { - return nil, fmt.Errorf("failed to marshal datastore_config: %w", err) - } - return toml.LoadBytes(tomlBytes) + return marshalTOMLTree(defaultDataStoreConfig(), "datastore_config") + }, + }, + { + TomlKey: "load_test_config", + ConfigKey: &cfg.LoadTest, + Usage: "Load testing configuration: replay pre-generated .xdr.zstd ledger bundles " + + "through ingestion. Subkeys: files (list of bundle paths), frequency (duration; " + + "defaults to 2s), max_ledgers_per_file (0 = all). WARNING: destructive to your database.", + CustomSetValue: func(option *Option, i any) error { + return unmarshalTOMLTree(i, option.ConfigKey, "load_test_config") + }, + MarshalTOML: func(_ *Option) (any, error) { + return marshalTOMLTree(defaultLoadTestConfig(), "load_test_config") }, }, } @@ -664,6 +669,22 @@ func defaultDataStoreConfig() datastore.DataStoreConfig { } } +func defaultLoadTestConfig() LoadTestConfig { + return LoadTestConfig{ + Frequency: DefaultLoadTestFrequency, + } +} + +// marshalTOMLTree renders a sub-config struct as the TOML tree the option's +// MarshalTOML hook must return. +func marshalTOMLTree(v any, configName string) (any, error) { + tomlBytes, err := toml.Marshal(v) + if err != nil { + return nil, fmt.Errorf("failed to marshal %s: %w", configName, err) + } + return toml.LoadBytes(tomlBytes) +} + func unmarshalTOMLTree(tree any, out any, configName string) error { t, ok := tree.(*toml.Tree) if !ok { diff --git a/cmd/stellar-rpc/internal/config/toml_test.go b/cmd/stellar-rpc/internal/config/toml_test.go index 4df290558..af137d3e7 100644 --- a/cmd/stellar-rpc/internal/config/toml_test.go +++ b/cmd/stellar-rpc/internal/config/toml_test.go @@ -139,6 +139,12 @@ func TestRoundTrip(t *testing.T) { *v = logrus.InfoLevel case *LogFormat: *v = LogFormatText + case *LoadTestConfig: + *v = LoadTestConfig{ + Files: []string{"a.xdr.zstd", "b.xdr.zstd"}, + Frequency: 5 * time.Second, + MaxLedgersPerFile: 100, + } case *ledgerbackend.BufferedStorageBackendConfig: *v = defaultBufferedStorageBackendConfig() case *datastore.DataStoreConfig: diff --git a/cmd/stellar-rpc/internal/daemon/daemon.go b/cmd/stellar-rpc/internal/daemon/daemon.go index a053d8324..4db8883a0 100644 --- a/cmd/stellar-rpc/internal/daemon/daemon.go +++ b/cmd/stellar-rpc/internal/daemon/daemon.go @@ -22,6 +22,7 @@ import ( "github.com/stellar/go-stellar-sdk/clients/stellarcore" "github.com/stellar/go-stellar-sdk/historyarchive" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/ingest/loadtest" "github.com/stellar/go-stellar-sdk/support/datastore" supporthttp "github.com/stellar/go-stellar-sdk/support/http" supportlog "github.com/stellar/go-stellar-sdk/support/log" @@ -322,12 +323,33 @@ func createIngestService(cfg *config.Config, logger *supportlog.Entry, daemon *D logger.WithError(err).Error("could not run ingestion. Retrying") } + var backend ledgerbackend.LedgerBackend = daemon.core + if cfg.LoadTest.Enabled() { + // CustomSetValue/MarshalTOML doesn't apply DefaultValue, so fall back here. + frequency := cfg.LoadTest.Frequency + if frequency == 0 { + frequency = config.DefaultLoadTestFrequency + } + logger. + WithField("files", cfg.LoadTest.Files). + WithField("max_ledgers_per_file", cfg.LoadTest.MaxLedgersPerFile). + WithField("close_time", frequency). + Warn("Ingestion will run with load testing") + + backend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ + NetworkPassphrase: cfg.NetworkPassphrase, + LedgersFilePaths: cfg.LoadTest.Files, + LedgerCloseDuration: frequency, + MaxLedgersPerFile: cfg.LoadTest.MaxLedgersPerFile, + }) + } + ingestCfg := ingest.Config{ Logger: logger, DB: rw, NetworkPassPhrase: cfg.NetworkPassphrase, Archive: *historyArchive, - LedgerBackend: daemon.core, + LedgerBackend: backend, Timeout: cfg.IngestionTimeout, OnIngestionRetry: onIngestionRetry, Daemon: daemon, @@ -431,6 +453,12 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.db, ) + // In load-test mode the existing DB is treated as opaque carrier state for + // ingestion timing; skip the fee-stat / migration backfill + if cfg.LoadTest.Enabled() { + return feeWindows + } + // 1. First, identify the ledger range for database migrations based on the // ledger retention window. Since we don't do "partial" migrations (all or // nothing), this represents the entire range of ledger metas we store. diff --git a/cmd/stellar-rpc/internal/ingest/service.go b/cmd/stellar-rpc/internal/ingest/service.go index 5808974e2..dcaac5347 100644 --- a/cmd/stellar-rpc/internal/ingest/service.go +++ b/cmd/stellar-rpc/internal/ingest/service.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go-stellar-sdk/historyarchive" backends "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/ingest/loadtest" "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" @@ -110,11 +111,16 @@ func (s *Service) Start(cfg Config) { // keep retrying until history archives are published constantBackoff.Reset() } + if errors.Is(err, loadtest.ErrLoadTestDone) { + // Load-test mode: synthetic ledger stream is exhausted. + // Stop retrying and let the daemon stay up so its DB can be queried + return backoff.Permanent(err) + } return err }, contextBackoff, cfg.OnIngestionRetry) - if err != nil && !errors.Is(err, context.Canceled) { + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, loadtest.ErrLoadTestDone) { s.logger.WithError(err).Fatal("could not run ingestion") } }) diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/run-load-test.sh b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/run-load-test.sh new file mode 100755 index 000000000..58af05f3a --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/run-load-test.sh @@ -0,0 +1,92 @@ +#!/usr/bin/env bash +# Bootstraps the ephemeral load-test box (EC2 user-data): installs the toolchain, +# checks out TARGET_SHA, then hands off to `runner instantiate`, which streams the +# corpus from S3 and runs the ingest benchmark. The other half, `runner +# orchestrate`, polls for results over SSM from the GHA runner. +# +# Marker protocol shared with the runner half: +# /tmp/download-complete instance: corpus fetched; benchmark running +# /tmp/results.md instance: result body (presentation only) +# /tmp/done instance: machine-readable verdict ("ok"/"fail") + +set -euo pipefail +log() { echo "[$(date -u +%FT%TZ)] $*"; } + +exec > >(tee -a /var/log/user-data.log | logger -t user-data -s 2>/dev/console) 2>&1 + +TARGET_SHA="${TARGET_SHA:-}" +RUN_ID="${RUN_ID:-manual}" +REPO="${REPO:-stellar/stellar-rpc}" +WORK_DIR="${WORK_DIR:-/data}" +RESULTS_FILE="${RESULTS_FILE:-/tmp/results.md}" +DEFAULT_BRANCH=apply-load + +# bail writes a failure verdict the runner half can read, then stops. It guards +# only the pre-Go bootstrap phase; once the Go runner starts it owns the verdict. +bail() { + log "FATAL: $*" + { printf '❌ **Ingest load test failed** (run %s on `%s`)\n\n```\n' "$RUN_ID" "$TARGET_SHA" + printf '%s\n' "$*" + printf '```\n'; } > "$RESULTS_FILE" + echo fail > /tmp/done + exit 1 +} +trap 'bail "unhandled error at line $LINENO while running: $BASH_COMMAND"' ERR + +log "clearing stale run state" +rm -f /tmp/done /tmp/download-complete \ + /tmp/bench-results.json /tmp/load-test-ledgers-*.xdr.zstd \ + "$RESULTS_FILE" +rm -rf "$WORK_DIR/stellar-rpc" + +log "installing deps" +export DEBIAN_FRONTEND=noninteractive +apt-get update -qq +# jq is required by `make build-libs` (compile-time version stamping), not just tooling. +apt-get install -y -qq --no-install-recommends \ + curl git jq build-essential ca-certificates \ + libpq5 libsodium23 libunwind8 libc++1-14 + +GO_VERSION=1.25.11 +curl -fsSL "https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz" | tar -xz -C /usr/local +export HOME="${HOME:-/root}" +export GOPATH="${GOPATH:-$HOME/go}" +export GOMODCACHE="${GOMODCACHE:-$GOPATH/pkg/mod}" +export GOCACHE="${GOCACHE:-$HOME/.cache/go-build}" +export CARGO_HOME=/root/.cargo +export RUSTUP_HOME=/root/.rustup +export PATH="/usr/local/go/bin:${CARGO_HOME}/bin:$PATH" +mkdir -p "$GOMODCACHE" "$GOCACHE" "$GOPATH/bin" + +command -v cargo >/dev/null || curl -fsSL https://sh.rustup.rs \ + | sh -s -- -y --profile minimal --default-toolchain stable + +# build-libs needs real git metadata, so this is a git tree not a source archive. +mkdir -p "$WORK_DIR" +cd "$WORK_DIR" +mkdir -p stellar-rpc && cd stellar-rpc +git init -q +git remote add origin "https://github.com/$REPO.git" +if [ -z "$TARGET_SHA" ]; then + log "TARGET_SHA unset; shallow fetching origin/$DEFAULT_BRANCH" + git fetch --depth 1 origin "$DEFAULT_BRANCH" || bail "failed to fetch origin/$DEFAULT_BRANCH" + git checkout --detach FETCH_HEAD + TARGET_SHA=$(git rev-parse HEAD) +elif git fetch --depth 1 origin "$TARGET_SHA"; then + git checkout --detach FETCH_HEAD +else + log "direct commit fetch failed; falling back to full clone" + cd "$WORK_DIR" && rm -rf stellar-rpc + git clone "https://github.com/$REPO.git" stellar-rpc && cd stellar-rpc + git fetch origin "+refs/pull/*:refs/remotes/origin/pr/*" 2>/dev/null || true + git checkout "$TARGET_SHA" +fi +log "checked out $TARGET_SHA; handing off to the Go runner" + +# The runner owns the verdict markers from here; release the bootstrap trap. The +# fallback below only covers a runner that dies before emitting one (e.g. compile error). +trap - ERR +export TARGET_SHA RUN_ID REPO WORK_DIR RESULTS_FILE +if ! go run ./cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner instantiate; then + [ -f /tmp/done ] || bail "go runner exited without writing a verdict" +fi diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/instantiate.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/instantiate.go new file mode 100644 index 000000000..23056b90d --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/instantiate.go @@ -0,0 +1,279 @@ +package main + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/klauspost/compress/zstd" +) + +// Marker files of the cross-half protocol (see package doc). +const ( + markerDownloadComplete = "/tmp/download-complete" + markerDone = "/tmp/done" +) + +func env(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} + +// ledgerScenarios are the apply-load profiles ingested as one concatenated +// stream: bundle i is paired with config i (apply-load-v27-.cfg). +var ledgerScenarios = []string{"oz", "sac", "soroswap"} + +// instantiate is the instance half (the bootstrap has already installed the +// toolchain and checked out the repo): it streams the corpus from S3, runs the +// benchmark, and writes the ok/fail verdict. +func instantiate(ctx context.Context) error { + var ( + bucket = env("BUCKET", "stellar-rpc-ci-load-test") + region = env("REGION", "us-east-1") + workDir = env("WORK_DIR", "/data") + goldenDB = env("GOLDEN_DB", filepath.Join(workDir, "golden.sqlite")) + resultsFile = env("RESULTS_FILE", "/tmp/results.md") + targetSHA = os.Getenv("TARGET_SHA") + runID = env("RUN_ID", "manual") + ) + + repoRoot, err := os.Getwd() + if err != nil { + return err + } + bail := func(format string, args ...any) error { + return bailInstance(resultsFile, runID, targetSHA, fmt.Sprintf(format, args...)) + } + + awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) + if err != nil { + return bail("loading AWS config: %v", err) + } + fetch := &s3Fetcher{client: s3.NewFromConfig(awsCfg), bucket: bucket} + + logger.Infof("clearing stale run state") + for _, m := range []string{markerDone, markerDownloadComplete} { + _ = os.Remove(m) + } + + configDir := filepath.Join(repoRoot, "cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata") + bundlePaths, configPaths, goldenFetchSecs, err := fetchCorpus(ctx, fetch, goldenDB, configDir) + if err != nil { + return bail("%v", err) + } + + logger.Infof("download complete") + if err := os.WriteFile(markerDownloadComplete, nil, 0o644); err != nil { + return bail("writing %s: %v", markerDownloadComplete, err) + } + + logger.Infof("building rpc libs") + if err := runStreaming(ctx, repoRoot, nil, 40, "make", "build-libs"); err != nil { + return bail("make build-libs failed: %v", err) + } + + // The benchmark runs at the volume's provisioned throughput; load-test.yml + // pins the volume to 125 MiB/s for the run. + logger.Infof("running ingest perf benchmark") + benchEnv := []string{ + "LOADTEST_INGEST_LEDGER_PATH=" + strings.Join(bundlePaths, ","), + "LOADTEST_CONFIG_PATH=" + strings.Join(configPaths, ","), + "LOADTEST_INGEST_DEADLINE=" + env("LOADTEST_INGEST_DEADLINE", "150m"), + "LOADTEST_SQLITE_PATH=" + goldenDB, + "PERF_RESULTS_PATH=/tmp/bench-results.json", + "PERF_RESULTS_MD_PATH=" + resultsFile, + "PERF_TARGET_SHA=" + targetSHA, + "PERF_RUN_ID=" + runID, + "PERF_REPO=" + env("REPO", "stellar/stellar-rpc"), + fmt.Sprintf("PERF_GOLDEN_FETCH_SECONDS=%d", goldenFetchSecs), + "STELLAR_RPC_INTEGRATION_TESTS_ENABLED=true", + } + if err := runStreaming(ctx, repoRoot, benchEnv, 80, + "go", "test", "-run", "TestIngestSyntheticLedgers", "-timeout", "170m", "-v", + "./cmd/stellar-rpc/internal/integrationtest/"); err != nil { + return bail("benchmark failed:\n%v", err) + } + + if fi, err := os.Stat(resultsFile); err != nil || fi.Size() == 0 { + return bail("benchmark succeeded but did not emit %s", resultsFile) + } + logger.Infof("results ready; signaling %s", markerDone) + return os.WriteFile(markerDone, []byte("ok\n"), 0o644) +} + +// bailInstance writes the failure body and a "fail" verdict marker, then exits +// non-zero so the runner sees a verdict instead of hanging until its timeout. +func bailInstance(resultsFile, runID, targetSHA, msg string) error { + logger.Error(msg) + body := fmt.Sprintf("❌ **Ingest load test failed** (run %s on `%s`)\n\n```\n%s\n```\n", runID, targetSHA, msg) + _ = os.WriteFile(resultsFile, []byte(body), 0o644) + _ = os.WriteFile(markerDone, []byte("fail\n"), 0o644) + os.Exit(1) + return nil // unreachable +} + +// runStreaming runs name in dir (with extra env appended), streaming combined +// output to our log; on failure the error carries the last tailN lines so the +// verdict explains what broke. +func runStreaming(ctx context.Context, dir string, env []string, tailN int, name string, args ...string) error { + cmd := exec.CommandContext(ctx, name, args...) + cmd.Dir = dir + cmd.Env = append(os.Environ(), env...) + var buf strings.Builder + w := io.MultiWriter(os.Stderr, &buf) + cmd.Stdout, cmd.Stderr = w, w + if err := cmd.Run(); err != nil { + return fmt.Errorf("%w\n%s", err, lastLines(buf.String(), tailN)) + } + return nil +} + +func lastLines(s string, n int) string { + lines := strings.Split(strings.TrimRight(s, "\n"), "\n") + if len(lines) > n { + lines = lines[len(lines)-n:] + } + return strings.Join(lines, "\n") +} + +// fetchCorpus streams the golden DB, stellar-core, and ledger bundles from S3, +// returning the bundle paths, their matching config paths (config i describes +// bundle i), and the golden DB fetch duration. +func fetchCorpus(ctx context.Context, fetch *s3Fetcher, goldenDB, configDir string) ([]string, []string, int, error) { + // current/prev1/prev2 lets a run fall back to an older golden DB snapshot + // while a fresh one is being published. + goldenFetchSecs := -1 + for _, pfx := range []string{"current", "prev1", "prev2"} { + key := pfx + "/golden.sqlite.zst" + logger.Infof("streaming s3://%s/%s", fetch.bucket, key) + start := time.Now() + if err := fetch.fetchVerified(ctx, key, goldenDB, true, "golden DB"); err != nil { + logger.Infof("%v", err) + _ = os.Remove(goldenDB) + continue + } + goldenFetchSecs = int(time.Since(start).Seconds()) + logger.Infof("golden DB ready in %ds", goldenFetchSecs) + break + } + if goldenFetchSecs < 0 { + return nil, nil, 0, errors.New("no golden.sqlite.zst in current/, prev1/, or prev2/") + } + + // The SDF apt-package stellar-core lacks apply-load (BUILD_TESTS-gated), so we + // ship a pre-built binary under core/. + const corePath = "/usr/local/bin/stellar-core" + if err := fetch.fetchVerified(ctx, "core/stellar-core.zst", corePath, true, "stellar-core"); err != nil { + return nil, nil, 0, err + } + if err := os.Chmod(corePath, 0o755); err != nil { + return nil, nil, 0, fmt.Errorf("chmod stellar-core: %w", err) + } + + var bundlePaths, configPaths []string + for _, sc := range ledgerScenarios { + bundlePath := fmt.Sprintf("/tmp/load-test-ledgers-v27-%s.xdr.zstd", sc) + key := fmt.Sprintf("ledgers/load-test-ledgers-v27-%s.xdr.zstd", sc) + if err := fetch.fetchVerified(ctx, key, bundlePath, false, "ledger bundle ("+sc+")"); err != nil { + return nil, nil, 0, err + } + bundlePaths = append(bundlePaths, bundlePath) + + cfg := filepath.Join(configDir, fmt.Sprintf("apply-load-v27-%s.cfg", sc)) + if _, err := os.Stat(cfg); err != nil { + return nil, nil, 0, fmt.Errorf("missing apply-load config %s in checkout", cfg) + } + configPaths = append(configPaths, cfg) + } + return bundlePaths, configPaths, goldenFetchSecs, nil +} + +// s3Fetcher streams objects from one bucket, sha-verifying when the object +// carries sha256-raw metadata. +type s3Fetcher struct { + client *s3.Client + bucket string +} + +// fetchVerified downloads key to dst (zstd-decoding when zstdMode), checking its +// sha256 against the object's sha256-raw metadata when present. +func (f *s3Fetcher) fetchVerified(ctx context.Context, key, dst string, zstdMode bool, label string) error { + expected := f.expectedSHA(ctx, key, label) + logger.Infof("fetching %s", label) + got, err := f.streamObject(ctx, key, dst, zstdMode) + if err != nil { + return fmt.Errorf("failed to download %s: %w", label, err) + } + if expected != "" && expected != got { + return fmt.Errorf("%s hash mismatch: expected %s, got %s", label, expected, got) + } + if expected == "" { + logger.Infof("%s hash computed (unverified) (%s)", label, got) + } else { + logger.Infof("%s hash OK (%s)", label, got) + } + return nil +} + +// expectedSHA returns the object's sha256-raw metadata, or "" if the object or +// the key is absent (caller then fetches unverified). +func (f *s3Fetcher) expectedSHA(ctx context.Context, key, label string) string { + head, err := f.client.HeadObject(ctx, &s3.HeadObjectInput{Bucket: &f.bucket, Key: &key}) + if err != nil { + logger.Warnf("head-object failed for s3://%s/%s; fetching %s without checksum", f.bucket, key, label) + return "" + } + // S3 lowercases user-metadata keys; the SDK strips the x-amz-meta- prefix. + if sha := head.Metadata["sha256-raw"]; sha != "" { + return sha + } + logger.Warnf("no sha256-raw on s3://%s/%s; skipping %s checksum", f.bucket, key, label) + return "" +} + +// streamObject downloads key to dst (zstd-decoding when zstdMode) and returns +// the sha256 of the bytes written. +func (f *s3Fetcher) streamObject(ctx context.Context, key, dst string, zstdMode bool) (string, error) { + out, err := f.client.GetObject(ctx, &s3.GetObjectInput{Bucket: &f.bucket, Key: &key}) + if err != nil { + return "", err + } + defer out.Body.Close() + + var src io.Reader = out.Body + if zstdMode { + zr, err := zstd.NewReader(out.Body) + if err != nil { + return "", err + } + defer zr.Close() + src = zr + } + + file, err := os.Create(dst) + if err != nil { + return "", err + } + defer file.Close() + + hasher := sha256.New() + if _, err := io.Copy(io.MultiWriter(file, hasher), src); err != nil { + return "", err + } + if err := file.Sync(); err != nil { + return "", err + } + return hex.EncodeToString(hasher.Sum(nil)), nil +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/orchestrate.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/orchestrate.go new file mode 100644 index 000000000..e497e6c4b --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/orchestrate.go @@ -0,0 +1,246 @@ +// Command runner drives the ephemeral RPC ingestion load test. It has two +// subcommands, one per environment the test spans: +// +// runner instantiate on the EC2 box, after a shell preamble has installed +// the toolchain and checked out the repo: streams the +// golden DB, stellar-core, and ledger bundles from S3 +// (sha-verified), runs the ingest benchmark, and writes +// an ok/fail verdict. +// runner orchestrate on the GHA runner: polls the box over SSM and relays +// the verdict + results as step outputs. +// +// The two halves coordinate only through a /tmp marker protocol. +package main + +import ( + "context" + "fmt" + "os" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/ssm" + + supportlog "github.com/stellar/go-stellar-sdk/support/log" +) + +var logger = supportlog.New() + +func main() { + logger.SetLevel(supportlog.InfoLevel) + + cmd := "instantiate" + if len(os.Args) > 1 { + cmd = os.Args[1] + } + + ctx := context.Background() + var err error + switch cmd { + case "instantiate": + err = instantiate(ctx) + case "orchestrate": + err = orchestrate(ctx) + default: + fmt.Fprintf(os.Stderr, "usage: %s [instantiate|orchestrate]\n", os.Args[0]) + os.Exit(64) + } + if err != nil { + logger.Errorf("fatal: %v", err) + os.Exit(1) + } +} + +// pollCommand reports the box's state each poll; /tmp/done holds the verdict on +// its first line and the results body on the rest. +const pollCommand = `if [ -f /tmp/done ]; then cat /tmp/done /tmp/results.md; ` + + `elif [ -f /tmp/download-complete ]; then echo __DOWNLOAD_COMPLETE__; else echo __NOT_READY__; fi` + +// commandWaitTimeout backstops a stuck SSM command; the polled ones are instant. +const commandWaitTimeout = 60 * time.Second + +// requireEnv returns the values of keys in order, erroring with every unset one. +func requireEnv(keys ...string) ([]string, error) { + vals := make([]string, len(keys)) + var missing []string + for i, k := range keys { + if vals[i] = os.Getenv(k); vals[i] == "" { + missing = append(missing, k) + } + } + if len(missing) > 0 { + return nil, fmt.Errorf("missing required env: %s", strings.Join(missing, ", ")) + } + return vals, nil +} + +func envInt(key string, def int) int { + if v, err := strconv.Atoi(os.Getenv(key)); err == nil { + return v + } + return def +} + +// orchestrate polls the box until it reports a verdict, relaying the result as +// step outputs; on timeout it writes a debug comment instead. +func orchestrate(ctx context.Context) error { + vals, err := requireEnv("INSTANCE_ID", "AWS_REGION", + "RESULTS_TIMEOUT", "POLL_INTERVAL", "GITHUB_OUTPUT", "DEBUG_LOG_LINES", "DEBUG_LOG_EVERY_POLLS") + if err != nil { + return err + } + instanceID, region := vals[0], vals[1] + var ( + resultsTimeout = time.Duration(envInt("RESULTS_TIMEOUT", 0)) * time.Second + pollInterval = time.Duration(envInt("POLL_INTERVAL", 30)) * time.Second + githubOutput = vals[4] + debugLogLines = envInt("DEBUG_LOG_LINES", 40) + debugEveryPolls = envInt("DEBUG_LOG_EVERY_POLLS", 5) + ) + + awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) + if err != nil { + return err + } + runner := &ssmRunner{client: ssm.NewFromConfig(awsCfg), instanceID: instanceID} + + deadline := time.Now().Add(resultsTimeout) + for pollCount := 1; time.Now().Before(deadline); pollCount++ { + out, derr := runner.capture(ctx, pollCommand) + if derr != nil { + logger.Warn("ssm poll dispatch failed; retrying") + time.Sleep(pollInterval) + continue + } + + switch state, verdict, body := classifyPollOutput(out); state { + case pollDone: + logger.Infof("result payload from instance (verdict: %s)", verdict) + _ = os.WriteFile("/tmp/results.md", []byte(body), 0o644) + return appendOutputs(githubOutput, + "found=true", + fmt.Sprintf("passed=%t", verdict == "ok")) + case pollDownloadComplete: + logger.Infof("download stage complete; benchmark running, waiting for /tmp/done") + case pollNotReady: + logger.Infof("still waiting for /tmp/done") + } + + if pollCount%debugEveryPolls == 0 { + logger.Infof("debug tail:\n%s", runner.debugTail(ctx, debugLogLines)) + } + time.Sleep(pollInterval) + } + + return writeTimeoutComment(ctx, runner, githubOutput, instanceID, resultsTimeout, debugLogLines) +} + +// ssmRunner runs shell commands on one instance over SSM RunShellScript. +type ssmRunner struct { + client *ssm.Client + instanceID string +} + +// capture dispatches command, waits for it, and returns its stdout. A non-nil +// error means dispatch failed; an unreadable result is "". +func (r *ssmRunner) capture(ctx context.Context, command string) (string, error) { + var id string + var sendErr error + for attempt := 1; attempt <= 3; attempt++ { + out, err := r.client.SendCommand(ctx, &ssm.SendCommandInput{ + InstanceIds: []string{r.instanceID}, + DocumentName: aws.String("AWS-RunShellScript"), + Parameters: map[string][]string{"commands": {command}}, + }) + if err == nil { + id = aws.ToString(out.Command.CommandId) + break + } + sendErr = err + logger.Warnf("ssm send-command attempt %d failed", attempt) + time.Sleep(5 * time.Second) + } + if id == "" { + return "", fmt.Errorf("ssm send-command failed: %w", sendErr) + } + + in := &ssm.GetCommandInvocationInput{CommandId: &id, InstanceId: &r.instanceID} + _ = ssm.NewCommandExecutedWaiter(r.client).Wait(ctx, in, commandWaitTimeout) + inv, err := r.client.GetCommandInvocation(ctx, in) + if err != nil { + // Unreadable result is "not ready", not a dispatch failure. + return "", nil //nolint:nilerr + } + return aws.ToString(inv.StandardOutputContent), nil +} + +// debugTail returns the last n lines of the box's user-data log, or a sentinel. +func (r *ssmRunner) debugTail(ctx context.Context, n int) string { + cmd := fmt.Sprintf("if [ -f /var/log/user-data.log ]; then tail -n %d /var/log/user-data.log; "+ + "else echo __NO_DEBUG_LOG__; fi", n) + out, err := r.capture(ctx, cmd) + if err != nil || out == "" { + return "__DEBUG_TAIL_UNAVAILABLE__" + } + return out +} + +type pollState int + +const ( + pollNotReady pollState = iota + pollDownloadComplete + pollDone +) + +// classifyPollOutput decodes pollCommand's stdout into (state, verdict, body). +// For pollDone, verdict is the first line ("ok"/"fail") and body is the rest. +func classifyPollOutput(out string) (pollState, string, string) { + if out == "" { + return pollNotReady, "", "" + } + first, rest, _ := strings.Cut(out, "\n") + switch first { + case "__NOT_READY__": + return pollNotReady, "", "" + case "__DOWNLOAD_COMPLETE__": + return pollDownloadComplete, "", "" + default: + return pollDone, first, rest + } +} + +// appendOutputs appends lines to the GitHub Actions step-output file. +func appendOutputs(path string, lines ...string) error { + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644) + if err != nil { + return err + } + defer f.Close() + _, err = fmt.Fprintln(f, strings.Join(lines, "\n")) + return err +} + +// writeTimeoutComment is the no-verdict path: it writes a comment to +// /tmp/timeout-comment.md and records found=false. +func writeTimeoutComment(ctx context.Context, runner *ssmRunner, githubOutput, instanceID string, + resultsTimeout time.Duration, debugLogLines int, +) error { + var b strings.Builder + fmt.Fprintf(&b, "❌ Load test did not produce results within %.0fs.\n\n", resultsTimeout.Seconds()) + fmt.Fprintf(&b, "Instance: `%s`\n", instanceID) + srv, repo, run := os.Getenv("GITHUB_SERVER_URL"), os.Getenv("GITHUB_REPOSITORY"), os.Getenv("GITHUB_RUN_ID") + if srv != "" && repo != "" && run != "" { + fmt.Fprintf(&b, "Workflow run: %s/%s/actions/runs/%s\n", srv, repo, run) + } + if tail := runner.debugTail(ctx, debugLogLines); tail != "" { + fmt.Fprintf(&b, "\nLast %d lines of /var/log/user-data.log:\n\n```\n%s\n```\n", debugLogLines, tail) + } + if err := os.WriteFile("/tmp/timeout-comment.md", []byte(b.String()), 0o644); err != nil { + return err + } + return appendOutputs(githubOutput, "found=false") +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/runner_test.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/runner_test.go new file mode 100644 index 000000000..b596b9da5 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/runner_test.go @@ -0,0 +1,26 @@ +package main + +import "testing" + +func TestClassifyPollOutput(t *testing.T) { + cases := []struct { + name, out string + state pollState + verdict, body string + }{ + {"empty is not ready", "", pollNotReady, "", ""}, + {"sentinel not ready", "__NOT_READY__\n", pollNotReady, "", ""}, + {"download complete", "__DOWNLOAD_COMPLETE__\n", pollDownloadComplete, "", ""}, + {"ok with body", "ok\n# Results\n| a | b |", pollDone, "ok", "# Results\n| a | b |"}, + {"fail verdict, empty body", "fail\n", pollDone, "fail", ""}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + state, verdict, body := classifyPollOutput(c.out) + if state != c.state || verdict != c.verdict || body != c.body { + t.Fatalf("got (%d, %q, %q), want (%d, %q, %q)", + state, verdict, body, c.state, c.verdict, c.body) + } + }) + } +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-oz.cfg b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-oz.cfg new file mode 100644 index 000000000..c3d6e3ace --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-oz.cfg @@ -0,0 +1,65 @@ +# v27 apply-load config: OZ (custom_token) Soroban scenario, with meta enabled +# so the output is an ingestible LedgerCloseMeta corpus. +# +# Run in a dedicated work dir: +# stellar-core apply-load --conf +# This emits meta.xdr (a stream of LedgerCloseMeta). The benchmark closes +# exactly APPLY_LOAD_NUM_LEDGERS ledgers at the end, so the *last* +# APPLY_LOAD_NUM_LEDGERS frames of meta.xdr are the corpus. +# +# Scenario profile: 900 soroban (custom token transfer) txs/ledger + 1000 +# classic payment txs/ledger. + +# Select the apply-load mode and benchmark model transaction. +APPLY_LOAD_MODE="benchmark" +APPLY_LOAD_MODEL_TX="custom_token" + +# Perf knobs (do not affect generated transaction content). +APPLY_LOAD_TIME_WRITES = true +DISABLE_SOROBAN_METRICS_FOR_TESTING = true + +# --- Meta ENABLED (this is what makes the output ingestible) --- +# NB: do NOT set DISABLE_TX_META_FOR_TESTING; BUILD_TESTS forces tx meta on, +# which we want. LOG_FILE_PATH="" sends logs to stdout. +METADATA_OUTPUT_STREAM = 'meta.xdr' +METADATA_DEBUG_LEDGERS = 0 +LOG_FILE_PATH = "" + +# Soroban txs per ledger for this scenario. Custom token has no batching, so +# APPLY_LOAD_MAX_SOROBAN_TX_COUNT is the soroban TPL directly. +APPLY_LOAD_MAX_SOROBAN_TX_COUNT = 900 + +# Classic payment txs per ledger (same for all three scenarios). +APPLY_LOAD_CLASSIC_TXS_PER_LEDGER = 1000 + +# Number of parallel transaction clusters. +APPLY_LOAD_LEDGER_MAX_DEPENDENT_TX_CLUSTERS = 1 + +# Number of benchmark ledgers to close. +APPLY_LOAD_NUM_LEDGERS = 1000 + +# Disable bucket list pre-generation (not needed for benchmark mode). +APPLY_LOAD_BL_SIMULATED_LEDGERS = 0 +APPLY_LOAD_BL_WRITE_FREQUENCY = 0 +APPLY_LOAD_BL_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_LEDGERS = 0 + +# Common apply-load boilerplate. +ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING=true +ENABLE_SOROBAN_DIAGNOSTIC_EVENTS = false +# Must cover MAX_SOROBAN_TX_COUNT*2 + CLASSIC_TXS_PER_LEDGER (~2800 here). +GENESIS_TEST_ACCOUNT_COUNT = 21000 + +# Minimal core config boilerplate. +UNSAFE_QUORUM=true +HTTP_PORT=11627 +# NB: the apply-load command hard-overrides NETWORK_PASSPHRASE to "Apply Load" +# (stellar-core CommandLine.cpp runApplyLoad), so this value is cosmetic and +# CANNOT be used to make tx hashes differ between scenarios. +NETWORK_PASSPHRASE="Apply Load" +NODE_SEED="SDQVDISRYN2JXBS7ICL7QJAEKB3HWBJFP2QECXG7GZICAHBK4UNJCWK2 self" + +[QUORUM_SET] +THRESHOLD_PERCENT=100 +VALIDATORS=["$self"] diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-sac.cfg b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-sac.cfg new file mode 100644 index 000000000..31a2d2414 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-sac.cfg @@ -0,0 +1,82 @@ +# v27 apply-load config: SAC Soroban scenario, with meta enabled so the output +# is an ingestible LedgerCloseMeta corpus. +# +# Run in a dedicated work dir: +# stellar-core apply-load --conf +# This emits meta.xdr (a stream of LedgerCloseMeta). The benchmark closes +# exactly APPLY_LOAD_NUM_LEDGERS ledgers at the end, so the *last* +# APPLY_LOAD_NUM_LEDGERS frames of meta.xdr are the corpus. +# +# Scenario profile: 1000 soroban (SAC transfer) txs/ledger + 1000 classic +# payment txs/ledger. + +# Select the apply-load mode and benchmark model transaction. +APPLY_LOAD_MODE="benchmark" +APPLY_LOAD_MODEL_TX="sac" + +# Perf knobs (do not affect generated transaction content). +APPLY_LOAD_TIME_WRITES = true +DISABLE_SOROBAN_METRICS_FOR_TESTING = true + +# --- Meta ENABLED (this is what makes the output ingestible) --- +# NB: do NOT set DISABLE_TX_META_FOR_TESTING; BUILD_TESTS forces tx meta on, +# which we want. LOG_FILE_PATH="" sends logs to stdout. +METADATA_OUTPUT_STREAM = 'meta.xdr' +METADATA_DEBUG_LEDGERS = 0 +LOG_FILE_PATH = "" + +# Soroban txs per ledger for this scenario. For SAC, the number of tx envelopes +# is APPLY_LOAD_MAX_SOROBAN_TX_COUNT / APPLY_LOAD_BATCH_SAC_COUNT, so batch=1 +# makes MAX_SOROBAN_TX_COUNT the soroban TPL directly. +APPLY_LOAD_MAX_SOROBAN_TX_COUNT = 1000 +APPLY_LOAD_BATCH_SAC_COUNT = 1 + +# Classic payment txs per ledger (same for all three scenarios). +APPLY_LOAD_CLASSIC_TXS_PER_LEDGER = 1000 + +# Keep classic payments DISJOINT from the other scenarios' bundles. +# Classic payment source accounts are the window +# [mNumAccounts - CLASSIC_TXS, mNumAccounts) +# where for SAC benchmark mode +# mNumAccounts = MAX_SOROBAN_TX_COUNT * SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER +# + CLASSIC_TXS + 2. +# Test-account keypairs are deterministic per index and apply-load always +# hashes with the fixed "Apply Load" passphrase, so overlapping windows yield +# byte-identical classic payments (colliding tx hashes) across scenario +# bundles. With the default multiplier (2) the sac window [2002,3002) overlaps +# oz's [1800,2800). Multiplier 3 shifts sac to [3002,4002), disjoint from oz +# [1800,2800) and soroswap [251,1251). +# (BUILD_TESTS-only key; sets SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER.) +SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER_FOR_TESTING = 3 + +# Number of parallel transaction clusters. +APPLY_LOAD_LEDGER_MAX_DEPENDENT_TX_CLUSTERS = 1 + +# Number of benchmark ledgers to close. +APPLY_LOAD_NUM_LEDGERS = 1000 + +# Disable bucket list pre-generation (not needed for benchmark mode). +APPLY_LOAD_BL_SIMULATED_LEDGERS = 0 +APPLY_LOAD_BL_WRITE_FREQUENCY = 0 +APPLY_LOAD_BL_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_LEDGERS = 0 + +# Common apply-load boilerplate. +ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING=true +ENABLE_SOROBAN_DIAGNOSTIC_EVENTS = false +# Must cover MAX_SOROBAN_TX_COUNT*3 + CLASSIC_TXS_PER_LEDGER + 2 (~4002 here). +GENESIS_TEST_ACCOUNT_COUNT = 21000 + +# Minimal core config boilerplate. +UNSAFE_QUORUM=true +HTTP_PORT=11626 +# NB: the apply-load command hard-overrides NETWORK_PASSPHRASE to "Apply Load" +# (stellar-core CommandLine.cpp runApplyLoad), so this value is cosmetic and +# CANNOT be used to make tx hashes differ between scenarios. +NETWORK_PASSPHRASE="Apply Load" +NODE_SEED="SDQVDISRYN2JXBS7ICL7QJAEKB3HWBJFP2QECXG7GZICAHBK4UNJCWK2 self" + +[QUORUM_SET] +THRESHOLD_PERCENT=100 +VALIDATORS=["$self"] diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-soroswap.cfg b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-soroswap.cfg new file mode 100644 index 000000000..3e18bee7d --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-soroswap.cfg @@ -0,0 +1,65 @@ +# v27 apply-load config: Soroswap Soroban scenario, with meta enabled so the +# output is an ingestible LedgerCloseMeta corpus. +# +# Run in a dedicated work dir: +# stellar-core apply-load --conf +# This emits meta.xdr (a stream of LedgerCloseMeta). The benchmark closes +# exactly APPLY_LOAD_NUM_LEDGERS ledgers at the end, so the *last* +# APPLY_LOAD_NUM_LEDGERS frames of meta.xdr are the corpus. +# +# Scenario profile: 250 soroban (Soroswap swap) txs/ledger + 1000 classic +# payment txs/ledger. + +# Select the apply-load mode and benchmark model transaction. +APPLY_LOAD_MODE="benchmark" +APPLY_LOAD_MODEL_TX="soroswap" + +# Perf knobs (do not affect generated transaction content). +APPLY_LOAD_TIME_WRITES = true +DISABLE_SOROBAN_METRICS_FOR_TESTING = true + +# --- Meta ENABLED (this is what makes the output ingestible) --- +# NB: do NOT set DISABLE_TX_META_FOR_TESTING; BUILD_TESTS forces tx meta on, +# which we want. LOG_FILE_PATH="" sends logs to stdout. +METADATA_OUTPUT_STREAM = 'meta.xdr' +METADATA_DEBUG_LEDGERS = 0 +LOG_FILE_PATH = "" + +# Soroban txs per ledger for this scenario. Soroswap has no batching, so +# APPLY_LOAD_MAX_SOROBAN_TX_COUNT is the soroban TPL directly. +APPLY_LOAD_MAX_SOROBAN_TX_COUNT = 250 + +# Classic payment txs per ledger (same for all three scenarios). +APPLY_LOAD_CLASSIC_TXS_PER_LEDGER = 1000 + +# Number of parallel transaction clusters. +APPLY_LOAD_LEDGER_MAX_DEPENDENT_TX_CLUSTERS = 1 + +# Number of benchmark ledgers to close. +APPLY_LOAD_NUM_LEDGERS = 1000 + +# Disable bucket list pre-generation (not needed for benchmark mode). +APPLY_LOAD_BL_SIMULATED_LEDGERS = 0 +APPLY_LOAD_BL_WRITE_FREQUENCY = 0 +APPLY_LOAD_BL_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_LEDGERS = 0 + +# Common apply-load boilerplate. +ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING=true +ENABLE_SOROBAN_DIAGNOSTIC_EVENTS = false +# Must cover MAX_SOROBAN_TX_COUNT + 1 + CLASSIC_TXS_PER_LEDGER (~1251 here). +GENESIS_TEST_ACCOUNT_COUNT = 21000 + +# Minimal core config boilerplate. +UNSAFE_QUORUM=true +HTTP_PORT=11628 +# NB: the apply-load command hard-overrides NETWORK_PASSPHRASE to "Apply Load" +# (stellar-core CommandLine.cpp runApplyLoad), so this value is cosmetic and +# CANNOT be used to make tx hashes differ between scenarios. +NETWORK_PASSPHRASE="Apply Load" +NODE_SEED="SDQVDISRYN2JXBS7ICL7QJAEKB3HWBJFP2QECXG7GZICAHBK4UNJCWK2 self" + +[QUORUM_SET] +THRESHOLD_PERCENT=100 +VALIDATORS=["$self"] diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go index a886540f8..f712c8334 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go @@ -6,9 +6,12 @@ import ( "context" "crypto/sha256" "embed" + "encoding/json" "errors" "fmt" "net" + "net/http" + "net/http/httptest" "os" "os/exec" "os/signal" @@ -29,6 +32,7 @@ import ( client "github.com/stellar/go-stellar-sdk/clients/rpcclient" "github.com/stellar/go-stellar-sdk/clients/stellarcore" + "github.com/stellar/go-stellar-sdk/historyarchive" "github.com/stellar/go-stellar-sdk/keypair" protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" proto "github.com/stellar/go-stellar-sdk/protocols/stellarcore" @@ -73,7 +77,8 @@ type TestOnlyRPCConfig struct { } type TestConfig struct { - ProtocolVersion int32 + ProtocolVersion int32 + NetworkPassphrase string // Run a previously released version of RPC (in a container) instead of the current version UseReleasedRPCVersion string // Use/Reuse a SQLite file path @@ -94,6 +99,14 @@ type TestConfig struct { DelayDaemonForLedgerN int // don't start daemon until ledger N reached by core DatastoreConfigFunc func(*config.Config) + + // LoadTest mode swaps the daemon's ingestion from captive-core to a synthetic + // ledger stream and skips all the captive-core/history-archive scaffolding. + LoadTest config.LoadTestConfig + + // HistoryRetentionWindow overrides the daemon's retention window. Zero + // uses the harness default (config.OneDayOfLedgers). + HistoryRetentionWindow uint32 } type TestCorePorts struct { @@ -120,6 +133,10 @@ type Test struct { protocolVersion int32 + networkPassphrase string + + fakeArchiveURL string + rpcConfigFilesDir string sqlitePath string @@ -143,6 +160,10 @@ type Test struct { ignoreLedgerCloseTimes bool datastoreConfigFunc func(*config.Config) + + loadTest config.LoadTestConfig + + historyRetentionWindow uint32 } //nolint:cyclop @@ -152,6 +173,7 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { } i := &Test{t: t} + i.networkPassphrase = StandaloneNetworkPassphrase i.masterAccount = &txnbuild.SimpleAccount{ AccountID: i.MasterKey().Address(), Sequence: 0, @@ -159,6 +181,7 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { parallel := true shouldWaitForRPC := true + if cfg != nil { i.rpcContainerVersion = cfg.UseReleasedRPCVersion i.protocolVersion = cfg.ProtocolVersion @@ -167,6 +190,16 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { parallel = !cfg.NoParallel i.datastoreConfigFunc = cfg.DatastoreConfigFunc i.ignoreLedgerCloseTimes = cfg.IgnoreLedgerCloseTimes + i.loadTest = cfg.LoadTest + i.historyRetentionWindow = cfg.HistoryRetentionWindow + if i.isLoadTestMode() { + // apply-load ledgers have close time of 1970-01-01 + i.ignoreLedgerCloseTimes = true + } + + if cfg.NetworkPassphrase != "" { + i.networkPassphrase = cfg.NetworkPassphrase + } if cfg.OnlyRPC != nil { i.onlyRPC = true @@ -210,10 +243,15 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { i.rpcConfigFilesDir = i.t.TempDir() i.prepareShutdownHandlers() + if i.isLoadTestMode() { + i.fakeArchiveURL = i.startFakeHistoryArchive() + } if i.areThereContainers() { i.spawnContainers() } - if !i.onlyRPC { + + // skipped in load test mode because it doesn't use a live core + if !i.onlyRPC && !i.isLoadTestMode() { i.coreClient = &stellarcore.Client{URL: "http://" + i.testPorts.CoreHTTPHostPort} i.waitForCore() i.waitForCheckpoint() @@ -231,11 +269,48 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { i.waitForRPC() } - i.upgradeLimits() // upgrades need preflight so need RPC up + if !i.isLoadTestMode() { + i.upgradeLimits() // upgrades need preflight so need RPC up + } return i } +// isLoadTestMode is used to determine if the harness should skip spawning +// containers, waiting for core readiness, and the protocol-limit upgrade. +func (i *Test) isLoadTestMode() bool { + return i.loadTest.Enabled() +} + +// startFakeHistoryArchive serves a minimal .well-known/stellar-history.json. +// Ingestion consults it only when the DB is empty (to pick a start ledger), +// so a constant CurrentLedger of 1 starts the synthetic stream at ledger 1; +// non-empty DBs derive their start from the DB itself and never read this. +func (i *Test) startFakeHistoryArchive() string { + i.t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/.well-known/stellar-history.json" { + http.NotFound(w, r) + return + } + body, err := json.Marshal(historyarchive.HistoryArchiveState{ + Version: 1, + Server: "stellar-rpc-loadtest-fake", + NetworkPassphrase: i.networkPassphrase, + CurrentLedger: 1, + }) + require.NoError(i.t, err) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(body) + })) + i.t.Cleanup(srv.Close) + return srv.URL +} + func (i *Test) areThereContainers() bool { + // Load-test mode bypasses captive-core entirely, so no containers needed. + if i.isLoadTestMode() { + return false + } return i.runRPCInContainer() || !i.onlyRPC } @@ -357,26 +432,52 @@ func (i *Test) getRPConfigForContainer() rpcConfig { archiveURL: fmt.Sprintf("http://%s:%d", inContainerCoreHostname, inContainerCoreArchivePort), sqlitePath: "/db/" + filepath.Base(i.sqlitePath), captiveCoreHTTPQueryPort: i.testPorts.captiveCoreHTTPQueryPort, + networkPassphrase: i.networkPassphrase, + logLevel: "debug", + historyRetentionWindow: i.historyRetentionWindow, } } +// findCoreBinary returns the stellar-core binary to use: +// STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN if set, otherwise +// "stellar-core" from PATH. +func findCoreBinary(t testing.TB) string { + if coreBinaryPath := os.Getenv("STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN"); coreBinaryPath != "" { + return coreBinaryPath + } + coreBinaryPath, err := exec.LookPath("stellar-core") + require.NoError(t, err, "stellar-core not found in PATH and STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN unset") + return coreBinaryPath +} + func (i *Test) getRPConfigForDaemon() rpcConfig { - coreBinaryPath := os.Getenv("STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN") - if coreBinaryPath == "" { - i.t.Fatal("missing STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN") + stellarCoreURL := "http://" + i.testPorts.CoreHTTPHostPort + archiveURL := "http://" + i.testPorts.CoreArchiveHostPort + logLevel := "debug" + if i.isLoadTestMode() { + stellarCoreURL = "http://localhost:0" // unreachable + unused in load test mode, must be not empty + archiveURL = i.fakeArchiveURL + // warn makes the benchmark an "ingest-only" metric: prod runs at info, + // where per-request logging adds real overhead (a multi-hour run + // accumulates millions of buffered log lines), but here we measure + // the ingestion pipeline without log-I/O noise. + logLevel = "warn" } return rpcConfig{ // Allocate port dynamically and then figure out what the port is endPoint: "localhost:0", adminEndpoint: "localhost:0", - stellarCoreURL: "http://" + i.testPorts.CoreHTTPHostPort, - coreBinaryPath: coreBinaryPath, + stellarCoreURL: stellarCoreURL, + coreBinaryPath: findCoreBinary(i.t), captiveCoreConfigPath: path.Join(i.rpcConfigFilesDir, captiveCoreConfigFilename), captiveCoreStoragePath: i.captiveCoreStoragePath, - archiveURL: "http://" + i.testPorts.CoreArchiveHostPort, + archiveURL: archiveURL, sqlitePath: i.sqlitePath, captiveCoreHTTPQueryPort: i.testPorts.captiveCoreHTTPQueryPort, ignoreLedgerCloseTimes: i.ignoreLedgerCloseTimes, + networkPassphrase: i.networkPassphrase, + logLevel: logLevel, + historyRetentionWindow: i.historyRetentionWindow, } } @@ -392,6 +493,9 @@ type rpcConfig struct { archiveURL string sqlitePath string ignoreLedgerCloseTimes bool + networkPassphrase string + logLevel string + historyRetentionWindow uint32 } func (vars rpcConfig) toMap() map[string]string { @@ -400,6 +504,10 @@ func (vars rpcConfig) toMap() map[string]string { // If we're ignoring close times, permit absurdly high latencies maxHealthyLedgerLatency = time.Duration(1<<63 - 1).String() } + retentionWindow := strconv.Itoa(config.OneDayOfLedgers) + if vars.historyRetentionWindow > 0 { + retentionWindow = strconv.FormatUint(uint64(vars.historyRetentionWindow), 10) + } return map[string]string{ "ENDPOINT": vars.endPoint, "ADMIN_ENDPOINT": vars.adminEndpoint, @@ -414,12 +522,12 @@ func (vars rpcConfig) toMap() map[string]string { "STELLAR_CAPTIVE_CORE_HTTP_QUERY_SNAPSHOT_LEDGERS": "1", "EMIT_CLASSIC_EVENTS": "true", "FRIENDBOT_URL": FriendbotURL, - "NETWORK_PASSPHRASE": StandaloneNetworkPassphrase, + "NETWORK_PASSPHRASE": vars.networkPassphrase, "HISTORY_ARCHIVE_URLS": vars.archiveURL, - "LOG_LEVEL": "debug", + "LOG_LEVEL": vars.logLevel, "DB_PATH": vars.sqlitePath, "INGESTION_TIMEOUT": "10m", - "HISTORY_RETENTION_WINDOW": strconv.Itoa(config.OneDayOfLedgers), + "HISTORY_RETENTION_WINDOW": retentionWindow, "CHECKPOINT_FREQUENCY": strconv.Itoa(checkpointFrequency), "MAX_HEALTHY_LEDGER_LATENCY": maxHealthyLedgerLatency, "PREFLIGHT_ENABLE_DEBUG": "true", @@ -555,6 +663,13 @@ func (i *Test) createRPCDaemon(c rpcConfig) *daemon.Daemon { i.datastoreConfigFunc(&cfg) } + // LoadTest is configured via a TOML-only options entry (see options.go), + // so the env-var path used by toMap can't carry it. Inject directly, + // post-SetValues — same approach as datastoreConfigFunc above. + if i.isLoadTestMode() { + cfg.LoadTest = i.loadTest + } + logger := supportlog.New() logger.SetOutput(newTestLogWriter(i.t, `rpc="daemon" `)) logger.SetExitFunc(func(code int) { diff --git a/cmd/stellar-rpc/internal/integrationtest/ingest_loadtest_test.go b/cmd/stellar-rpc/internal/integrationtest/ingest_loadtest_test.go new file mode 100644 index 000000000..10122aef3 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/ingest_loadtest_test.go @@ -0,0 +1,766 @@ +package integrationtest + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "testing" + "time" + + "github.com/pelletier/go-toml" + "github.com/stretchr/testify/require" + + rpcclient "github.com/stellar/go-stellar-sdk/clients/rpcclient" + protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" + "github.com/stellar/go-stellar-sdk/xdr" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/config" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/db" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure" +) + +const ( + defaultLedgerBundlePath = "./infrastructure/testdata/load-test-ledgers-v27-sac.xdr.zstd" + defaultApplyLoadConfigPath = "./infrastructure/load-test/testdata/apply-load-v27-sac.cfg" +) + +// TestIngestSyntheticLedgers replays apply-load-generated ledger bundles through +// RPC ingestion and asserts the resulting DB matches the workloads that produced +// them. Bundles are built offline by stellar-core apply-load and fetched from S3 +// by the CI workflow. +// +// Requires STELLAR_RPC_INTEGRATION_TESTS_ENABLED=true. Optional: LOADTEST_SQLITE_PATH +// (DB to ingest into; empty = fresh tmp DB), comma-separated LOADTEST_CONFIG_PATH / +// LOADTEST_INGEST_LEDGER_PATH where config i describes bundle i, and +// LOADTEST_MAX_LEDGERS_PER_FILE to cap ledgers replayed from each bundle. The +// backend replays the bundles in order, rebasing each ledger's sequence. +func TestIngestSyntheticLedgers(t *testing.T) { + skipUnlessLoadTestSupported(t) + + ledgerPaths := splitPathList(os.Getenv("LOADTEST_INGEST_LEDGER_PATH")) + if len(ledgerPaths) == 0 { + ledgerPaths = []string{defaultLedgerBundlePath} + } + for _, p := range ledgerPaths { + if _, err := os.Stat(p); err != nil { + t.Skipf("no generated ledger bundle at %q; generate one with stellar-core apply-load "+ + "(or set LOADTEST_INGEST_LEDGER_PATH)", p) + } + } + sqlitePath := os.Getenv("LOADTEST_SQLITE_PATH") + + cfgs, err := loadApplyLoadConfigs(t) + require.NoError(t, err) + require.Len(t, cfgs, len(ledgerPaths), + "LOADTEST_CONFIG_PATH and LOADTEST_INGEST_LEDGER_PATH must have the same number of "+ + "comma-separated entries (config i describes bundle i)") + + prof, err := combineConfigs(cfgs) + require.NoError(t, err) + + runIngestPhase(t, sqlitePath, ledgerPaths, prof, maxLedgersPerFile(t)) +} + +// runIngestPhase boots an RPC daemon ingesting from the bundles, waits for it to +// catch up to the last synthetic ledger, then verifies the range via getTransactions. +// Shutdown is left to t.Cleanup (a manual Close wouldn't run on assertion failure); +// once the bundles are exhausted ErrLoadTestDone halts ingestion while the daemon serves reads. +func runIngestPhase(t *testing.T, sqlitePath string, ledgerPaths []string, prof ingestProfile, maxPerFile uint32) { + t.Helper() + + // The op-count expectations must track the backend's per-file cap. + prof = prof.capPerFile(maxPerFile) + + // Empty path -> fresh tmp DB. + if sqlitePath == "" { + sqlitePath = filepath.Join(t.TempDir(), "stellar-rpc.sqlite") + } + + // Read the pre-test bounds and close the handle immediately: holding an + // idle second SQLite connection through the benchmark invites what-ifs. + sdb, err := db.OpenSQLiteDB(sqlitePath) + require.NoError(t, err) + preTestLast, initialCount, err := getLedgerBounds(t.Context(), sdb) + require.NoError(t, sdb.Close()) + require.NoError(t, err) + + i := infrastructure.NewTest(t, &infrastructure.TestConfig{ + NetworkPassphrase: prof.networkPassphrase, + SQLitePath: sqlitePath, + HistoryRetentionWindow: initialCount, + LoadTest: config.LoadTestConfig{ + Files: ledgerPaths, + Frequency: ingestFrequency(t), + MaxLedgersPerFile: maxPerFile, + }, + }) + startedAt := time.Now().UTC() + client := i.GetRPCLient() + + // Synthetic ledgers append past the DB's pre-test latest (empty DB -> startSeq 1). + startSeq := preTestLast + 1 + endSeq := startSeq + prof.totalLedgers - 1 + + arrivals := waitForIngest(t, client, startSeq, endSeq) + finishedAt := time.Now().UTC() + // Measure from the first observed synthetic ledger to exclude the backend's + // one-time corpus preprocessing (not part of the ingestion code under test). + ingestDuration := arrivals[endSeq].Sub(arrivals[startSeq]) + t.Logf("Ingested %d ledgers in %s", prof.totalLedgers, ingestDuration) + + // Completeness from the DB (every synthetic ledger present, contiguous) plus a + // sampled per-profile op check — cheaper than walking every transaction. + verifyLedgerRange(t, sqlitePath, startSeq, endSeq, prof.totalLedgers) + verifySampledOps(t, client, startSeq, prof.segments) + + versionInfo, err := client.GetVersionInfo(t.Context()) + require.NoError(t, err) + + emitPerfReport(t, perfReportInput{ + startedAt: startedAt, + finishedAt: finishedAt, + ledgerCount: prof.totalLedgers, + initialLedgers: initialCount, + arrivals: arrivals, + startSeq: startSeq, + segments: prof.segments, + captiveCoreVersion: versionInfo.CaptiveCoreVersion, + }) +} + +// ingestFrequency is the backend's ledger emit rate (LOADTEST_INGEST_FREQUENCY, default 1ms). +func ingestFrequency(t *testing.T) time.Duration { + t.Helper() + v := os.Getenv("LOADTEST_INGEST_FREQUENCY") + if v == "" { + return time.Millisecond + } + d, err := time.ParseDuration(v) + require.NoError(t, err, "invalid LOADTEST_INGEST_FREQUENCY") + return d +} + +// maxLedgersPerFile is the per-bundle ceiling on replayed ledgers +// (LOADTEST_MAX_LEDGERS_PER_FILE, default 0 = every ledger in every file). +func maxLedgersPerFile(t *testing.T) uint32 { + t.Helper() + v := os.Getenv("LOADTEST_MAX_LEDGERS_PER_FILE") + if v == "" { + return 0 + } + n, err := strconv.ParseUint(v, 10, 32) + require.NoError(t, err, "invalid LOADTEST_MAX_LEDGERS_PER_FILE") + return uint32(n) +} + +// waitForIngest polls getHealth at 25ms granularity until endSeq has been +// ingested (or LOADTEST_INGEST_DEADLINE, default 30m, passes), recording the +// first time each sequence is observed for per-ledger latency stats. +func waitForIngest(t *testing.T, client *rpcclient.Client, startSeq, endSeq uint32) map[uint32]time.Time { + t.Helper() + + ingestDeadline := 30 * time.Minute + if v := os.Getenv("LOADTEST_INGEST_DEADLINE"); v != "" { + var err error + ingestDeadline, err = time.ParseDuration(v) + require.NoError(t, err, "invalid LOADTEST_INGEST_DEADLINE") + } + arrivals := make(map[uint32]time.Time, endSeq-startSeq+1) + deadline := time.After(ingestDeadline) + tick := time.NewTicker(25 * time.Millisecond) + defer tick.Stop() + latestSeen := startSeq - 1 + + for { + select { + case <-deadline: + t.Fatalf("RPC only ingested through ledger %d; wanted %d within %s", latestSeen, endSeq, ingestDeadline) + case now := <-tick.C: + health, err := client.GetHealth(t.Context()) + if err != nil { + continue + } + // Arrivals are contiguous, so sequences past latestSeen are new. + seen := min(health.LatestLedger, endSeq) + for seq := latestSeen + 1; seq <= seen; seq++ { + arrivals[seq] = now + } + latestSeen = max(latestSeen, seen) + if health.LatestLedger >= endSeq { + return arrivals + } + } + } +} + +// verifyLedgerRange asserts the DB holds exactly want synthetic ledgers across +// [startSeq, endSeq]; count==span with matching bounds confirms no gaps. +func verifyLedgerRange(t *testing.T, sqlitePath string, startSeq, endSeq, want uint32) { + t.Helper() + sdb, err := db.OpenSQLiteDB(sqlitePath) + require.NoError(t, err) + count, lo, hi, err := db.NewLedgerReader(sdb).GetLedgerCountInRange(t.Context(), startSeq, endSeq) + require.NoError(t, sdb.Close()) + require.NoError(t, err) + require.Equal(t, want, count, "want %d synthetic ledgers in [%d,%d], got %d", want, startSeq, endSeq, count) + require.Equal(t, startSeq, lo, "first synthetic ledger") + require.Equal(t, endSeq, hi, "last synthetic ledger") +} + +// verifySampledOps spot-checks the per-ledger op mix at the first, middle, and +// last ledger of each profile segment, instead of walking every transaction. +func verifySampledOps(t *testing.T, client *rpcclient.Client, startSeq uint32, segments []profileSegment) { + t.Helper() + lo := startSeq + for _, seg := range segments { + hi := lo + seg.ledgers - 1 + seen := map[uint32]bool{} + for _, seq := range []uint32{lo, lo + (seg.ledgers-1)/2, hi} { + if seen[seq] { + continue + } + seen[seq] = true + classic, soroban, err := walkTransactionRange(t.Context(), client, seq, seq, 200) + require.NoError(t, err, "sampling ledger %d (%s)", seq, seg.name) + require.EqualValues(t, seg.classicPerLedger, classic, "ledger %d (%s) classic ops", seq, seg.name) + if seg.sorobanPerLedger > 0 { + require.EqualValues(t, seg.sorobanPerLedger, soroban, "ledger %d (%s) soroban ops", seq, seg.name) + } else { + require.Positive(t, soroban, "ledger %d (%s) soroban ops", seq, seg.name) + } + } + lo = hi + 1 + } +} + +// walkTransactionRange pages through getTransactions for ledgers in [lo, hi] and +// counts classic Payment and Soroban ops. Pages are retried so one transient RPC +// error doesn't abort a walk of thousands of pages. +func walkTransactionRange( + ctx context.Context, client *rpcclient.Client, lo, hi uint32, pageLimit uint, +) (int, int, error) { + const pageAttempts = 3 + var countClassic, countSoroban int + cursor := "" + for { + // Empty Cursor (omitempty) covers both the first and subsequent pages. + req := protocol.GetTransactionsRequest{ + Format: protocol.FormatBase64, + Pagination: &protocol.LedgerPaginationOptions{Cursor: cursor, Limit: pageLimit}, + } + if cursor == "" { + req.StartLedger = lo + } + + var resp protocol.GetTransactionsResponse + var err error + for attempt := 1; ; attempt++ { + resp, err = client.GetTransactions(ctx, req) + if err == nil || attempt >= pageAttempts { + break + } + select { + case <-ctx.Done(): + return countClassic, countSoroban, ctx.Err() + case <-time.After(time.Second): + } + } + if err != nil { + return countClassic, countSoroban, err + } + if len(resp.Transactions) == 0 { + return countClassic, countSoroban, nil + } + + for _, tx := range resp.Transactions { + if tx.Ledger > hi { + return countClassic, countSoroban, nil + } + var env xdr.TransactionEnvelope + if err := xdr.SafeUnmarshalBase64(tx.EnvelopeXDR, &env); err != nil { + return countClassic, countSoroban, fmt.Errorf("unmarshalling tx envelope: %w", err) + } + c, s := countOps(env) + countClassic += c + countSoroban += s + } + if resp.Cursor == "" { + return countClassic, countSoroban, nil + } + cursor = resp.Cursor + } +} + +// skipUnlessLoadTestSupported skips unless the integration-test gate is on. +func skipUnlessLoadTestSupported(t *testing.T) { + t.Helper() + if os.Getenv("STELLAR_RPC_INTEGRATION_TESTS_ENABLED") != "true" { + t.Skip("STELLAR_RPC_INTEGRATION_TESTS_ENABLED not set") + } +} + +// countOps counts classic Payment and Soroban ops in one envelope. +func countOps(env xdr.TransactionEnvelope) (int, int) { + var classic, soroban int + for _, op := range env.Operations() { + switch op.Body.Type { + case xdr.OperationTypePayment: + classic++ + case xdr.OperationTypeInvokeHostFunction: + soroban++ + default: + } + } + return classic, soroban +} + +// getLedgerBounds returns the DB's latest ledger sequence and its ledger +// count (zero values for an empty DB). +func getLedgerBounds(ctx context.Context, sdb *db.DB) (uint32, uint32, error) { + r, err := db.NewLedgerReader(sdb).GetLedgerRange(ctx) + if errors.Is(err, db.ErrEmptyDB) { + return 0, 0, nil + } + if err != nil { + return 0, 0, err + } + return r.LastLedger.Sequence, r.LastLedger.Sequence - r.FirstLedger.Sequence + 1, nil +} + +type applyLoadConfigValues struct { + NetworkPassphrase string `toml:"NETWORK_PASSPHRASE"` + NumSynthetic uint32 `toml:"APPLY_LOAD_NUM_LEDGERS"` + NumClassicTxsPerLedger uint32 `toml:"APPLY_LOAD_CLASSIC_TXS_PER_LEDGER"` + Mode string `toml:"APPLY_LOAD_MODE"` + ModelTx string `toml:"APPLY_LOAD_MODEL_TX"` + MaxSorobanTxCount uint32 `toml:"APPLY_LOAD_MAX_SOROBAN_TX_COUNT"` + BatchSacCount uint32 `toml:"APPLY_LOAD_BATCH_SAC_COUNT"` +} + +// sorobanTxsPerLedger returns soroban tx envelopes per benchmark-mode ledger, or 0 +// when not statically known. For "sac", APPLY_LOAD_BATCH_SAC_COUNT transfers share +// one envelope. +func (cfg applyLoadConfigValues) sorobanTxsPerLedger() uint32 { + if cfg.Mode != "benchmark" || cfg.MaxSorobanTxCount == 0 { + return 0 + } + if cfg.ModelTx == "sac" && cfg.BatchSacCount > 1 { + return cfg.MaxSorobanTxCount / cfg.BatchSacCount + } + return cfg.MaxSorobanTxCount +} + +// profileSegment is one bundle's slice of the concatenated ledger stream, in +// bundle order. Per-ledger op counts are uniform within a segment; +// sorobanPerLedger == 0 means the exact soroban count isn't statically known. +type profileSegment struct { + name string + ledgers uint32 + classicPerLedger uint32 + sorobanPerLedger uint32 +} + +// ingestProfile is the combined expectation for an ingest run over one or more +// bundles: how many ledgers, and the per-segment op mix used to sample-verify. +type ingestProfile struct { + networkPassphrase string + totalLedgers uint32 + segments []profileSegment +} + +// combineConfigs folds per-bundle configs into one ingest expectation. Configs +// must agree on the network passphrase: the daemon ingests all bundles under one network. +func combineConfigs(cfgs []loadedConfig) (ingestProfile, error) { + if len(cfgs) == 0 { + return ingestProfile{}, errors.New("no apply-load configs given") + } + prof := ingestProfile{networkPassphrase: cfgs[0].NetworkPassphrase} + for _, cfg := range cfgs { + if cfg.NetworkPassphrase != prof.networkPassphrase { + return ingestProfile{}, fmt.Errorf("config network passphrases differ: %q vs %q", + prof.networkPassphrase, cfg.NetworkPassphrase) + } + prof.segments = append(prof.segments, profileSegment{ + name: cfg.name, + ledgers: cfg.NumSynthetic, + classicPerLedger: cfg.NumClassicTxsPerLedger, + sorobanPerLedger: cfg.sorobanTxsPerLedger(), + }) + } + return prof.capPerFile(0), nil +} + +// capPerFile caps each segment to maxPerFile ledgers (0 = all), as the backend +// does, and re-totals. +func (p ingestProfile) capPerFile(maxPerFile uint32) ingestProfile { + out := ingestProfile{networkPassphrase: p.networkPassphrase} + for _, seg := range p.segments { + if maxPerFile > 0 && maxPerFile < seg.ledgers { + seg.ledgers = maxPerFile + } + out.totalLedgers += seg.ledgers + out.segments = append(out.segments, seg) + } + return out +} + +// splitPathList splits a comma-separated list, trimming whitespace and dropping empties. +func splitPathList(s string) []string { + var out []string + for p := range strings.SplitSeq(s, ",") { + if p = strings.TrimSpace(p); p != "" { + out = append(out, p) + } + } + return out +} + +func TestCombineConfigs(t *testing.T) { + benchmark := applyLoadConfigValues{ + NetworkPassphrase: "Apply Load", + NumSynthetic: 2000, + NumClassicTxsPerLedger: 1000, + Mode: "benchmark", + ModelTx: "sac", + MaxSorobanTxCount: 1000, + BatchSacCount: 1, + } + legacy := applyLoadConfigValues{ + NetworkPassphrase: "Apply Load", + NumSynthetic: 1000, + NumClassicTxsPerLedger: 10, + MaxSorobanTxCount: 1000, // resource-limited, not an exact per-ledger count + } + + prof, err := combineConfigs([]loadedConfig{{benchmark, "a"}, {benchmark, "b"}}) + require.NoError(t, err) + require.EqualValues(t, 4000, prof.totalLedgers) + require.Equal(t, []profileSegment{ + {name: "a", ledgers: 2000, classicPerLedger: 1000, sorobanPerLedger: 1000}, + {name: "b", ledgers: 2000, classicPerLedger: 1000, sorobanPerLedger: 1000}, + }, prof.segments) + + // A non-benchmark config yields sorobanPerLedger 0 (exact count not statically known). + prof, err = combineConfigs([]loadedConfig{{legacy, "a"}}) + require.NoError(t, err) + require.Equal(t, profileSegment{name: "a", ledgers: 1000, classicPerLedger: 10}, prof.segments[0]) + + // Batched SAC transfers share one envelope: 1000 / 10 = 100 soroban tx/ledger. + batched := benchmark + batched.BatchSacCount = 10 + prof, err = combineConfigs([]loadedConfig{{batched, "a"}}) + require.NoError(t, err) + require.EqualValues(t, 100, prof.segments[0].sorobanPerLedger) + + mismatched := benchmark + mismatched.NetworkPassphrase = "Other Network" + _, err = combineConfigs([]loadedConfig{{benchmark, "a"}, {mismatched, "b"}}) + require.ErrorContains(t, err, "network passphrases differ") + + _, err = combineConfigs(nil) + require.Error(t, err) +} + +func TestIngestProfileCapPerFile(t *testing.T) { + full := ingestProfile{segments: []profileSegment{ + {name: "a", ledgers: 1000, classicPerLedger: 1000, sorobanPerLedger: 100}, + {name: "b", ledgers: 500, classicPerLedger: 10, sorobanPerLedger: 0}, + }} + + require.EqualValues(t, 1500, full.capPerFile(0).totalLedgers) // 0 = no cap + + // A per-file ceiling caps each bundle independently, leaving the op mix intact. + capped := full.capPerFile(200) + require.EqualValues(t, 400, capped.totalLedgers) // 200 + 200 + require.Equal(t, []profileSegment{ + {name: "a", ledgers: 200, classicPerLedger: 1000, sorobanPerLedger: 100}, + {name: "b", ledgers: 200, classicPerLedger: 10, sorobanPerLedger: 0}, + }, capped.segments) + + // A ceiling above a file's size caps only the larger file: min(700,1000)+min(700,500). + require.EqualValues(t, 1200, full.capPerFile(700).totalLedgers) +} + +func TestSplitPathList(t *testing.T) { + require.Nil(t, splitPathList("")) + require.Equal(t, []string{"a"}, splitPathList("a")) + require.Equal(t, []string{"a", "b"}, splitPathList(" a , b ,")) +} + +func TestComputeProfilePerf(t *testing.T) { + // Two segments of 3 ledgers starting at seq 10, one arrival every 100ms. + base := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + arrivals := make(map[uint32]time.Time) + for seq := uint32(10); seq <= 15; seq++ { + arrivals[seq] = base.Add(time.Duration(seq-10) * 100 * time.Millisecond) + } + + perf := computeProfilePerf(arrivals, 10, []profileSegment{ + {name: "a", ledgers: 3}, + {name: "b", ledgers: 3}, + }) + require.Len(t, perf, 2) + + // Segment a has no predecessor ledger: its clock starts at its own first + // arrival (excluding preprocessing) and covers 2 ledger ingests. + require.Equal(t, "a", perf[0].Profile) + require.InDelta(t, 0.2, perf[0].WallClockSec, 1e-9) + require.InDelta(t, 10.0, perf[0].LedgersPerSecond, 1e-9) + require.InDelta(t, 100.0, perf[0].MsPerLedger, 1e-9) + + // Segment b's clock starts at segment a's last arrival and covers all 3. + require.Equal(t, "b", perf[1].Profile) + require.InDelta(t, 0.3, perf[1].WallClockSec, 1e-9) + require.InDelta(t, 10.0, perf[1].LedgersPerSecond, 1e-9) + require.InDelta(t, 100.0, perf[1].MsPerLedger, 1e-9) + require.InDelta(t, 100.0, perf[1].PerLedgerLatencyMs.P50, 1e-9) +} + +// loadedConfig is an apply-load config plus the profile name derived from its +// file name (apply-load-v27-oz.cfg -> apply-load-v27-oz). +type loadedConfig struct { + applyLoadConfigValues + + name string +} + +// loadApplyLoadConfigs loads every config in the comma-separated +// LOADTEST_CONFIG_PATH (default: defaultApplyLoadConfigPath) and +// validates each. Fails if any are missing or invalid. +func loadApplyLoadConfigs(t *testing.T) ([]loadedConfig, error) { + t.Helper() + cfgPaths := splitPathList(os.Getenv("LOADTEST_CONFIG_PATH")) + if len(cfgPaths) == 0 { + cfgPaths = []string{defaultApplyLoadConfigPath} + } + cfgs := make([]loadedConfig, 0, len(cfgPaths)) + for _, cfgPath := range cfgPaths { + cfgRaw, err := os.ReadFile(cfgPath) + if err != nil { + return nil, err + } + base := filepath.Base(cfgPath) + cfg := loadedConfig{name: strings.TrimSuffix(base, filepath.Ext(base))} + if err := toml.Unmarshal(cfgRaw, &cfg.applyLoadConfigValues); err != nil { + return nil, fmt.Errorf("%s: %w", cfgPath, err) + } + if cfg.NumSynthetic <= 0 || cfg.NumClassicTxsPerLedger <= 0 { + return nil, fmt.Errorf( + "invalid config %s: need APPLY_LOAD_NUM_LEDGERS, APPLY_LOAD_CLASSIC_TXS_PER_LEDGER > 0", cfgPath) + } + cfgs = append(cfgs, cfg) + } + return cfgs, nil +} + +// --- perf metrics --------------------------------------------------- +// +// When PERF_RESULTS_PATH is set, runIngestPhase writes a JSON report to that +// path summarizing the ingest workload. + +type perfReport struct { + StartedAt string `json:"startedAt"` + FinishedAt string `json:"finishedAt"` + LedgerCount uint32 `json:"ledgerCount"` + // InitialLedgerCount is the DB's pre-corpus ledger count: ingestion cost grows + // with DB size, so runs are only comparable at similar initial sizes. + InitialLedgerCount uint32 `json:"initialLedgerCount"` + IngestWallClockSec float64 `json:"ingestWallClockSeconds"` + LedgersPerSecond float64 `json:"ledgersPerSecond"` + PerLedgerLatencyMs latencyQuantiles `json:"perLedgerLatencyMs"` + Profiles []profilePerf `json:"profiles"` + CaptiveCoreVersion string `json:"captiveCoreVersion"` + + // Markdown-only context read from PERF_* env vars by emitPerfReport; + // excluded from the JSON artifact. + TargetSha string `json:"-"` + RunID string `json:"-"` + Repo string `json:"-"` + GoldenFetchSecs string `json:"-"` +} + +// profilePerf is the ingest measurement for one bundle's segment. +type profilePerf struct { + Profile string `json:"profile"` + Ledgers uint32 `json:"ledgers"` + WallClockSec float64 `json:"wallClockSeconds"` + LedgersPerSecond float64 `json:"ledgersPerSecond"` + MsPerLedger float64 `json:"msPerLedger"` + PerLedgerLatencyMs latencyQuantiles `json:"perLedgerLatencyMs"` +} + +type latencyQuantiles struct { + P50 float64 `json:"p50"` + P95 float64 `json:"p95"` + P99 float64 `json:"p99"` + Min float64 `json:"min"` + Max float64 `json:"max"` + Mean float64 `json:"mean"` +} + +type perfReportInput struct { + startedAt time.Time + finishedAt time.Time + ledgerCount uint32 + initialLedgers uint32 + arrivals map[uint32]time.Time + startSeq uint32 + segments []profileSegment + captiveCoreVersion string +} + +// arrivalDeltas returns per-ledger latency samples (ms) for [lo, hi]: +// arrivals[seq] - arrivals[seq-1]. Sequences whose predecessor was never recorded +// are skipped, excluding the first synthetic ledger's corpus-preprocessing delta. +func arrivalDeltas(arrivals map[uint32]time.Time, lo, hi uint32) []float64 { + deltas := make([]float64, 0, hi-lo+1) + for seq := lo; seq <= hi; seq++ { + prev, hasPrev := arrivals[seq-1] + cur, hasCur := arrivals[seq] + if hasPrev && hasCur { + deltas = append(deltas, float64(cur.Sub(prev).Microseconds())/1000.0) + } + } + return deltas +} + +// computeProfilePerf slices the arrival timeline into per-bundle segments (in +// bundle order) and measures each from its per-ledger latency samples. +func computeProfilePerf(arrivals map[uint32]time.Time, startSeq uint32, segments []profileSegment) []profilePerf { + out := make([]profilePerf, 0, len(segments)) + lo := startSeq + for _, seg := range segments { + hi := lo + seg.ledgers - 1 + out = append(out, perfFromDeltas(seg.name, seg.ledgers, arrivalDeltas(arrivals, lo, hi))) + lo = hi + 1 + } + return out +} + +// perfFromDeltas summarizes per-ledger latency samples: wall-clock is their sum +// (the deltas telescope) and ms/ledger their mean. +func perfFromDeltas(name string, ledgers uint32, deltasMs []float64) profilePerf { + var sumMs float64 + for _, d := range deltasMs { + sumMs += d + } + p := profilePerf{ + Profile: name, + Ledgers: ledgers, + WallClockSec: sumMs / 1000, + PerLedgerLatencyMs: computeQuantiles(deltasMs), + } + if sumMs > 0 { + p.MsPerLedger = sumMs / float64(len(deltasMs)) + p.LedgersPerSecond = 1000 * float64(len(deltasMs)) / sumMs + } + return p +} + +// emitPerfReport writes the perf report as JSON to PERF_RESULTS_PATH and as +// markdown to PERF_RESULTS_MD_PATH; either or both may be unset. +func emitPerfReport(t *testing.T, in perfReportInput) { + t.Helper() + jsonPath := os.Getenv("PERF_RESULTS_PATH") + mdPath := os.Getenv("PERF_RESULTS_MD_PATH") + if jsonPath == "" && mdPath == "" { + return + } + + overallDeltas := arrivalDeltas(in.arrivals, in.startSeq, in.startSeq+in.ledgerCount-1) + overall := perfFromDeltas("overall", in.ledgerCount, overallDeltas) + sha := os.Getenv("PERF_TARGET_SHA") + if len(sha) > 7 { + sha = sha[:7] + } + report := perfReport{ + StartedAt: in.startedAt.Format(time.RFC3339), + FinishedAt: in.finishedAt.Format(time.RFC3339), + LedgerCount: in.ledgerCount, + InitialLedgerCount: in.initialLedgers, + IngestWallClockSec: overall.WallClockSec, + LedgersPerSecond: overall.LedgersPerSecond, + PerLedgerLatencyMs: overall.PerLedgerLatencyMs, + Profiles: computeProfilePerf(in.arrivals, in.startSeq, in.segments), + CaptiveCoreVersion: in.captiveCoreVersion, + TargetSha: sha, + RunID: os.Getenv("PERF_RUN_ID"), + Repo: os.Getenv("PERF_REPO"), + GoldenFetchSecs: os.Getenv("PERF_GOLDEN_FETCH_SECONDS"), + } + + if jsonPath != "" { + data, err := json.MarshalIndent(report, "", " ") + require.NoError(t, err) + require.NoError(t, os.WriteFile(jsonPath, data, 0o644)) + t.Logf("perf report written to %s", jsonPath) + } + if mdPath != "" { + require.NoError(t, os.WriteFile(mdPath, []byte(renderPerfMarkdown(report)), 0o644)) + t.Logf("perf markdown written to %s", mdPath) + } +} + +// renderPerfMarkdown returns the PR-comment table for an ingest run; missing +// context fields render as empty cells. +func renderPerfMarkdown(r perfReport) string { + lines := make([]string, 0, 4+len(r.Profiles)+12) + lines = append(lines, + fmt.Sprintf("### 📈 Ingest load test — `%s`", r.TargetSha), + "", + "| Profile | Ledgers | Wall-clock | Ledgers/sec | ms/ledger | p50 / p95 / p99 ms |", + "|---|---|---|---|---|---|", + ) + for _, p := range r.Profiles { + lines = append(lines, fmt.Sprintf("| %s | %d | %.3fs | %.2f | %.2f | %v / %v / %v |", + p.Profile, p.Ledgers, p.WallClockSec, p.LedgersPerSecond, p.MsPerLedger, + p.PerLedgerLatencyMs.P50, p.PerLedgerLatencyMs.P95, p.PerLedgerLatencyMs.P99)) + } + lines = append(lines, + "", + "| Metric | Value |", + "|---|---|", + fmt.Sprintf("| Ledgers replayed | %d |", r.LedgerCount), + fmt.Sprintf("| Initial DB ledger count | %d |", r.InitialLedgerCount), + fmt.Sprintf("| Overall throughput | %.2f ledgers/sec |", r.LedgersPerSecond), + fmt.Sprintf("| Overall ingest wall-clock | %.3fs |", r.IngestWallClockSec), + fmt.Sprintf("| Per-ledger p50 / p95 / p99 | %v / %v / %v ms |", + r.PerLedgerLatencyMs.P50, r.PerLedgerLatencyMs.P95, r.PerLedgerLatencyMs.P99), + fmt.Sprintf("| Golden DB fetch+decompress | %ss |", r.GoldenFetchSecs), + fmt.Sprintf("| stellar-core | `%s` |", r.CaptiveCoreVersion), + fmt.Sprintf("| Workflow run | [#%s](https://github.com/%s/actions/runs/%s) |", r.RunID, r.Repo, r.RunID), + "", + ) + return strings.Join(lines, "\n") +} + +func computeQuantiles(samplesMs []float64) latencyQuantiles { + if len(samplesMs) == 0 { + return latencyQuantiles{} + } + sorted := make([]float64, len(samplesMs)) + copy(sorted, samplesMs) + sort.Float64s(sorted) + at := func(p float64) float64 { + idx := int(p * float64(len(sorted)-1)) + return sorted[idx] + } + var sum float64 + for _, v := range sorted { + sum += v + } + return latencyQuantiles{ + P50: at(0.50), + P95: at(0.95), + P99: at(0.99), + Min: sorted[0], + Max: sorted[len(sorted)-1], + Mean: sum / float64(len(sorted)), + } +} diff --git a/go.mod b/go.mod index b2f97db58..6dcd7148e 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,10 @@ go 1.25 require ( github.com/Masterminds/squirrel v1.5.4 + github.com/aws/aws-sdk-go-v2 v1.42.0 + github.com/aws/aws-sdk-go-v2/config v1.31.16 + github.com/aws/aws-sdk-go-v2/service/s3 v1.89.1 + github.com/aws/aws-sdk-go-v2/service/ssm v1.69.3 github.com/cenkalti/backoff/v4 v4.3.0 github.com/creachadair/jrpc2 v1.3.3 github.com/fsouza/fake-gcs-server v1.49.2 @@ -17,7 +21,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 - github.com/stellar/go-stellar-sdk v0.6.0 + github.com/stellar/go-stellar-sdk v0.5.1-0.20260618200753-4daf27b6f1bf github.com/stretchr/testify v1.11.1 ) @@ -29,25 +33,22 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0 // indirect - github.com/aws/aws-sdk-go-v2 v1.39.5 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect - github.com/aws/aws-sdk-go-v2/config v1.31.16 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.18.20 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.3 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.12 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.12 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.89.1 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.0 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.39.0 // indirect - github.com/aws/smithy-go v1.23.1 // indirect + github.com/aws/smithy-go v1.27.1 // indirect github.com/cncf/xds/go v0.0.0-20251031190108-5cf4b1949528 // indirect github.com/envoyproxy/go-control-plane/envoy v1.35.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect @@ -59,6 +60,7 @@ require ( github.com/pkg/xattr v0.4.9 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect + github.com/xdrpp/goxdr v0.1.1 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.38.0 // indirect go.opentelemetry.io/otel/sdk v1.38.0 // indirect @@ -98,7 +100,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmoiron/sqlx v1.3.5 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.1 github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lib/pq v1.10.9 // indirect diff --git a/go.sum b/go.sum index 0315adfb4..4dcdaf41a 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,8 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3d github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= -github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w= -github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= +github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/bymhA= +github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc= @@ -100,32 +100,34 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 h1:VO3FIM2TDbm0kqp6sFNR0P github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12/go.mod h1:6C39gB8kg82tx3r72muZSrNhHia9rjGkX7ORaS2GKNE= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29/go.mod h1:MzoLFUArKGpGD+ukmPiTPG1X5x4o6M2kq4v2dr1FiEc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 h1:RdwIf/CuUsvJX3RgJagbOyotl/cxoLY4xviKuE7p2GY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29/go.mod h1:71wt8W2EgswdZy9Mf9KNnzxZ3TiZlv4caKghPktDOkA= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 h1:itu4KHu8JK/N6NcLIISlf3LL1LccMqruLUXZ9y7yBZw= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12/go.mod h1:i+6vTU3xziikTY3vcox23X8pPGW5X3wVgd1VZ7ha+x8= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 h1:xtuxji5CS0JknaXoACOunXOYOQzgfTvGAc9s2QdCJA4= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2/go.mod h1:zxwi0DIR0rcRcgdbl7E2MSOvxDyyXGBlScvBkARFaLQ= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12 h1:ZD2+BSw9vFsNlKYIasSNt3uDbjqqXIBcM13UJv/Lx2k= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12/go.mod h1:Ms4zlcVBbXbiP7EVLhl+lgjvA/a7YphqQ3Ih3174EmI= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.3 h1:NEe7FaViguRQEm8zl8Ay/kC/QRsMtWUiCGZajQIsLdc= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.3/go.mod h1:JLuCKu5VfiLBBBl/5IzZILU7rxS0koQpHzMOCzycOJU= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.12 h1:MM8imH7NZ0ovIVX7D2RxfMDv7Jt9OiUXkcQ+GqywA7M= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.12/go.mod h1:gf4OGwdNkbEsb7elw2Sy76odfhwNktWII3WgvQgQQ6w= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 h1:DRebniUGZ2MqiiIVmQJ04vIXr918hubdHMnarSLEWyU= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29/go.mod h1:LfRkPCD8YHDM2E5eTkos2UpwYeZnBcVarTa8L59bJHA= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.12 h1:R3uW0iKl8rgNEXNjVGliW/oMEh9fO/LlUEV8RvIFr1I= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.12/go.mod h1:XEttbEr5yqsw8ebi7vlDoGJJjMXRez4/s9pibpJyL5s= github.com/aws/aws-sdk-go-v2/service/s3 v1.89.1 h1:Dq82AV+Qxpno/fG162eAhnD8d48t9S+GZCfz7yv1VeA= github.com/aws/aws-sdk-go-v2/service/s3 v1.89.1/go.mod h1:MbKLznDKpf7PnSonNRUVYZzfP0CeLkRIUexeblgKcU4= +github.com/aws/aws-sdk-go-v2/service/ssm v1.69.3 h1:58LjP8cp8UEHA1LG/JZ4fG9SobHE82kLYe46mogbSI4= +github.com/aws/aws-sdk-go-v2/service/ssm v1.69.3/go.mod h1:16Zd02ocSJp68o4r36MQ4Rikf/Ulv4On5qjMpJJf5Mo= github.com/aws/aws-sdk-go-v2/service/sso v1.30.0 h1:xHXvxst78wBpJFgDW07xllOx0IAzbryrSdM4nMVQ4Dw= github.com/aws/aws-sdk-go-v2/service/sso v1.30.0/go.mod h1:/e8m+AO6HNPPqMyfKRtzZ9+mBF5/x1Wk8QiDva4m07I= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.4 h1:tBw2Qhf0kj4ZwtsVpDiVRU3zKLvjvjgIjHMKirxXg8M= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.4/go.mod h1:Deq4B7sRM6Awq/xyOBlxBdgW8/Z926KYNNaGMW2lrkA= github.com/aws/aws-sdk-go-v2/service/sts v1.39.0 h1:C+BRMnasSYFcgDw8o9H5hzehKzXyAb9GY5v/8bP9DUY= github.com/aws/aws-sdk-go-v2/service/sts v1.39.0/go.mod h1:4EjU+4mIx6+JqKQkruye+CaigV7alL3thVPfDd9VlMs= -github.com/aws/smithy-go v1.23.1 h1:sLvcH6dfAFwGkHLZ7dGiYF7aK6mg4CgKA/iDKjLDt9M= -github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/aws/smithy-go v1.27.1 h1:4T340VFndXtADGF52gYa1POyL7s9E4Z1OeZ1hCscIw8= +github.com/aws/smithy-go v1.27.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -429,8 +431,8 @@ github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI= github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI= github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo= github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= -github.com/stellar/go-stellar-sdk v0.6.0 h1:NM2oqZJQup0QxnJMq6C8s4iIIhU6rHFX0rlsF3wh/Ho= -github.com/stellar/go-stellar-sdk v0.6.0/go.mod h1:IkcqcrE9UQi7n/1y+MxKB+7qzdjG1T2kGOD7Ss8dqjw= +github.com/stellar/go-stellar-sdk v0.5.1-0.20260618200753-4daf27b6f1bf h1:GCIPikMaf5pF6LMGRQmYd+D9Rq1zC0bXIAl96k8Lzhg= +github.com/stellar/go-stellar-sdk v0.5.1-0.20260618200753-4daf27b6f1bf/go.mod h1:wH+L/1lnWZYUJQei6HF0QhIvs7chSHOapizIJDp66SE= github.com/stellar/go-xdr v0.0.0-20260529210834-0bf8f4956364 h1:gOKrfuWdZ92LFlv0TAwgZ7OsWKeBsOMDlGLyFgduI1w= github.com/stellar/go-xdr v0.0.0-20260529210834-0bf8f4956364/go.mod h1:If+U9Z1W5xU97VrOgJandQT+2dN7/iOpkCrxBJEyF80= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=