diff --git a/Taskfile.yml b/Taskfile.yml index c6dfdbafd..3bc3c8f4c 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -96,7 +96,7 @@ tasks: host:postgres:start: desc: Run a postgres container cmds: - - docker start kwil-postgres || docker run -d -p 5432:5432 --name kwil-postgres -e "POSTGRES_HOST_AUTH_METHOD=trust" kwildb/postgres:latest + - docker start kwil-postgres || docker run -d -p 5432:5432 --name kwil-postgres -e "POSTGRES_HOST_AUTH_METHOD=trust" ghcr.io/trufnetwork/kwil-postgres:latest host:indexer:start: desc: Run the indexer diff --git a/compose.yaml b/compose.yaml index f7bfa1023..d7de8d3d7 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,6 +1,6 @@ services: kwil-postgres: - image: "kwildb/postgres:16.8-1" + image: "ghcr.io/trufnetwork/kwil-postgres:16.8-1" hostname: kwil-postgres shm_size: 2G restart: unless-stopped @@ -28,7 +28,7 @@ services: tn-db: container_name: tn-db hostname: tn-db - image: "ghcr.io/trufnetwork/node:local" + image: "ghcr.io/trufnetwork/node:${TN_IMAGE:-local}" restart: unless-stopped build: context: . 
@@ -37,9 +37,9 @@ services: CONFIG_PATH: /root/.kwild # app.pg-db-host KWILD_DB_HOST: kwil-postgres - # Optionally supply SETUP_DB_OWNER to override the owner derived from the generated node key - SETUP_DB_OWNER: ${SETUP_DB_OWNER:-} - SETUP_CHAIN_ID: ${SETUP_CHAIN_ID:-trufnetwork-dev} + # Optionally supply SETUP_DB_OWNER to override the owner derived from the generated node key + SETUP_DB_OWNER: ${SETUP_DB_OWNER:-} + SETUP_CHAIN_ID: ${SETUP_CHAIN_ID:-trufnetwork-dev} ports: - "50051:50051" - "${TN_RPC_PORT:-8484}:8484" diff --git a/deployments/Dockerfile b/deployments/Dockerfile index d7b8906c8..ac0c5bf88 100644 --- a/deployments/Dockerfile +++ b/deployments/Dockerfile @@ -17,10 +17,20 @@ COPY deployments/tn-entrypoint.sh ./deployments/tn-entrypoint.sh # todo: incorporate task build process, otherwise images will lack information about the build RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o /app/.build/kwild /app/cmd/kwild/main.go -FROM busybox:1.35.0-uclibc as busybox -# busy box will provide us with a shell to run commands in distroless +FROM postgres:16-alpine AS pg_repack_builder -FROM alpine:latest +# build pg_repack against postgres 16 client libraries +RUN apk add --no-cache build-base clang19 gawk llvm19 curl zlib-dev readline-dev openssl-dev lz4-dev zstd-dev && \ + curl -fsSL -o /tmp/pg_repack.tar.gz https://codeload.github.com/reorg/pg_repack/tar.gz/refs/tags/ver_1.5.2 && \ + tar -xzf /tmp/pg_repack.tar.gz && rm /tmp/pg_repack.tar.gz && \ + cd pg_repack-ver_1.5.2 && \ + make USE_PGXS=1 PG_CONFIG=/usr/local/bin/pg_config && \ + make USE_PGXS=1 PG_CONFIG=/usr/local/bin/pg_config install && \ + PG_BINDIR="$(/usr/local/bin/pg_config --bindir)" && \ + install -Dm755 "$PG_BINDIR/pg_repack" /opt/pg_repack/bin/pg_repack && \ + cd .. 
&& rm -rf pg_repack-ver_1.5.2 + +FROM alpine:3.21 ENV SETUP_CHAIN_ID=truflation-dev ENV SETUP_DB_OWNER= @@ -29,9 +39,12 @@ ENV CONFIG_PATH=/root/.kwild WORKDIR /app -# add postgres client tools +# add postgres client tools and the pg_repack binary used by tn_vacuum RUN apk add --no-cache postgresql16-client +# copy pg_repack CLI from build stage +COPY --from=pg_repack_builder /opt/pg_repack/bin/pg_repack /usr/local/bin/pg_repack + # move .build content to /app COPY --from=build /app/.build/* /app/ diff --git a/deployments/dev-net/devnet-compose.yaml b/deployments/dev-net/devnet-compose.yaml index 7eaa2892f..e2dcfbd62 100644 --- a/deployments/dev-net/devnet-compose.yaml +++ b/deployments/dev-net/devnet-compose.yaml @@ -33,7 +33,7 @@ x-common-logging: &common_logging services: kwil-postgres-1: - image: "kwildb/postgres:16.8-1" + image: "ghcr.io/trufnetwork/kwil-postgres:16.8-1" ports: - "5432:5432" <<: *postgres_common @@ -42,7 +42,7 @@ services: source: data-kwil-postgres-1 kwil-postgres-2: - image: "kwildb/postgres:16.8-1" + image: "ghcr.io/trufnetwork/kwil-postgres:16.8-1" ports: - "5433:5432" <<: *postgres_common diff --git a/deployments/infra/README.md b/deployments/infra/README.md index a551bdf8c..2821a43d5 100644 --- a/deployments/infra/README.md +++ b/deployments/infra/README.md @@ -331,7 +331,7 @@ The AMI includes: - **Base OS**: Ubuntu 24.04 LTS - **Docker**: Latest Docker CE with docker-compose - **TRUF.NETWORK Stack**: - - PostgreSQL (kwildb/postgres:16.8-1) + - PostgreSQL (ghcr.io/trufnetwork/kwil-postgres:16.8-1) - TRUF.NETWORK Node - ⚠️ To be added when ghcr image is published - PostgreSQL MCP Server (crystaldba/postgres-mcp:latest) - Will need to be adjusted later on - **Configuration Scripts**: @@ -382,4 +382,4 @@ aws logs describe-log-groups --log-group-name-prefix /aws/imagebuilder ## Important -Always use these commands responsibly, especially in non-production environments. 
Remember to delete the stack after testing to avoid unnecessary AWS charges. \ No newline at end of file +Always use these commands responsibly, especially in non-production environments. Remember to delete the stack after testing to avoid unnecessary AWS charges. diff --git a/deployments/infra/stacks/docker-compose.template.yml b/deployments/infra/stacks/docker-compose.template.yml index 8752213f8..9555b09b9 100644 --- a/deployments/infra/stacks/docker-compose.template.yml +++ b/deployments/infra/stacks/docker-compose.template.yml @@ -1,6 +1,6 @@ services: tn-postgres: - image: kwildb/postgres:latest + image: ghcr.io/trufnetwork/kwil-postgres:latest container_name: tn-postgres environment: POSTGRES_HOST_AUTH_METHOD: trust @@ -106,4 +106,4 @@ services: networks: tn-network: name: tn-network - driver: bridge \ No newline at end of file + driver: bridge diff --git a/docs/container-image-guide.md b/docs/container-image-guide.md index 15ed740f4..675fe30fa 100644 --- a/docs/container-image-guide.md +++ b/docs/container-image-guide.md @@ -7,7 +7,7 @@ Run the TRUF.NETWORK node container with Docker Compose while keeping the standa ## Prerequisites - Docker Engine 24+ with the `docker compose` plugin. -- Pull access to `ghcr.io/trufnetwork/node` and `kwildb/postgres`. +- Pull access to `ghcr.io/trufnetwork/node` and `ghcr.io/trufnetwork/kwil-postgres`. - Optional: the [`kwild` CLI](https://github.com/trufnetwork/node/releases) if you want to pre-populate configuration files instead of using the container’s auto-initialization path. ## 1. Prepare the workspace @@ -26,7 +26,7 @@ Create `docker-compose.yml` in `~/truf-node` using the minimal stack below. 
```yaml services: postgres: - image: kwildb/postgres:16.8-1 + image: ghcr.io/trufnetwork/kwil-postgres:latest restart: unless-stopped environment: POSTGRES_HOST_AUTH_METHOD: trust diff --git a/docs/examples/mcp-reverse-proxy/README.md b/docs/examples/mcp-reverse-proxy/README.md index 43a20a389..d2f5dadc4 100644 --- a/docs/examples/mcp-reverse-proxy/README.md +++ b/docs/examples/mcp-reverse-proxy/README.md @@ -28,7 +28,7 @@ This directory contains ready-to-use configuration examples for deploying the TR 2. **Create environment file:** ```bash cat > .env << EOF - # TRUF.NETWORK uses kwildb/postgres image which auto-creates kwild user/database + # TRUF.NETWORK uses ghcr.io/trufnetwork/kwil-postgres image which auto-creates kwild user/database # Note: No password needed - uses POSTGRES_HOST_AUTH_METHOD=trust DOMAIN=mcp.your-domain.com ACME_EMAIL=admin@your-domain.com @@ -276,7 +276,7 @@ Have a working configuration for another reverse proxy? Please contribute: git clone https://github.com/trufnetwork/node.git cd node/docs/examples/mcp-reverse-proxy -# Configure environment (kwildb/postgres auto-creates kwild user/database) +# Configure environment (ghcr.io/trufnetwork/kwil-postgres auto-creates kwild user/database) cat > .env << EOF DOMAIN=mcp.your-domain.com ACME_EMAIL=admin@your-domain.com @@ -291,4 +291,4 @@ docker compose logs -f mcp-server # Test curl -H "Accept: text/event-stream" http://your-domain.com/sse -``` \ No newline at end of file +``` diff --git a/docs/examples/mcp-reverse-proxy/docker-compose.sse.yaml b/docs/examples/mcp-reverse-proxy/docker-compose.sse.yaml index 441c9d807..7c5640806 100644 --- a/docs/examples/mcp-reverse-proxy/docker-compose.sse.yaml +++ b/docs/examples/mcp-reverse-proxy/docker-compose.sse.yaml @@ -13,7 +13,7 @@ version: '3.8' services: # PostgreSQL Database (Kwil-configured image for TRUF.NETWORK) postgres: - image: kwildb/postgres:latest + image: ghcr.io/trufnetwork/kwil-postgres:latest container_name: tn-postgres restart: 
unless-stopped environment: @@ -200,8 +200,8 @@ volumes: # Example environment file (.env) # Copy to .env and customize: # -# # Database settings (TRUF.NETWORK uses kwildb/postgres image with trust auth) -# # The kwildb/postgres image automatically creates kwild user and database +# # Database settings (TRUF.NETWORK uses ghcr.io/trufnetwork/kwil-postgres image with trust auth) +# # The ghcr.io/trufnetwork/kwil-postgres image automatically creates kwild user and database # # Note: No password needed due to POSTGRES_HOST_AUTH_METHOD=trust # # # MCP Server settings @@ -251,4 +251,4 @@ volumes: # docker compose -f docker-compose.sse.yaml down # # 7. Full cleanup: -# docker compose -f docker-compose.sse.yaml down -v --remove-orphans \ No newline at end of file +# docker compose -f docker-compose.sse.yaml down -v --remove-orphans diff --git a/docs/examples/mcp-reverse-proxy/traefik.yml.example b/docs/examples/mcp-reverse-proxy/traefik.yml.example index 7daad3632..eef77fa68 100644 --- a/docs/examples/mcp-reverse-proxy/traefik.yml.example +++ b/docs/examples/mcp-reverse-proxy/traefik.yml.example @@ -267,7 +267,7 @@ services: - "traefik.http.middlewares.dashboard-auth.basicauth.users=admin:$$apr1$$YOUR_HASHED_PASSWORD_HERE" # Generate with: htpasswd -nb admin your-secure-password postgres: - image: kwildb/postgres:latest + image: ghcr.io/trufnetwork/kwil-postgres:latest restart: unless-stopped environment: - POSTGRES_HOST_AUTH_METHOD=trust @@ -310,4 +310,4 @@ networks: volumes: postgres-data: - traefik-logs: \ No newline at end of file + traefik-logs: diff --git a/docs/mcp-reverse-proxy.md b/docs/mcp-reverse-proxy.md index 5a9d0accc..ebc1d855e 100644 --- a/docs/mcp-reverse-proxy.md +++ b/docs/mcp-reverse-proxy.md @@ -252,7 +252,7 @@ LoadModule headers_module modules/mod_headers.so ## MCP Server SSE Configuration -**Important**: TRUF.NETWORK uses the Kwil PostgreSQL Docker image (`kwildb/postgres`) which is configured with `POSTGRES_HOST_AUTH_METHOD=trust` and automatically 
creates a `kwild` database with a `kwild` user. This matches the standard node setup described in the [Node Operator Guide](./node-operator-guide.md). +**Important**: TRUF.NETWORK uses the Kwil PostgreSQL Docker image (`ghcr.io/trufnetwork/kwil-postgres`) which is configured with `POSTGRES_HOST_AUTH_METHOD=trust` and automatically creates a `kwild` database with a `kwild` user. This matches the standard node setup described in the [Node Operator Guide](./node-operator-guide.md). ### Starting the MCP Server with SSE @@ -650,4 +650,4 @@ After implementing reverse proxy configuration: 4. **Plan Scaling**: Consider load balancing for high-traffic scenarios 5. **Security Audits**: Regular security reviews and updates -For additional support with MCP server deployment, consult the [Node Operator Guide](./node-operator-guide.md) and [MCP Server Documentation](./mcp-server.md). \ No newline at end of file +For additional support with MCP server deployment, consult the [Node Operator Guide](./node-operator-guide.md) and [MCP Server Documentation](./mcp-server.md). diff --git a/docs/node-operator-guide.md b/docs/node-operator-guide.md index b8f1d90e3..4755fd3e9 100644 --- a/docs/node-operator-guide.md +++ b/docs/node-operator-guide.md @@ -56,7 +56,7 @@ The PostgreSQL client (`psql`) is required for database operations, and the `pg_ #### For Linux (Ubuntu/Debian) ```bash -sudo apt-get install -y postgresql-client-16 +sudo apt-get install -y postgresql-client-16 postgresql-16-repack ``` #### For macOS @@ -64,7 +64,7 @@ sudo apt-get install -y postgresql-client-16 You can install the PostgreSQL client using [Homebrew](https://brew.sh/). If you don't have Homebrew, install it first by following the instructions on their website. ```bash -brew install postgresql@16 +brew install postgresql@16 pg_repack ``` To use it from any terminal, you may need to add it to your `PATH`. 
For `zsh` (the default in modern macOS): @@ -226,7 +226,7 @@ docker run -d -p 127.0.0.1:5432:5432 --name tn-postgres \ -e "POSTGRES_HOST_AUTH_METHOD=trust" \ -v tn-pgdata:/var/lib/postgresql/data \ --shm-size=1gb \ - kwildb/postgres:latest + ghcr.io/trufnetwork/kwil-postgres:latest ``` > **Warning**: Critical Security Requirements @@ -487,6 +487,26 @@ For complete configuration options (stream lists, schedules, metrics, troublesho [extensions/tn_cache/README.md#operations--monitoring](../extensions/tn_cache/README.md#operations--monitoring) +### Vacuum Extension (tn_vacuum) + +The `tn_vacuum` extension provides **automated database maintenance** through periodic vacuuming operations. It helps reclaim disk space and optimize database performance by removing dead tuples using `pg_repack`. + +> **Note:** If you're using the official TrufNetwork node image, `pg_repack` is already included. For custom installations, see the installation guide in the extension documentation. + +**Quick enable** + +```toml +[extensions.tn_vacuum] +enabled = true +block_interval = 50000 # runs vacuum every 50k blocks +``` + +After editing `config.toml`, restart `kwild` for the change to take effect. + +For tuning guidance, metrics, and troubleshooting, see the full documentation: + +[extensions/tn_vacuum/README.md](../extensions/tn_vacuum/README.md) + ### 7. 
Become a Validator (Optional) To upgrade your node to a validator: @@ -577,19 +597,19 @@ For Ubuntu/Debian: ```bash sudo apt-get update -sudo apt-get install postgresql-client-16 +sudo apt-get install postgresql-client-16 postgresql-16-repack ``` For CentOS/RHEL: ```bash -sudo yum install postgresql16 +sudo yum install postgresql16 pg_repack_16 ``` For macOS (using Homebrew): ```bash -brew install postgresql@16 +brew install postgresql@16 pg_repack ``` Verify the installation: @@ -888,7 +908,7 @@ Sometimes you may need to reset your node to sync from a specific point or recov -e "POSTGRES_HOST_AUTH_METHOD=trust" \ -v tn-pgdata:/var/lib/postgresql/data \ --shm-size=1gb \ - kwildb/postgres:latest + ghcr.io/trufnetwork/kwil-postgres:latest ``` 8. **Re-enable and start services**: @@ -985,6 +1005,3 @@ private = true ``` For more details, see the [Kwil Private RPC documentation](http://docs.kwil.com/docs/node/private-rpc). - - - diff --git a/docs/node-upgrade-guide.md b/docs/node-upgrade-guide.md index f2444b568..ea5e3e029 100644 --- a/docs/node-upgrade-guide.md +++ b/docs/node-upgrade-guide.md @@ -203,7 +203,7 @@ kwild blacklist list ## What About PostgreSQL & Other Components? *Minor* Kwil releases do **not** require a database upgrade. -If the release notes specify a new official Postgres image (e.g. `kwildb/postgres:x.y-z`) you can recreate the container at your convenience – data volumes are preserved. +If the release notes specify a new official Postgres image (e.g. `ghcr.io/trufnetwork/kwil-postgres:x.y-z`) you can recreate the container at your convenience – data volumes are preserved. 
--- diff --git a/extensions/leaderwatch/constants.go b/extensions/leaderwatch/constants.go new file mode 100644 index 000000000..5a758880c --- /dev/null +++ b/extensions/leaderwatch/constants.go @@ -0,0 +1,3 @@ +package leaderwatch + +const ExtensionName = "leaderwatch" diff --git a/extensions/leaderwatch/leaderwatch.go b/extensions/leaderwatch/leaderwatch.go new file mode 100644 index 000000000..bc83b09ab --- /dev/null +++ b/extensions/leaderwatch/leaderwatch.go @@ -0,0 +1,170 @@ +package leaderwatch + +import ( + "context" + "fmt" + "sync" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/log" + "github.com/trufnetwork/kwil-db/extensions/hooks" +) + +type Callbacks struct { + OnAcquire func(ctx context.Context, app *common.App, block *common.BlockContext) + OnLose func(ctx context.Context, app *common.App, block *common.BlockContext) + OnEndBlock func(ctx context.Context, app *common.App, block *common.BlockContext) +} + +type watcher struct { + callbacks Callbacks + isLeader bool +} + +type extension struct { + mu sync.RWMutex + logger log.Logger + service *common.Service + watchers map[string]*watcher + order []string +} + +var ( + extOnce sync.Once + extInst *extension +) + +func getExtension() *extension { + extOnce.Do(func() { + extInst = &extension{ + logger: log.New(log.WithLevel(log.LevelInfo)).New(ExtensionName), + watchers: make(map[string]*watcher), + } + }) + return extInst +} + +func InitializeExtension() { + if err := hooks.RegisterEngineReadyHook(ExtensionName+"_engine_ready", engineReadyHook); err != nil { + panic(fmt.Sprintf("failed to register %s engine ready hook: %v", ExtensionName, err)) + } + if err := hooks.RegisterEndBlockHook(ExtensionName+"_end_block", endBlockHook); err != nil { + panic(fmt.Sprintf("failed to register %s end block hook: %v", ExtensionName, err)) + } +} + +func engineReadyHook(ctx context.Context, app *common.App) error { + ext := getExtension() + if app != nil && app.Service != nil { + 
ext.mu.Lock() + ext.logger = app.Service.Logger.New(ExtensionName) + ext.service = app.Service + ext.mu.Unlock() + } + return nil +} + +func endBlockHook(ctx context.Context, app *common.App, block *common.BlockContext) error { + ext := getExtension() + + isLeader := determineLeader(app, block) + + ext.mu.Lock() + var svc *common.Service + var logger log.Logger + if app != nil { + svc = app.Service + } + if svc != nil { + logger = svc.Logger.New(ExtensionName) + } else { + logger = ext.logger + } + ext.service = svc + ext.logger = logger + + updates := make([]struct { + callbacks Callbacks + change int + }, 0, len(ext.order)) + + for _, name := range ext.order { + w, ok := ext.watchers[name] + if !ok { + continue + } + change := 0 + if w.isLeader != isLeader { + w.isLeader = isLeader + if isLeader { + change = 1 + } else { + change = -1 + } + } + updates = append(updates, struct { + callbacks Callbacks + change int + }{callbacks: w.callbacks, change: change}) + } + callbacks := make([]Callbacks, len(updates)) + changes := make([]int, len(updates)) + for i, u := range updates { + callbacks[i] = u.callbacks + changes[i] = u.change + } + ext.mu.Unlock() + + for i, cb := range callbacks { + switch changes[i] { + case 1: + if cb.OnAcquire != nil { + cb.OnAcquire(ctx, app, block) + } + case -1: + if cb.OnLose != nil { + cb.OnLose(ctx, app, block) + } + } + if cb.OnEndBlock != nil { + cb.OnEndBlock(ctx, app, block) + } + } + + return nil +} + +func determineLeader(app *common.App, block *common.BlockContext) bool { + if app == nil || app.Service == nil || block == nil || block.ChainContext == nil || block.ChainContext.NetworkParameters == nil { + return false + } + nodeID := app.Service.Identity + leader := block.ChainContext.NetworkParameters.Leader + if len(nodeID) == 0 || leader.PublicKey == nil { + return false + } + return string(nodeID) == string(leader.PublicKey.Bytes()) +} + +func Register(name string, callbacks Callbacks) error { + ext := getExtension() + 
ext.mu.Lock() + defer ext.mu.Unlock() + if name == "" { + return fmt.Errorf("leaderwatch: name cannot be empty") + } + if _, exists := ext.watchers[name]; exists { + return fmt.Errorf("leaderwatch: watcher %q already registered", name) + } + ext.watchers[name] = &watcher{callbacks: callbacks} + ext.order = append(ext.order, name) + return nil +} + +func ResetForTest() { + ext := getExtension() + ext.mu.Lock() + ext.watchers = make(map[string]*watcher) + ext.order = nil + ext.mu.Unlock() +} diff --git a/extensions/leaderwatch/leaderwatch_test.go b/extensions/leaderwatch/leaderwatch_test.go new file mode 100644 index 000000000..5c654511c --- /dev/null +++ b/extensions/leaderwatch/leaderwatch_test.go @@ -0,0 +1,106 @@ +package leaderwatch + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/crypto" + "github.com/trufnetwork/kwil-db/core/log" + coretypes "github.com/trufnetwork/kwil-db/core/types" +) + +func makeBlock(height int64, leader []byte) *common.BlockContext { + pk := &fakePubKey{b: leader} + return &common.BlockContext{ + Height: height, + ChainContext: &common.ChainContext{ + NetworkParameters: &coretypes.NetworkParameters{ + Leader: coretypes.PublicKey{PublicKey: pk}, + }, + }, + } +} + +type fakePubKey struct { + b []byte +} + +func (f *fakePubKey) Equals(other crypto.Key) bool { + if other == nil { + return false + } + return string(f.Bytes()) == string(other.Bytes()) && f.Type() == other.Type() +} +func (f *fakePubKey) Bytes() []byte { return f.b } +func (f *fakePubKey) Type() crypto.KeyType { return crypto.KeyTypeEd25519 } +func (f *fakePubKey) Verify(data []byte, sig []byte) (bool, error) { return true, nil } + +func TestLeaderwatch_CallbackOrderingAndTransitions(t *testing.T) { + ResetForTest() + app := &common.App{Service: &common.Service{Identity: []byte("nodeA"), Logger: log.New()}} + ctx := context.Background() + + var events []string + + err 
:= Register("first", Callbacks{ + OnAcquire: func(ctx context.Context, app *common.App, block *common.BlockContext) { + events = append(events, "first_acquire") + }, + OnLose: func(ctx context.Context, app *common.App, block *common.BlockContext) { + events = append(events, "first_lose") + }, + OnEndBlock: func(ctx context.Context, app *common.App, block *common.BlockContext) { + events = append(events, "first_end") + }, + }) + require.NoError(t, err) + + err = Register("second", Callbacks{ + OnAcquire: func(ctx context.Context, app *common.App, block *common.BlockContext) { + events = append(events, "second_acquire") + }, + OnLose: func(ctx context.Context, app *common.App, block *common.BlockContext) { + events = append(events, "second_lose") + }, + OnEndBlock: func(ctx context.Context, app *common.App, block *common.BlockContext) { + events = append(events, "second_end") + }, + }) + require.NoError(t, err) + + // become leader + require.NoError(t, endBlockHook(ctx, app, makeBlock(1, []byte("nodeA")))) + require.Equal(t, []string{ + "first_acquire", "first_end", + "second_acquire", "second_end", + }, events) + + // stay leader - only end callbacks fire + events = nil + require.NoError(t, endBlockHook(ctx, app, makeBlock(2, []byte("nodeA")))) + require.Equal(t, []string{"first_end", "second_end"}, events) + + // lose leadership + events = nil + require.NoError(t, endBlockHook(ctx, app, makeBlock(3, []byte("nodeB")))) + require.Equal(t, []string{ + "first_lose", "first_end", + "second_lose", "second_end", + }, events) +} + +func TestLeaderwatch_DuplicateRegistrationFails(t *testing.T) { + ResetForTest() + require.NoError(t, Register("dup", Callbacks{})) + err := Register("dup", Callbacks{}) + require.Error(t, err) +} + +func TestLeaderwatch_ResetClearsState(t *testing.T) { + ResetForTest() + require.NoError(t, Register("one", Callbacks{})) + ResetForTest() + require.NoError(t, Register("one", Callbacks{})) +} diff --git a/extensions/register.go 
b/extensions/register.go index 887aa5139..27cf8e6fd 100644 --- a/extensions/register.go +++ b/extensions/register.go @@ -2,12 +2,16 @@ package extensions import ( "github.com/trufnetwork/node/extensions/database-size" + "github.com/trufnetwork/node/extensions/leaderwatch" "github.com/trufnetwork/node/extensions/tn_cache" "github.com/trufnetwork/node/extensions/tn_digest" + "github.com/trufnetwork/node/extensions/tn_vacuum" ) func init() { + leaderwatch.InitializeExtension() tn_cache.InitializeExtension() tn_digest.InitializeExtension() + tn_vacuum.InitializeExtension() database_size.InitializeExtension() } diff --git a/extensions/tn_digest/README.md b/extensions/tn_digest/README.md index 3006ea6af..0c185a785 100644 --- a/extensions/tn_digest/README.md +++ b/extensions/tn_digest/README.md @@ -29,7 +29,7 @@ Minimal SQL to adjust: ```sql -- First-time setup: ensure the single row exists INSERT INTO main.digest_config (id, enabled, digest_schedule) -VALUES (1, true, '*/10 * * * *'); +VALUES (1, true, '0 9 * * *'); -- Subsequent changes UPDATE main.digest_config SET enabled = true, digest_schedule = '*/10 * * * *' WHERE id = 1; diff --git a/extensions/tn_digest/leader.go b/extensions/tn_digest/leader.go deleted file mode 100644 index 4f168faa0..000000000 --- a/extensions/tn_digest/leader.go +++ /dev/null @@ -1,21 +0,0 @@ -package tn_digest - -import ( - "bytes" - - "github.com/trufnetwork/kwil-db/common" -) - -// isCurrentLeader compares the chain leader in block context with this node's identity. 
-func isCurrentLeader(app *common.App, block *common.BlockContext) bool { - if block == nil || block.ChainContext == nil || block.ChainContext.NetworkParameters == nil || app.Service == nil || app.Service.Identity == nil { - return false - } - leaderBytes := block.ChainContext.NetworkParameters.Leader.Bytes() - if len(leaderBytes) == 0 { - // warn that leader is not set - app.Service.Logger.Warn("leader is not set") - return false - } - return bytes.Equal(leaderBytes, app.Service.Identity) -} diff --git a/extensions/tn_digest/leader_reload_test.go b/extensions/tn_digest/leader_reload_test.go index ddb8c0c9d..5f7ae4c3b 100644 --- a/extensions/tn_digest/leader_reload_test.go +++ b/extensions/tn_digest/leader_reload_test.go @@ -147,10 +147,10 @@ func TestDigest_DefaultDisabled_NoSchedulerOnLeaderAcquire(t *testing.T) { ext.SetReloadIntervalBlocks(1000) identity := []byte("nodeA") app := &common.App{Service: makeService(identity, "1000")} + ext.SetService(app.Service) block := makeBlock(1, identity) - err := endBlockHook(context.Background(), app, block) - require.NoError(t, err) + digestLeaderAcquire(context.Background(), app, block) assert.Nil(t, ext.Scheduler()) } @@ -160,9 +160,10 @@ func TestDigest_LeaderAcquire_StartsScheduler_WhenEnabled(t *testing.T) { ext.SetReloadIntervalBlocks(1000) identity := []byte("nodeB") app := &common.App{Service: makeService(identity, "1000")} + ext.SetService(app.Service) block := makeBlock(1, identity) - require.NoError(t, endBlockHook(context.Background(), app, block)) + digestLeaderAcquire(context.Background(), app, block) require.NotNil(t, ext.Scheduler()) // cleanup _ = ext.Scheduler().Stop() @@ -174,14 +175,15 @@ func TestDigest_LoseLeadership_StopsScheduler(t *testing.T) { ext.SetReloadIntervalBlocks(1000) identity := []byte("nodeC") app := &common.App{Service: makeService(identity, "1000")} + ext.SetService(app.Service) // acquire leadership - require.NoError(t, endBlockHook(context.Background(), app, makeBlock(1, 
identity))) + digestLeaderAcquire(context.Background(), app, makeBlock(1, identity)) require.NotNil(t, ext.Scheduler()) // lose leadership other := []byte("other") - require.NoError(t, endBlockHook(context.Background(), app, makeBlock(2, other))) + digestLeaderLose(context.Background(), app, makeBlock(2, other)) // idempotent stop _ = ext.Scheduler().Stop() @@ -196,17 +198,18 @@ func TestDigest_Reload_EnablesAndStarts_WhenBecomesEnabled(t *testing.T) { ext.SetLastCheckedHeight(1) identity := []byte("nodeD") app := &common.App{Service: makeService(identity, "1")} + ext.SetService(app.Service) // attach EngineOps with fake DB that returns enabled on reload BEFORE first hook fdb := &fakeDB{enabled: true, schedule: "*/5 * * * *"} ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, &fakeAccounts{}, log.New())) // leader at height 1: disabled, no scheduler - require.NoError(t, endBlockHook(context.Background(), app, makeBlock(1, identity))) + digestLeaderAcquire(context.Background(), app, makeBlock(1, identity)) assert.Nil(t, ext.Scheduler()) // height 2 triggers reload -> should enable and start - require.NoError(t, endBlockHook(context.Background(), app, makeBlock(2, identity))) + digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity)) require.NotNil(t, ext.Scheduler()) _ = ext.Scheduler().Stop() } @@ -219,15 +222,16 @@ func TestDigest_Reload_DisablesAndStops_WhenBecomesDisabled(t *testing.T) { ext.SetLastCheckedHeight(1) identity := []byte("nodeE") app := &common.App{Service: makeService(identity, "1")} + ext.SetService(app.Service) // start as leader enabled (no reload yet) - require.NoError(t, endBlockHook(context.Background(), app, makeBlock(1, identity))) + digestLeaderAcquire(context.Background(), app, makeBlock(1, identity)) require.NotNil(t, ext.Scheduler()) // reload returns disabled fdb := &fakeDB{enabled: false, schedule: "*/5 * * * *"} ext.SetEngineOps(digestinternal.NewEngineOperations(&fakeEngine{}, fdb, 
&fakeAccounts{}, log.New())) - require.NoError(t, endBlockHook(context.Background(), app, makeBlock(2, identity))) + digestLeaderEndBlock(context.Background(), app, makeBlock(2, identity)) // stop should be idempotent _ = ext.Scheduler().Stop() @@ -239,9 +243,10 @@ func TestDigest_LeaderDetection_UsesNetworkParametersLeader(t *testing.T) { ext.SetReloadIntervalBlocks(1000) identity := []byte("nodeF") app := &common.App{Service: makeService(identity, "1000")} + ext.SetService(app.Service) // Proposer would be different, but we use NetworkParameters.Leader in makeBlock - require.NoError(t, endBlockHook(context.Background(), app, makeBlock(1, identity))) + digestLeaderAcquire(context.Background(), app, makeBlock(1, identity)) require.True(t, ext.IsLeader()) require.NotNil(t, ext.Scheduler()) _ = ext.Scheduler().Stop() diff --git a/extensions/tn_digest/tn_digest.go b/extensions/tn_digest/tn_digest.go index 92d1be2fb..4e438b3e2 100644 --- a/extensions/tn_digest/tn_digest.go +++ b/extensions/tn_digest/tn_digest.go @@ -9,6 +9,7 @@ import ( "github.com/trufnetwork/kwil-db/extensions/hooks" "github.com/trufnetwork/kwil-db/extensions/precompiles" sql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/node/extensions/leaderwatch" "github.com/trufnetwork/node/extensions/tn_digest/internal" ) @@ -24,10 +25,17 @@ func InitializeExtension() { if err := hooks.RegisterEngineReadyHook(ExtensionName+"_engine_ready", engineReadyHook); err != nil { panic(fmt.Sprintf("failed to register %s engine ready hook: %v", ExtensionName, err)) } - // Register end-block hook for leader gating + // Register end-block hook (kept for compatibility; actual leader handling via leaderwatch) if err := hooks.RegisterEndBlockHook(ExtensionName+"_end_block", endBlockHook); err != nil { panic(fmt.Sprintf("failed to register %s end block hook: %v", ExtensionName, err)) } + if err := leaderwatch.Register(ExtensionName, leaderwatch.Callbacks{ + OnAcquire: digestLeaderAcquire, + OnLose: 
digestLeaderLose, + OnEndBlock: digestLeaderEndBlock, + }); err != nil { + panic(fmt.Sprintf("failed to register %s leader watcher: %v", ExtensionName, err)) + } } // InitializeDigestPrecompile makes the extension visible in logs @@ -91,77 +99,106 @@ func engineReadyHook(ctx context.Context, app *common.App) error { // endBlockHook toggles scheduler based on leader status and config func endBlockHook(ctx context.Context, app *common.App, block *common.BlockContext) error { + return nil +} + +func digestLeaderAcquire(ctx context.Context, app *common.App, block *common.BlockContext) { ext := GetExtension() if ext == nil { - return nil + return } - - // Determine leader: compare NetworkParameters.Leader with node identity - isLeader := isCurrentLeader(app, block) - - prev := ext.IsLeader() - if !prev && isLeader { - // became leader: start scheduler if enabled - if ext.ConfigEnabled() { - // lazily create scheduler if missing using app.Service (tests may not set ext.Service) - if ext.ensureSchedulerWithService(app.Service) { - // created scheduler, continue to start below - } - if ext.Scheduler() == nil { // still missing due to missing prereqs - ext.Logger().Debug("tn_digest: prerequisites missing; deferring start until broadcaster/signer/engine/service are available") - } else if err := ext.startScheduler(ctx); err != nil { - ext.Logger().Warn("failed to start tn_digest scheduler on leader acquire", "error", err) - } else { - ext.Logger().Info("tn_digest started (leader)", "schedule", ext.Schedule()) - } + ext.setLeader(true) + if !ext.ConfigEnabled() { + return + } + service := ext.Service() + if app != nil && app.Service != nil { + service = app.Service + if ext.Service() == nil { + ext.SetService(service) } } - if prev && !isLeader { - // lost leadership: stop scheduler if running - ext.stopSchedulerIfRunning() + if ext.ensureSchedulerWithService(service) { + // scheduler created; fall through to start + } + if ext.Scheduler() == nil { + 
ext.Logger().Debug("tn_digest: prerequisites missing; deferring start until broadcaster/signer/engine/service are available") + return + } + if err := ext.startScheduler(ctx); err != nil { + ext.Logger().Warn("failed to start tn_digest scheduler on leader acquire", "error", err) + } else { + ext.Logger().Info("tn_digest started (leader)", "schedule", ext.Schedule()) + } +} + +func digestLeaderLose(ctx context.Context, app *common.App, block *common.BlockContext) { + ext := GetExtension() + if ext == nil { + return + } + ext.setLeader(false) + ext.stopSchedulerIfRunning() + if ext.Logger() != nil { ext.Logger().Info("tn_digest stopped (lost leadership)") } - ext.setLeader(isLeader) - - // Periodic config reload - if block != nil && ext.ReloadIntervalBlocks() > 0 { - if block.Height-ext.LastCheckedHeight() >= ext.ReloadIntervalBlocks() { - if ext.EngineOps() != nil { - enabled, schedule, _ := ext.EngineOps().LoadDigestConfig(ctx) - // Only act if changed - if enabled != ext.ConfigEnabled() || (schedule != "" && schedule != ext.Schedule()) { - // fallback to default if schedule empty - if schedule == "" { - schedule = DefaultDigestSchedule - } - ext.SetConfig(enabled, schedule) - // reconcile based on new config and current leadership - if !enabled { - // disabled -> stop if running - ext.stopSchedulerIfRunning() - ext.Logger().Info("tn_digest stopped due to config disabled") - } else if isLeader { - // enabled and leader -> (re)start with new schedule - if ext.Scheduler() == nil && !ext.ensureSchedulerWithService(app.Service) { - ext.Logger().Debug("tn_digest: prerequisites missing; deferring (re)start after config update") - } else if err := func() error { - // stop if existing, then start - if ext.Scheduler() != nil { - ext.stopSchedulerIfRunning() - } - return ext.startScheduler(ctx) - }(); err != nil { - ext.Logger().Warn("failed to (re)start tn_digest scheduler after config update", "error", err) - } else { - ext.Logger().Info("tn_digest (re)started with new 
schedule", "schedule", ext.Schedule()) - } - } +} + +func digestLeaderEndBlock(ctx context.Context, app *common.App, block *common.BlockContext) { + ext := GetExtension() + if ext == nil { + return + } + + if block == nil { + return + } + + reload := ext.ReloadIntervalBlocks() + if reload <= 0 { + return + } + + if block.Height-ext.LastCheckedHeight() < reload { + return + } + + if ext.EngineOps() == nil { + ext.Logger().Debug("tn_digest: skip reload; EngineOps not ready") + ext.SetLastCheckedHeight(block.Height) + return + } + + enabled, schedule, _ := ext.EngineOps().LoadDigestConfig(ctx) + if schedule == "" { + schedule = DefaultDigestSchedule + } + + if enabled != ext.ConfigEnabled() || schedule != ext.Schedule() { + ext.SetConfig(enabled, schedule) + if !enabled { + ext.stopSchedulerIfRunning() + ext.Logger().Info("tn_digest stopped due to config disabled") + } else if ext.IsLeader() { + service := ext.Service() + if app != nil && app.Service != nil { + service = app.Service + if ext.Service() == nil { + ext.SetService(service) + } + } + if ext.Scheduler() == nil && !ext.ensureSchedulerWithService(service) { + ext.Logger().Debug("tn_digest: prerequisites missing; deferring (re)start after config update") + } else if ext.Scheduler() != nil { + ext.stopSchedulerIfRunning() + if err := ext.startScheduler(ctx); err != nil { + ext.Logger().Warn("failed to (re)start tn_digest scheduler after config update", "error", err) + } else { + ext.Logger().Info("tn_digest (re)started with new schedule", "schedule", ext.Schedule()) } - } else { - ext.Logger().Debug("tn_digest: skip reload; EngineOps not ready") } - ext.SetLastCheckedHeight(block.Height) } } - return nil + + ext.SetLastCheckedHeight(block.Height) } diff --git a/extensions/tn_vacuum/README.md b/extensions/tn_vacuum/README.md new file mode 100644 index 000000000..3321f5725 --- /dev/null +++ b/extensions/tn_vacuum/README.md @@ -0,0 +1,119 @@ +# TN Vacuum Extension + +Automated database maintenance through periodic 
vacuuming operations. Reclaims disk space and optimizes database performance by removing dead tuples using `pg_repack`. + +## Configuration + +Add to your node's configuration file (all values are strings due to the +extension config format): + +```toml +[extensions.tn_vacuum] +enabled = "true" +block_interval = "50000" +# Optional tuning +# pg_repack_jobs = "1" # limit pg_repack concurrency (default: auto) +``` + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `enabled` | string (`"true"`/`"false"`) | `"true"` | Enable/disable the vacuum extension | +| `block_interval` | string (numeric) | `"50000"` | Number of blocks between vacuum runs | +| `pg_repack_jobs` | string (numeric) | _unset_ | Passes `--jobs=N` to `pg_repack` to throttle concurrency (omit to use the binary default) | + +**Tuning `block_interval`:** +- High-write environments: `25000` - `50000` blocks +- Read-heavy environments: `75000` - `100000` blocks +- Minimum: `1` block (enforced) + +## Prerequisites + +Requires `pg_repack` binary installed and in system PATH. + +> **Note:** If you're using the official TrufNetwork node image, `pg_repack` is already included. Skip installation. + +**For custom installations:** + +**Ubuntu/Debian:** +```bash +sudo apt-get install postgresql-client-16 postgresql-16-repack +``` + +**RHEL/CentOS:** +```bash +sudo yum install postgresql16 pg_repack_16 +``` + +The extension automatically creates the `pg_repack` PostgreSQL extension on first run. + +## State Persistence + +The extension keeps a lightweight bookkeeping table in your node database at +`ext_tn_vacuum.run_state`. On every successful run it records the block height +and timestamp, allowing restarts to pick up the schedule without re-running +immediately. The schema is prefixed with `ext_`, so it is ignored by consensus +hashing and remains entirely node-local. 
+ +## Metrics + +When OpenTelemetry is enabled, the extension provides: + +**Counters:** +- `tn_vacuum.vacuum_start_total` - Vacuum operations started +- `tn_vacuum.vacuum_complete_total` - Successful completions +- `tn_vacuum.vacuum_error_total` - Errors encountered +- `tn_vacuum.vacuum_skipped_total` - Operations skipped + +**Histograms:** +- `tn_vacuum.vacuum_duration_seconds` - Duration of operations +- `tn_vacuum.tables_processed` - Tables processed per run + +**Gauges:** +- `tn_vacuum.last_run_height` - Block height of last run + +## Troubleshooting + +### pg_repack Not Found + +**Symptom:** Logs show "pg_repack binary not found" + +**Solution:** +1. Install pg_repack (see [Prerequisites](#prerequisites)) +2. Ensure it's in system PATH +3. Restart the node + +### Permission Errors + +**Symptom:** Vacuum fails with permission denied + +**Solution:** Ensure database user has superuser privileges OR grant explicit permissions on the `repack` schema that the `pg_repack` extension creates: +```sql +GRANT USAGE ON SCHEMA repack TO your_user; +GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA repack TO your_user; +``` + +### High Memory Usage + +**Symptom:** Node memory spikes during vacuum + +**Solution:** Increase `block_interval` to run less frequently + +## Performance Impact + +Vacuum operations are non-blocking but consume resources: +- **CPU**: Moderate during operation +- **Memory**: Proportional to table sizes +- **Disk I/O**: Significant but spread over time + +**Best Practices:** +- Start with default interval (50k blocks) +- Monitor for 24-48 hours before adjusting +- Consider database size and growth rate + +## Security Note + +The database password is passed to `pg_repack` through the `PGPASSWORD` environment variable rather than on the command line; host, port, and user are passed as command-line flags. Requires elevated database privileges (superuser or pg_repack role). 
+ +## Related Documentation + +- [pg_repack Documentation](https://reorg.github.io/pg_repack/) +- [PostgreSQL VACUUM](https://www.postgresql.org/docs/current/sql-vacuum.html) diff --git a/extensions/tn_vacuum/config.go b/extensions/tn_vacuum/config.go new file mode 100644 index 000000000..267eb1e3a --- /dev/null +++ b/extensions/tn_vacuum/config.go @@ -0,0 +1,75 @@ +package tn_vacuum + +import ( + "fmt" + "strconv" + "strings" + + "github.com/trufnetwork/kwil-db/common" +) + +type Config struct { + Enabled bool + BlockInterval int64 + PgRepackJobs int +} + +func LoadConfig(service *common.Service) (Config, error) { + cfg := Config{Enabled: true, BlockInterval: defaultBlockInterval} + + if service == nil || service.LocalConfig == nil { + return cfg, nil + } + + raw, ok := service.LocalConfig.Extensions[ExtensionName] + if !ok { + return cfg, nil + } + + if v, ok := raw[ConfigKeyEnabled]; ok { + boolVal, err := parseBool(v) + if err != nil { + return cfg, fmt.Errorf("parse enabled: %w", err) + } + cfg.Enabled = boolVal + } + + if v, ok := raw[ConfigKeyBlockInterval]; ok { + val, err := strconv.ParseInt(strings.TrimSpace(v), 10, 64) + if err != nil { + return cfg, fmt.Errorf("parse block_interval: %w", err) + } + if val <= 0 { + val = defaultBlockInterval + } + if val < minBlockInterval { + val = minBlockInterval + } + cfg.BlockInterval = val + } + + if v, ok := raw[ConfigKeyPgRepackJobs]; ok { + val, err := strconv.Atoi(strings.TrimSpace(v)) + if err != nil { + return cfg, fmt.Errorf("parse pg_repack_jobs: %w", err) + } + if val < 0 { + val = 0 + } + cfg.PgRepackJobs = val + } + + return cfg, nil +} + +func parseBool(in string) (bool, error) { + val := strings.TrimSpace(in) + switch val { + case "true": + return true, nil + case "false", "": + return false, nil + default: + return false, fmt.Errorf("invalid bool %q, expected 'true' or 'false'", in) + } +} diff --git a/extensions/tn_vacuum/constants.go b/extensions/tn_vacuum/constants.go new file mode 100644 index 
000000000..5326b291c --- /dev/null +++ b/extensions/tn_vacuum/constants.go @@ -0,0 +1,31 @@ +package tn_vacuum + +const ( + // ExtensionName is used for hook registration and config namespace. + ExtensionName = "tn_vacuum" +) + +const ( + defaultBlockInterval = int64(50000) + minBlockInterval = int64(1) +) + +// Configuration keys +const ( + ConfigKeyEnabled = "enabled" + ConfigKeyBlockInterval = "block_interval" + ConfigKeyPgRepackJobs = "pg_repack_jobs" +) + +// Database connection defaults +const ( + DefaultPostgresHost = "127.0.0.1" + DefaultPostgresPort = "5432" + DefaultSSLMode = "sslmode=disable" +) + +// Report status values +const ( + StatusOK = "ok" + StatusFailed = "failed" +) diff --git a/extensions/tn_vacuum/extension.go b/extensions/tn_vacuum/extension.go new file mode 100644 index 000000000..7faa4aaf6 --- /dev/null +++ b/extensions/tn_vacuum/extension.go @@ -0,0 +1,414 @@ +package tn_vacuum + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/core/log" + sql "github.com/trufnetwork/kwil-db/node/types/sql" + "github.com/trufnetwork/node/extensions/tn_vacuum/metrics" +) + +type Extension struct { + mu sync.RWMutex + logger log.Logger + service *common.Service + config Config + mechanism Mechanism + runner *Runner + state runState + stateStore stateStore + now func() time.Time + metrics metrics.MetricsRecorder + runQueue chan runRequest + workerCtx context.Context + workerCancel context.CancelFunc + workerWG sync.WaitGroup + runInProgress bool +} + +var ( + extInstance *Extension + once sync.Once +) + +type runRequest struct { + height int64 + reason string + dbConfig DBConnConfig + triggeredAt time.Time + PgRepackJobs int +} + +func GetExtension() *Extension { + once.Do(func() { + logger := log.New(log.WithLevel(log.LevelInfo)) + extInstance = &Extension{ + logger: logger, + metrics: metrics.NewMetricsRecorder(logger), + now: time.Now, + } + }) + return extInstance +} + +func 
SetExtension(e *Extension) { + if e != nil && e.now == nil { + e.now = time.Now + } + extInstance = e +} + +func ResetForTest() { + if extInstance != nil { + extInstance.Close(context.Background()) + } + once = sync.Once{} + extInstance = nil +} + +// setStateStore overrides the persistent state backend. Tests use this to +// inject a stub without touching a real database connection. +func (e *Extension) setStateStore(store stateStore) { + e.mu.Lock() + defer e.mu.Unlock() + e.stateStore = store +} + +// setNowFunc overrides the clock used for persisted timestamps. Tests provide +// deterministic values through this hook. +func (e *Extension) setNowFunc(now func() time.Time) { + e.mu.Lock() + defer e.mu.Unlock() + e.now = now +} + +func (e *Extension) Logger() log.Logger { + e.mu.RLock() + defer e.mu.RUnlock() + return e.logger +} + +func (e *Extension) setLogger(l log.Logger) { + e.mu.Lock() + defer e.mu.Unlock() + e.logger = l + e.metrics = metrics.NewMetricsRecorder(l) +} + +func (e *Extension) setService(s *common.Service) { + e.mu.Lock() + defer e.mu.Unlock() + e.service = s +} + +// startWorkerLocked spins up the background worker responsible for consuming +// queued run requests. The caller must hold e.mu. +func (e *Extension) startWorkerLocked(parent context.Context) { + if e.runQueue != nil { + return + } + if parent == nil { + parent = context.Background() + } + ctx, cancel := context.WithCancel(parent) + e.workerCtx = ctx + e.workerCancel = cancel + e.runQueue = make(chan runRequest, 1) + e.workerWG.Add(1) + runQueue := e.runQueue + eworker := e + go func() { + defer eworker.workerWG.Done() + for { + select { + case <-ctx.Done(): + return + case req, ok := <-runQueue: + if !ok { + return + } + eworker.processRun(ctx, req) + } + } + }() +} + +// initializeState prepares the persistence backend and loads the last run +// information from disk. It is safe to call multiple times; the underlying +// operations are idempotent. 
+func (e *Extension) initializeState(ctx context.Context, db sql.DB) error { + e.mu.RLock() + store := e.stateStore + service := e.service + logger := e.logger + metricsRecorder := e.metrics + e.mu.RUnlock() + + if store == nil { + cfg := DBConnConfig{} + if service != nil { + cfg = dbConnFromService(service) + } + if cfg.Database == "" { + return fmt.Errorf("tn_vacuum state persistence requires database name") + } + + newStore, err := newPGStateStore(ctx, cfg, logger) + if err != nil { + return fmt.Errorf("initialize tn_vacuum state store: %w", err) + } + + e.mu.Lock() + if e.stateStore == nil { + e.stateStore = newStore + store = newStore + metricsRecorder = e.metrics + } else { + store = e.stateStore + newStore.Close() + } + e.mu.Unlock() + } + + if store == nil { + return fmt.Errorf("tn_vacuum state store unavailable") + } + + if err := store.Ensure(ctx); err != nil { + return fmt.Errorf("prepare tn_vacuum state store: %w", err) + } + + state, ok, err := store.Load(ctx) + if err != nil { + return fmt.Errorf("load tn_vacuum state: %w", err) + } + if !ok { + return nil + } + + e.mu.Lock() + e.state = state + e.mu.Unlock() + + if metricsRecorder != nil { + metricsRecorder.RecordLastRunHeight(ctx, state.LastRunHeight) + } + + return nil +} + +// processRun executes a scheduled vacuum request on the worker goroutine. It +// assumes only a single worker is active at a time so no additional locking is +// required outside of bookkeeping updates. 
+func (e *Extension) processRun(ctx context.Context, req runRequest) { + e.mu.Lock() + mech := e.mechanism + runner := e.runner + logger := e.logger + metricsRecorder := e.metrics + store := e.stateStore + nowFn := e.now + config := e.config + e.runInProgress = true + e.mu.Unlock() + + if !config.Enabled || mech == nil || runner == nil { + e.mu.Lock() + e.runInProgress = false + e.mu.Unlock() + return + } + + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + err := runner.Execute(runCtx, RunnerArgs{ + Mechanism: mech, + Logger: logger, + Reason: req.reason, + DB: req.dbConfig, + Metrics: metricsRecorder, + PgRepackJobs: req.PgRepackJobs, + }) + + if err != nil { + logger.Warn("vacuum run failed", "error", err, "height", req.height, "reason", req.reason) + e.mu.Lock() + e.runInProgress = false + e.mu.Unlock() + return + } + + newState := runState{LastRunHeight: req.height} + if nowFn != nil { + newState.LastRunAt = nowFn().UTC() + } + + if store != nil { + if err := store.Save(runCtx, newState); err != nil { + logger.Warn("failed to persist tn_vacuum state", "error", err) + } + } + if metricsRecorder != nil { + metricsRecorder.RecordLastRunHeight(runCtx, req.height) + } + + e.mu.Lock() + if req.height > e.state.LastRunHeight { + e.state = newState + } + e.runInProgress = false + e.mu.Unlock() +} + +// enqueueRun places a run request on the worker queue if no job is already +// pending or in progress. It returns false when the worker is busy so callers +// can record a skip metric. 
+func (e *Extension) enqueueRun(ctx context.Context, req runRequest) bool { + e.mu.Lock() + defer e.mu.Unlock() + if e.runQueue == nil { + e.startWorkerLocked(ctx) + } + if e.runInProgress { + return false + } + if len(e.runQueue) > 0 { + return false + } + select { + case e.runQueue <- req: + return true + default: + return false + } +} + +func (e *Extension) configure(ctx context.Context, cfg Config) error { + e.mu.Lock() + defer e.mu.Unlock() + + if e.mechanism != nil { + _ = e.mechanism.Close(ctx) + e.mechanism = nil + } + + e.config = cfg + + if !cfg.Enabled { + return nil + } + + mech := newMechanism() + deps := MechanismDeps{Logger: e.logger, DB: dbConnFromService(e.service)} + if deps.DB.Database == "" { + return fmt.Errorf("tn_vacuum requires database name in configuration") + } + if err := mech.Prepare(ctx, deps); err != nil { + return err + } + + e.mechanism = mech + e.runner = &Runner{logger: e.logger} + return nil +} + +func (e *Extension) maybeRun(ctx context.Context, blockHeight int64) { + if blockHeight <= 0 { + return + } + + e.mu.RLock() + cfg := e.config + mech := e.mechanism + runner := e.runner + state := e.state + logger := e.logger + svc := e.service + metricsRecorder := e.metrics + nowFn := e.now + e.mu.RUnlock() + + if !cfg.Enabled || mech == nil || runner == nil { + return + } + + if state.LastRunHeight != 0 { + if blockHeight <= state.LastRunHeight { + return + } + if blockHeight-state.LastRunHeight < cfg.BlockInterval { + if metricsRecorder != nil { + metricsRecorder.RecordVacuumSkipped(ctx, "block_interval_not_met") + } + return + } + } + + reason := fmt.Sprintf("block_interval:%d", blockHeight) + triggeredAt := time.Now() + if nowFn != nil { + triggeredAt = nowFn() + } + req := runRequest{ + height: blockHeight, + reason: reason, + dbConfig: dbConnFromService(svc), + triggeredAt: triggeredAt, + PgRepackJobs: cfg.PgRepackJobs, + } + + if !e.enqueueRun(ctx, req) { + if metricsRecorder != nil { + metricsRecorder.RecordVacuumSkipped(ctx, 
"worker_busy") + } + logger.Debug("vacuum run already queued or in progress", "height", blockHeight, "reason", reason) + } +} + +func (e *Extension) Close(ctx context.Context) { + e.mu.Lock() + mech := e.mechanism + e.mechanism = nil + e.runner = nil + store := e.stateStore + e.stateStore = nil + queue := e.runQueue + e.runQueue = nil + cancel := e.workerCancel + e.workerCancel = nil + e.workerCtx = nil + e.mu.Unlock() + + if mech != nil { + _ = mech.Close(ctx) + } + if store != nil { + store.Close() + } + if cancel != nil { + cancel() + } + if queue != nil { + close(queue) + } + e.workerWG.Wait() +} + +func dbConnFromService(service *common.Service) DBConnConfig { + if service == nil || service.LocalConfig == nil { + return DBConnConfig{} + } + db := service.LocalConfig.DB + return DBConnConfig{ + Host: db.Host, + Port: db.Port, + User: db.User, + Password: db.Pass, + Database: db.DBName, + } +} diff --git a/extensions/tn_vacuum/mechanism.go b/extensions/tn_vacuum/mechanism.go new file mode 100644 index 000000000..5f808f5e3 --- /dev/null +++ b/extensions/tn_vacuum/mechanism.go @@ -0,0 +1,66 @@ +package tn_vacuum + +import ( + "context" + "sync" + "time" + + "github.com/trufnetwork/kwil-db/core/log" +) + +type Mechanism interface { + Name() string + Prepare(ctx context.Context, deps MechanismDeps) error + Run(ctx context.Context, req RunRequest) (*RunReport, error) + Close(ctx context.Context) error +} + +type MechanismDeps struct { + Logger log.Logger + DB DBConnConfig +} + +type RunRequest struct { + Reason string + DB DBConnConfig + PgRepackJobs int +} + +type RunReport struct { + Mechanism string + Status string + Duration time.Duration + TablesProcessed int + Error string +} + +type DBConnConfig struct { + Host string + Port string + User string + Password string + Database string +} + +var ( + mechanismFactory = func() Mechanism { return NewPgRepackMechanism() } + mechanismFactoryMu sync.RWMutex +) + +func newMechanism() Mechanism { + 
mechanismFactoryMu.RLock() + defer mechanismFactoryMu.RUnlock() + return mechanismFactory() +} + +func setMechanismFactoryForTest(f func() Mechanism) { + mechanismFactoryMu.Lock() + defer mechanismFactoryMu.Unlock() + mechanismFactory = f +} + +func resetMechanismFactory() { + mechanismFactoryMu.Lock() + defer mechanismFactoryMu.Unlock() + mechanismFactory = func() Mechanism { return NewPgRepackMechanism() } +} diff --git a/extensions/tn_vacuum/mechanism_repack.go b/extensions/tn_vacuum/mechanism_repack.go new file mode 100644 index 000000000..92320c4c2 --- /dev/null +++ b/extensions/tn_vacuum/mechanism_repack.go @@ -0,0 +1,191 @@ +package tn_vacuum + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "os/exec" + "strings" + "time" + + "github.com/jackc/pgx/v5" + "github.com/trufnetwork/kwil-db/core/log" +) + +var ErrPgRepackUnavailable = errors.New("pg_repack binary not found in PATH") + +type pgRepackMechanism struct { + logger log.Logger + binaryPath string + db DBConnConfig +} + +func NewPgRepackMechanism() Mechanism { + return &pgRepackMechanism{} +} + +func (m *pgRepackMechanism) Name() string { return "pg_repack" } + +func (m *pgRepackMechanism) Prepare(ctx context.Context, deps MechanismDeps) error { + m.logger = deps.Logger.New("mechanism.pg_repack") + m.db = deps.DB + path, err := exec.LookPath("pg_repack") + if err != nil { + m.logger.Error("pg_repack binary not found; extension cannot start", "error", err) + return ErrPgRepackUnavailable + } + m.binaryPath = path + m.logger.Info("pg_repack binary detected", "path", path) + if err := ensurePgRepackExtension(ctx, deps.DB, m.logger); err != nil { + return fmt.Errorf("ensure pg_repack extension: %w", err) + } + return nil +} + +func (m *pgRepackMechanism) Run(ctx context.Context, req RunRequest) (*RunReport, error) { + startTime := time.Now() + report := &RunReport{ + Mechanism: m.Name(), + Status: StatusOK, + } + + if m.binaryPath == "" { + return nil, fmt.Errorf("pg_repack unavailable: %w", 
ErrPgRepackUnavailable) + } + db := req.DB + if db.Database == "" { + db = m.db + } + if db.Database == "" { + return nil, fmt.Errorf("pg_repack requires database name") + } + + args := []string{fmt.Sprintf("--dbname=%s", db.Database), "--all"} + if db.Host != "" { + args = append(args, fmt.Sprintf("--host=%s", db.Host)) + } + if db.Port != "" { + args = append(args, fmt.Sprintf("--port=%s", db.Port)) + } + if db.User != "" { + args = append(args, fmt.Sprintf("--username=%s", db.User)) + } + + if req.PgRepackJobs > 0 { + args = append(args, fmt.Sprintf("--jobs=%d", req.PgRepackJobs)) + } + // Always skip reordering to minimize swap time; logical data remains unchanged. + args = append(args, "--no-order") + + cmd := exec.CommandContext(ctx, m.binaryPath, args...) + env := os.Environ() + if db.Password != "" { + env = append(env, fmt.Sprintf("PGPASSWORD=%s", db.Password)) + } + cmd.Env = env + + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + m.logger.Info("pg_repack starting", "args", args) + if err := cmd.Run(); err != nil { + report.Duration = time.Since(startTime) + report.Status = StatusFailed + report.Error = err.Error() + m.logger.Warn("pg_repack failed", "error", err, "stderr", stderr.String(), "duration", report.Duration) + return report, fmt.Errorf("pg_repack execution failed: %w", err) + } + + report.Duration = time.Since(startTime) + output := stdout.String() + stderr.String() + if err := detectPgRepackSoftFailure(stderr.String()); err != nil { + report.Status = StatusFailed + report.Error = err.Error() + m.logger.Warn("pg_repack reported incompatibility", "stderr", stderr.String(), "duration", report.Duration) + return report, err + } + tablesProcessed := strings.Count(output, "INFO: repacking table") + report.TablesProcessed = tablesProcessed + + if tablesProcessed == 0 { + report.Status = StatusFailed + report.Error = "pg_repack completed without processing any tables" + m.logger.Warn("pg_repack completed but processed 
no tables", "stderr", stderr.String()) + return report, fmt.Errorf("pg_repack processed zero tables") + } + + m.logger.Info("pg_repack completed", "stdout", stdout.String(), "stderr", stderr.String(), "duration", report.Duration, "tables", tablesProcessed) + return report, nil +} + +func detectPgRepackSoftFailure(stderr string) error { + lowered := strings.ToLower(stderr) + switch { + case strings.Contains(lowered, "does not match database library"): + return fmt.Errorf("pg_repack version mismatch: %s", summarizePgRepackError(stderr)) + default: + return nil + } +} + +func summarizePgRepackError(stderr string) string { + lines := strings.Split(strings.TrimSpace(stderr), "\n") + if len(lines) == 0 { + return "" + } + return strings.TrimSpace(lines[len(lines)-1]) +} + +func (m *pgRepackMechanism) Close(ctx context.Context) error { + if m.logger != nil { + m.logger.Info("pg_repack mechanism closed") + } + return nil +} + +func ensurePgRepackExtension(ctx context.Context, db DBConnConfig, logger log.Logger) error { + if db.Database == "" { + return fmt.Errorf("missing database name for pg_repack extension setup") + } + connStr := buildConnString(db) + conn, err := pgx.Connect(ctx, connStr) + if err != nil { + logger.Warn("failed to connect to database for pg_repack extension", "error", err) + return fmt.Errorf("pg_repack extension connection: %w", err) + } + defer conn.Close(ctx) + + if _, err := conn.Exec(ctx, "CREATE EXTENSION IF NOT EXISTS pg_repack"); err != nil { + logger.Warn("failed to create pg_repack extension", "error", err) + return fmt.Errorf("create pg_repack extension: %w", err) + } + logger.Info("pg_repack extension ensured") + return nil +} + +func buildConnString(db DBConnConfig) string { + host := db.Host + if host == "" { + host = DefaultPostgresHost + } + port := db.Port + if port == "" { + port = DefaultPostgresPort + } + parts := []string{ + fmt.Sprintf("host=%s", host), + fmt.Sprintf("port=%s", port), + fmt.Sprintf("dbname=%s", db.Database), + 
DefaultSSLMode, + } + if db.User != "" { + parts = append(parts, fmt.Sprintf("user=%s", db.User)) + } + if db.Password != "" { + parts = append(parts, fmt.Sprintf("password=%s", db.Password)) + } + return strings.Join(parts, " ") +} diff --git a/extensions/tn_vacuum/mechanism_repack_test.go b/extensions/tn_vacuum/mechanism_repack_test.go new file mode 100644 index 000000000..cfc535d3b --- /dev/null +++ b/extensions/tn_vacuum/mechanism_repack_test.go @@ -0,0 +1,39 @@ +package tn_vacuum + +import "testing" + +func TestDetectPgRepackSoftFailure(t *testing.T) { + tests := []struct { + name string + stderr string + expects bool + }{ + { + name: "version mismatch", + stderr: "INFO: database \"kwild\" skipped: program 'pg_repack 1.5.0' does not match database library 'pg_repack 1.5.2'", + expects: true, + }, + { + name: "extension missing", + stderr: "INFO: database \"kwild\" skipped: pg_repack 1.5.0 is not installed in the database", + expects: false, + }, + { + name: "no issues", + stderr: "INFO: repacking database \"kwild\"\nINFO: repacking table \"public\".\"foo\"", + expects: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := detectPgRepackSoftFailure(tt.stderr) + if tt.expects && err == nil { + t.Fatalf("expected error, got nil") + } + if !tt.expects && err != nil { + t.Fatalf("expected no error, got %v", err) + } + }) + } +} diff --git a/extensions/tn_vacuum/metrics/metrics.go b/extensions/tn_vacuum/metrics/metrics.go new file mode 100644 index 000000000..da77b06e5 --- /dev/null +++ b/extensions/tn_vacuum/metrics/metrics.go @@ -0,0 +1,78 @@ +// Package metrics provides observability for the tn_vacuum extension. +// It uses a plugin pattern to ensure zero overhead when OpenTelemetry is not available. +package metrics + +import ( + "context" + "strings" + "time" + + "github.com/trufnetwork/kwil-db/core/log" + "go.opentelemetry.io/otel" +) + +// MetricsRecorder defines the interface for recording vacuum metrics. 
+// This allows for pluggable implementations - either real OTEL metrics or no-op. +type MetricsRecorder interface { + // Vacuum operation metrics + RecordVacuumStart(ctx context.Context, mechanism string) + RecordVacuumComplete(ctx context.Context, mechanism string, duration time.Duration, tablesProcessed int) + RecordVacuumError(ctx context.Context, mechanism string, errType string) + RecordVacuumSkipped(ctx context.Context, reason string) + + // Resource metrics + RecordLastRunHeight(ctx context.Context, height int64) +} + +// NewMetricsRecorder creates a metrics recorder instance. +// It automatically detects if OpenTelemetry is available and returns +// either a real OTEL implementation or a no-op implementation. +func NewMetricsRecorder(logger log.Logger) MetricsRecorder { + // Try to get the global meter provider + meter := otel.GetMeterProvider().Meter("github.com/trufnetwork/kwil-db/extensions/tn_vacuum") + + // Try to create a test metric to verify OTEL is functional + _, err := meter.Int64Counter("tn_vacuum.test") + if err != nil { + logger.Debug("OpenTelemetry not available, metrics disabled") + return NewNoOpMetrics() + } + + // OTEL is available, create real metrics recorder + otelMetrics, err := NewOTELMetrics(meter, logger) + if err != nil { + logger.Warn("failed to initialize OTEL metrics, falling back to no-op", "error", err) + return NewNoOpMetrics() + } + + logger.Info("OpenTelemetry metrics initialized successfully") + return otelMetrics +} + +// ClassifyError categorizes errors for metric labels to keep cardinality low +func ClassifyError(err error) string { + if err == nil { + return "none" + } + + errStr := err.Error() + switch { + case strings.Contains(errStr, "context deadline exceeded"): + return "timeout" + case strings.Contains(errStr, "context canceled"): + return "cancelled" + case strings.Contains(errStr, "connection"): + return "connection_error" + case strings.Contains(errStr, "pg_repack unavailable") || strings.Contains(errStr, 
"not found in PATH"): + return "binary_unavailable" + case strings.Contains(errStr, "execution failed"): + return "execution_failed" + case strings.Contains(errStr, "database") || strings.Contains(errStr, "sql"): + return "database_error" + case strings.Contains(errStr, "permission denied") || strings.Contains(errStr, "unauthorized"): + return "permission_denied" + default: + return "unknown" + } +} + diff --git a/extensions/tn_vacuum/metrics/metrics_test.go b/extensions/tn_vacuum/metrics/metrics_test.go new file mode 100644 index 000000000..b7bfc30a0 --- /dev/null +++ b/extensions/tn_vacuum/metrics/metrics_test.go @@ -0,0 +1,94 @@ +package metrics + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/core/log" +) + +func TestNoOpMetrics(t *testing.T) { + ctx := context.Background() + m := NewNoOpMetrics() + + // Should not panic + m.RecordVacuumStart(ctx, "test") + m.RecordVacuumComplete(ctx, "test", time.Second, 5) + m.RecordVacuumError(ctx, "test", "error") + m.RecordVacuumSkipped(ctx, "reason") + m.RecordLastRunHeight(ctx, 100) +} + +func TestClassifyError(t *testing.T) { + tests := []struct { + name string + err error + expected string + }{ + { + name: "nil error", + err: nil, + expected: "none", + }, + { + name: "timeout error", + err: errors.New("context deadline exceeded"), + expected: "timeout", + }, + { + name: "cancelled error", + err: errors.New("context canceled"), + expected: "cancelled", + }, + { + name: "connection error", + err: errors.New("connection refused"), + expected: "connection_error", + }, + { + name: "binary unavailable", + err: errors.New("pg_repack unavailable: not found in PATH"), + expected: "binary_unavailable", + }, + { + name: "execution failed", + err: errors.New("pg_repack execution failed"), + expected: "execution_failed", + }, + { + name: "database error", + err: errors.New("sql error: syntax error"), + expected: "database_error", + }, + { + name: 
"permission denied", + err: errors.New("permission denied"), + expected: "permission_denied", + }, + { + name: "unknown error", + err: errors.New("something went wrong"), + expected: "unknown", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := ClassifyError(tt.err) + require.Equal(t, tt.expected, result) + }) + } +} + +func TestNewMetricsRecorder(t *testing.T) { + logger := log.New() + m := NewMetricsRecorder(logger) + require.NotNil(t, m) + + // Should return a valid metrics recorder (either OTEL or NoOp) + ctx := context.Background() + m.RecordVacuumStart(ctx, "test") +} diff --git a/extensions/tn_vacuum/metrics/noop.go b/extensions/tn_vacuum/metrics/noop.go new file mode 100644 index 000000000..ad3329838 --- /dev/null +++ b/extensions/tn_vacuum/metrics/noop.go @@ -0,0 +1,23 @@ +package metrics + +import ( + "context" + "time" +) + +// NoOpMetrics is a no-op implementation of MetricsRecorder. +// It does nothing and has zero overhead. +type NoOpMetrics struct{} + +// NewNoOpMetrics creates a new no-op metrics recorder. 
+func NewNoOpMetrics() MetricsRecorder { + return &NoOpMetrics{} +} + +func (n *NoOpMetrics) RecordVacuumStart(ctx context.Context, mechanism string) {} +func (n *NoOpMetrics) RecordVacuumComplete(ctx context.Context, mechanism string, duration time.Duration, tablesProcessed int) { +} +func (n *NoOpMetrics) RecordVacuumError(ctx context.Context, mechanism string, errType string) {} +func (n *NoOpMetrics) RecordVacuumSkipped(ctx context.Context, reason string) {} +func (n *NoOpMetrics) RecordLastRunHeight(ctx context.Context, height int64) {} + diff --git a/extensions/tn_vacuum/metrics/otel.go b/extensions/tn_vacuum/metrics/otel.go new file mode 100644 index 000000000..47c68dfed --- /dev/null +++ b/extensions/tn_vacuum/metrics/otel.go @@ -0,0 +1,143 @@ +package metrics + +import ( + "context" + "time" + + "github.com/trufnetwork/kwil-db/core/log" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// OTELMetrics implements MetricsRecorder using OpenTelemetry. +type OTELMetrics struct { + logger log.Logger + + // Counters + vacuumStartCounter metric.Int64Counter + vacuumCompleteCounter metric.Int64Counter + vacuumErrorCounter metric.Int64Counter + vacuumSkippedCounter metric.Int64Counter + + // Histograms + vacuumDuration metric.Float64Histogram + tablesProcessed metric.Int64Histogram + + // Gauges + lastRunHeight metric.Int64Gauge +} + +// NewOTELMetrics creates a new OTEL metrics recorder. 
+func NewOTELMetrics(meter metric.Meter, logger log.Logger) (*OTELMetrics, error) { + m := &OTELMetrics{logger: logger} + + var err error + + // Create counters + m.vacuumStartCounter, err = meter.Int64Counter( + "tn_vacuum.vacuum_start_total", + metric.WithDescription("Total number of vacuum operations started"), + metric.WithUnit("{operation}"), + ) + if err != nil { + return nil, err + } + + m.vacuumCompleteCounter, err = meter.Int64Counter( + "tn_vacuum.vacuum_complete_total", + metric.WithDescription("Total number of vacuum operations completed successfully"), + metric.WithUnit("{operation}"), + ) + if err != nil { + return nil, err + } + + m.vacuumErrorCounter, err = meter.Int64Counter( + "tn_vacuum.vacuum_error_total", + metric.WithDescription("Total number of vacuum errors"), + metric.WithUnit("{error}"), + ) + if err != nil { + return nil, err + } + + m.vacuumSkippedCounter, err = meter.Int64Counter( + "tn_vacuum.vacuum_skipped_total", + metric.WithDescription("Total number of vacuum operations skipped"), + metric.WithUnit("{operation}"), + ) + if err != nil { + return nil, err + } + + // Create histograms + m.vacuumDuration, err = meter.Float64Histogram( + "tn_vacuum.vacuum_duration_seconds", + metric.WithDescription("Duration of vacuum operations"), + metric.WithUnit("s"), + ) + if err != nil { + return nil, err + } + + m.tablesProcessed, err = meter.Int64Histogram( + "tn_vacuum.tables_processed", + metric.WithDescription("Number of tables processed during vacuum"), + metric.WithUnit("{table}"), + ) + if err != nil { + return nil, err + } + + // Create gauges + m.lastRunHeight, err = meter.Int64Gauge( + "tn_vacuum.last_run_height", + metric.WithDescription("Block height of the last vacuum run"), + metric.WithUnit("{block}"), + ) + if err != nil { + return nil, err + } + + return m, nil +} + +func (m *OTELMetrics) RecordVacuumStart(ctx context.Context, mechanism string) { + m.vacuumStartCounter.Add(ctx, 1, + metric.WithAttributes( + 
attribute.String("mechanism", mechanism), + ), + ) +} + +func (m *OTELMetrics) RecordVacuumComplete(ctx context.Context, mechanism string, duration time.Duration, tablesProcessed int) { + attrs := metric.WithAttributes( + attribute.String("mechanism", mechanism), + ) + + m.vacuumCompleteCounter.Add(ctx, 1, attrs) + m.vacuumDuration.Record(ctx, duration.Seconds(), attrs) + m.tablesProcessed.Record(ctx, int64(tablesProcessed), attrs) +} + +func (m *OTELMetrics) RecordVacuumError(ctx context.Context, mechanism string, errType string) { + m.vacuumErrorCounter.Add(ctx, 1, + metric.WithAttributes( + attribute.String("mechanism", mechanism), + attribute.String("error_type", errType), + ), + ) +} + +func (m *OTELMetrics) RecordVacuumSkipped(ctx context.Context, reason string) { + m.vacuumSkippedCounter.Add(ctx, 1, + metric.WithAttributes( + attribute.String("reason", reason), + ), + ) +} + +func (m *OTELMetrics) RecordLastRunHeight(ctx context.Context, height int64) { + m.lastRunHeight.Record(ctx, height) +} + diff --git a/extensions/tn_vacuum/runner.go b/extensions/tn_vacuum/runner.go new file mode 100644 index 000000000..9d9f1f215 --- /dev/null +++ b/extensions/tn_vacuum/runner.go @@ -0,0 +1,76 @@ +package tn_vacuum + +import ( + "context" + "time" + + "github.com/trufnetwork/kwil-db/core/log" + "github.com/trufnetwork/node/extensions/tn_vacuum/metrics" +) + +type Runner struct { + logger log.Logger +} + +type RunnerArgs struct { + Mechanism Mechanism + Logger log.Logger + Reason string + DB DBConnConfig + Metrics metrics.MetricsRecorder + PgRepackJobs int +} + +func (r *Runner) Execute(ctx context.Context, args RunnerArgs) error { + if args.Mechanism == nil { + return nil + } + logger := r.logger + if logger == nil { + logger = args.Logger + } + + mechanismName := args.Mechanism.Name() + + if logger != nil { + logger.Info("vacuum runner executing", "mechanism", mechanismName, "reason", args.Reason) + } + if args.Metrics != nil { + args.Metrics.RecordVacuumStart(ctx, 
mechanismName) + } + + report, err := args.Mechanism.Run(ctx, RunRequest{ + Reason: args.Reason, + DB: args.DB, + PgRepackJobs: args.PgRepackJobs, + }) + if err != nil { + if logger != nil { + logger.Warn("vacuum runner failed", "error", err) + } + if args.Metrics != nil { + args.Metrics.RecordVacuumError(ctx, mechanismName, metrics.ClassifyError(err)) + } + return err + } + + if logger != nil { + fields := []any{"mechanism", mechanismName} + if report != nil { + fields = append(fields, "duration", report.Duration, "tables", report.TablesProcessed) + } + logger.Info("vacuum runner completed", fields...) + } + + if args.Metrics != nil { + var duration time.Duration + tables := 0 + if report != nil { + duration = report.Duration + tables = report.TablesProcessed + } + args.Metrics.RecordVacuumComplete(ctx, mechanismName, duration, tables) + } + + return nil +} diff --git a/extensions/tn_vacuum/state.go b/extensions/tn_vacuum/state.go new file mode 100644 index 000000000..8c8078971 --- /dev/null +++ b/extensions/tn_vacuum/state.go @@ -0,0 +1,122 @@ +package tn_vacuum + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/trufnetwork/kwil-db/core/log" +) + +const ( + // vacuumSchemaName is the private Postgres schema used to persist the + // extension's local bookkeeping. Schemas prefixed with ext_ are ignored by + // consensus hashing, keeping this purely node-local state. + vacuumSchemaName = "ext_tn_vacuum" +) + +// runState is the minimal persisted view of the extension. It tracks the block +// height and timestamp of the last successful vacuum run. +type runState struct { + LastRunHeight int64 + LastRunAt time.Time +} + +// stateStore represents a persistence backend capable of storing and loading +// runState snapshots. 
+type stateStore interface { + Ensure(ctx context.Context) error + Load(ctx context.Context) (runState, bool, error) + Save(ctx context.Context, state runState) error + Close() +} + +// pgStateStore implements stateStore using a dedicated pgx connection pool. +type pgStateStore struct { + pool *pgxpool.Pool + logger log.Logger +} + +// newPGStateStore constructs a Postgres-backed store with its own pool. +func newPGStateStore(ctx context.Context, cfg DBConnConfig, logger log.Logger) (stateStore, error) { + connStr := buildConnString(cfg) + poolCfg, err := pgxpool.ParseConfig(connStr) + if err != nil { + return nil, fmt.Errorf("parse connection config: %w", err) + } + pool, err := pgxpool.NewWithConfig(ctx, poolCfg) + if err != nil { + return nil, fmt.Errorf("create connection pool: %w", err) + } + return &pgStateStore{pool: pool, logger: logger}, nil +} + +func (s *pgStateStore) Ensure(ctx context.Context) error { + if _, err := s.pool.Exec(ctx, fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS %s", vacuumSchemaName)); err != nil { + return fmt.Errorf("create tn_vacuum schema: %w", err) + } + + if _, err := s.pool.Exec(ctx, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s.run_state ( + id INT PRIMARY KEY, + last_run_height BIGINT NOT NULL, + last_run_at TIMESTAMPTZ NOT NULL, + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + )`, vacuumSchemaName)); err != nil { + return fmt.Errorf("create run_state table: %w", err) + } + return nil +} + +func (s *pgStateStore) Load(ctx context.Context) (runState, bool, error) { + row := s.pool.QueryRow(ctx, fmt.Sprintf(` + SELECT last_run_height, last_run_at + FROM %s.run_state + WHERE id = 1 + `, vacuumSchemaName)) + + var state runState + if err := row.Scan(&state.LastRunHeight, &state.LastRunAt); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return runState{}, false, nil + } + return runState{}, false, fmt.Errorf("load run_state: %w", err) + } + state.LastRunAt = state.LastRunAt.UTC() + return state, true, nil +} + +func (s 
*pgStateStore) Save(ctx context.Context, state runState) error { + _, err := s.pool.Exec(ctx, fmt.Sprintf(` + INSERT INTO %s.run_state (id, last_run_height, last_run_at, updated_at) + VALUES (1, $1, $2, NOW()) + ON CONFLICT (id) + DO UPDATE SET + last_run_height = EXCLUDED.last_run_height, + last_run_at = EXCLUDED.last_run_at, + updated_at = NOW() + `, vacuumSchemaName), state.LastRunHeight, state.LastRunAt) + if err != nil { + return fmt.Errorf("persist run_state: %w", err) + } + return nil +} + +func (s *pgStateStore) Close() { + if s.pool != nil { + s.pool.Close() + } +} + +// noopStateStore is used when persistence is disabled or misconfigured. +type noopStateStore struct{} + +func (noopStateStore) Ensure(ctx context.Context) error { return nil } +func (noopStateStore) Load(ctx context.Context) (runState, bool, error) { + return runState{}, false, nil +} +func (noopStateStore) Save(ctx context.Context, state runState) error { return nil } +func (noopStateStore) Close() {} diff --git a/extensions/tn_vacuum/tn_vacuum.go b/extensions/tn_vacuum/tn_vacuum.go new file mode 100644 index 000000000..9bae64f33 --- /dev/null +++ b/extensions/tn_vacuum/tn_vacuum.go @@ -0,0 +1,79 @@ +package tn_vacuum + +import ( + "context" + "fmt" + + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/extensions/hooks" + "github.com/trufnetwork/kwil-db/extensions/precompiles" + sql "github.com/trufnetwork/kwil-db/node/types/sql" +) + +func InitializeExtension() { + if err := precompiles.RegisterInitializer(ExtensionName, initializePrecompile); err != nil { + panic(fmt.Sprintf("failed to register %s initializer: %v", ExtensionName, err)) + } + if err := hooks.RegisterEngineReadyHook(ExtensionName+"_engine_ready", engineReadyHook); err != nil { + panic(fmt.Sprintf("failed to register %s engine ready hook: %v", ExtensionName, err)) + } + if err := hooks.RegisterEndBlockHook(ExtensionName+"_end_block", endBlockHook); err != nil { + panic(fmt.Sprintf("failed to register 
%s end block hook: %v", ExtensionName, err)) + } +} + +func initializePrecompile(ctx context.Context, service *common.Service, db sql.DB, alias string, metadata map[string]any) (precompiles.Precompile, error) { + ext := GetExtension() + if service != nil { + ext.setLogger(service.Logger.New(ExtensionName)) + ext.setService(service) + } + return precompiles.Precompile{}, nil +} + +func engineReadyHook(ctx context.Context, app *common.App) error { + ext := GetExtension() + if app != nil && app.Service != nil { + ext.setLogger(app.Service.Logger.New(ExtensionName)) + ext.setService(app.Service) + } + + if app != nil { + if err := ext.initializeState(ctx, app.DB); err != nil { + ext.Logger().Warn("failed to initialize tn_vacuum state", "error", err) + } + } + + svc := (*common.Service)(nil) + if app != nil { + svc = app.Service + } + + cfg, err := LoadConfig(svc) + if err != nil { + ext.Logger().Warn("failed to load tn_vacuum config", "error", err) + cfg.Enabled = false + } + + if err := ext.configure(ctx, cfg); err != nil { + ext.Logger().Warn("failed to configure tn_vacuum", "error", err) + } + if cfg.Enabled { + ext.mu.Lock() + ext.startWorkerLocked(ctx) + ext.mu.Unlock() + } + return nil +} + +func endBlockHook(ctx context.Context, app *common.App, block *common.BlockContext) error { + if block == nil { + return nil + } + ext := GetExtension() + if app != nil && app.Service != nil { + ext.setService(app.Service) + } + ext.maybeRun(ctx, block.Height) + return nil +} diff --git a/extensions/tn_vacuum/vacuum_test.go b/extensions/tn_vacuum/vacuum_test.go new file mode 100644 index 000000000..26a02d8b6 --- /dev/null +++ b/extensions/tn_vacuum/vacuum_test.go @@ -0,0 +1,629 @@ +package tn_vacuum + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/trufnetwork/kwil-db/common" + "github.com/trufnetwork/kwil-db/config" + "github.com/trufnetwork/kwil-db/core/log" +) + +type stubMechanism struct { + mu 
sync.RWMutex + prepared int + runs []RunRequest + closeCnt int +} + +func (s *stubMechanism) Name() string { return "stub" } + +func (s *stubMechanism) Prepare(ctx context.Context, deps MechanismDeps) error { + s.mu.Lock() + s.prepared++ + s.mu.Unlock() + return nil +} + +func (s *stubMechanism) Run(ctx context.Context, req RunRequest) (*RunReport, error) { + s.mu.Lock() + s.runs = append(s.runs, req) + s.mu.Unlock() + return &RunReport{ + Mechanism: s.Name(), + Status: StatusOK, + Duration: 100 * time.Millisecond, + TablesProcessed: 5, + }, nil +} + +func (s *stubMechanism) Close(ctx context.Context) error { + s.mu.Lock() + s.closeCnt++ + s.mu.Unlock() + return nil +} + +func (s *stubMechanism) preparedCount() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.prepared +} + +func (s *stubMechanism) runCount() int { + s.mu.RLock() + defer s.mu.RUnlock() + return len(s.runs) +} + +func (s *stubMechanism) runAt(i int) (RunRequest, bool) { + s.mu.RLock() + defer s.mu.RUnlock() + if i < 0 || i >= len(s.runs) { + return RunRequest{}, false + } + return s.runs[i], true +} + +func (s *stubMechanism) runsSnapshot() []RunRequest { + s.mu.RLock() + defer s.mu.RUnlock() + copyRuns := make([]RunRequest, len(s.runs)) + copy(copyRuns, s.runs) + return copyRuns +} + +type failingMechanism struct{} + +func (f *failingMechanism) Name() string { return "fail" } +func (f *failingMechanism) Prepare(ctx context.Context, deps MechanismDeps) error { + return errors.New("prepare failed") +} +func (f *failingMechanism) Run(ctx context.Context, req RunRequest) (*RunReport, error) { + return nil, errors.New("should not run") +} +func (f *failingMechanism) Close(ctx context.Context) error { return nil } + +type nilReportMechanism struct{} + +func (n *nilReportMechanism) Name() string { return "nil_report" } +func (n *nilReportMechanism) Prepare(ctx context.Context, deps MechanismDeps) error { + return nil +} +func (n *nilReportMechanism) Run(ctx context.Context, req RunRequest) 
(*RunReport, error) { + return nil, nil +} +func (n *nilReportMechanism) Close(ctx context.Context) error { return nil } + +type errorRunMechanism struct{} + +func (e *errorRunMechanism) Name() string { return "error_run" } +func (e *errorRunMechanism) Prepare(ctx context.Context, deps MechanismDeps) error { + return nil +} +func (e *errorRunMechanism) Run(ctx context.Context, req RunRequest) (*RunReport, error) { + return nil, errors.New("run failed") +} +func (e *errorRunMechanism) Close(ctx context.Context) error { return nil } + +type stubStateStore struct { + mu sync.RWMutex + ensureCount int + loadCount int + saveCount int + loadState runState + loadOK bool + loadErr error + saveErr error + lastSaved runState +} + +func (s *stubStateStore) Ensure(ctx context.Context) error { + s.mu.Lock() + s.ensureCount++ + s.mu.Unlock() + return nil +} + +func (s *stubStateStore) Load(ctx context.Context) (runState, bool, error) { + s.mu.Lock() + s.loadCount++ + err := s.loadErr + state := s.loadState + ok := s.loadOK + s.mu.Unlock() + if err != nil { + return runState{}, false, err + } + return state, ok, nil +} + +func (s *stubStateStore) Save(ctx context.Context, state runState) error { + s.mu.Lock() + s.saveCount++ + s.lastSaved = state + err := s.saveErr + s.mu.Unlock() + if err != nil { + return err + } + return nil +} + +func (s *stubStateStore) Close() {} + +func (s *stubStateStore) ensureCountValue() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.ensureCount +} + +func (s *stubStateStore) loadCountValue() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.loadCount +} + +func (s *stubStateStore) saveCountValue() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.saveCount +} + +func (s *stubStateStore) lastSavedState() runState { + s.mu.RLock() + defer s.mu.RUnlock() + return s.lastSaved +} + +func TestConfigureDisabledSkipsMechanism(t *testing.T) { + ctx := context.Background() + ResetForTest() + ext := GetExtension() + ext.setLogger(log.New()) + + 
stub := &stubMechanism{} + setMechanismFactoryForTest(func() Mechanism { return stub }) + defer resetMechanismFactory() + + require.NoError(t, ext.configure(ctx, Config{Enabled: false, BlockInterval: 5})) + require.Equal(t, 0, stub.preparedCount()) +} + +func TestEngineReadyPreparesMechanism(t *testing.T) { + ctx := context.Background() + ResetForTest() + + stub := &stubMechanism{} + setMechanismFactoryForTest(func() Mechanism { return stub }) + defer resetMechanismFactory() + + svc := &common.Service{ + Logger: log.New(), + LocalConfig: &config.Config{ + DB: config.DBConfig{DBName: "kwild_test"}, + Extensions: map[string]map[string]string{ + ExtensionName: { + "enabled": "true", + "block_interval": "3", + ConfigKeyPgRepackJobs: "2", + }, + }, + }, + } + + ext := GetExtension() + ext.setStateStore(&stubStateStore{}) + + app := &common.App{Service: svc} + require.NoError(t, engineReadyHook(ctx, app)) + require.Equal(t, 1, stub.preparedCount()) + + block := &common.BlockContext{Height: 1} + require.NoError(t, endBlockHook(ctx, app, block)) + waitForRunCount(t, stub, 1) + firstRun, ok := stub.runAt(0) + require.True(t, ok) + require.Equal(t, "block_interval:1", firstRun.Reason) + require.Equal(t, 2, firstRun.PgRepackJobs) + + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 2})) + time.Sleep(50 * time.Millisecond) + require.Len(t, stub.runsSnapshot(), 1) + + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 4})) + waitForRunCount(t, stub, 2) +} + +func TestConfigureFailureLeavesMechanismNil(t *testing.T) { + ctx := context.Background() + ResetForTest() + ext := GetExtension() + ext.setLogger(log.New()) + ext.setService(&common.Service{LocalConfig: &config.Config{DB: config.DBConfig{DBName: "kwild_test"}}}) + + setMechanismFactoryForTest(func() Mechanism { return &failingMechanism{} }) + defer resetMechanismFactory() + + err := ext.configure(ctx, Config{Enabled: true, BlockInterval: 10}) + require.Error(t, err) + + 
ext.mu.RLock() + defer ext.mu.RUnlock() + require.Nil(t, ext.mechanism) +} + +func TestRunReportEnhancement(t *testing.T) { + ctx := context.Background() + ResetForTest() + + stub := &stubMechanism{} + setMechanismFactoryForTest(func() Mechanism { return stub }) + defer resetMechanismFactory() + + svc := &common.Service{ + Logger: log.New(), + LocalConfig: &config.Config{ + DB: config.DBConfig{DBName: "kwild_test"}, + Extensions: map[string]map[string]string{ + ExtensionName: { + ConfigKeyEnabled: "true", + ConfigKeyBlockInterval: "1", + }, + }, + }, + } + + ext := GetExtension() + ext.setStateStore(&stubStateStore{}) + + app := &common.App{Service: svc} + require.NoError(t, engineReadyHook(ctx, app)) + + block := &common.BlockContext{Height: 1} + require.NoError(t, endBlockHook(ctx, app, block)) + + waitForRunCount(t, stub, 1) + + // Verify the stub returns enhanced report data + runner := ext.runner + require.NotNil(t, runner) + + report, err := stub.Run(ctx, RunRequest{Reason: "test"}) + require.NoError(t, err) + require.NotNil(t, report) + require.Equal(t, "stub", report.Mechanism) + require.Equal(t, StatusOK, report.Status) + require.Equal(t, 100*time.Millisecond, report.Duration) + require.Equal(t, 5, report.TablesProcessed) +} + +func TestVacuumSkippedMetrics(t *testing.T) { + ctx := context.Background() + ResetForTest() + + stub := &stubMechanism{} + setMechanismFactoryForTest(func() Mechanism { return stub }) + defer resetMechanismFactory() + + svc := &common.Service{ + Logger: log.New(), + LocalConfig: &config.Config{ + DB: config.DBConfig{DBName: "kwild_test"}, + Extensions: map[string]map[string]string{ + ExtensionName: { + ConfigKeyEnabled: "true", + ConfigKeyBlockInterval: "10", + }, + }, + }, + } + + ext := GetExtension() + ext.setStateStore(&stubStateStore{}) + + app := &common.App{Service: svc} + require.NoError(t, engineReadyHook(ctx, app)) + + // First run at height 1 + block := &common.BlockContext{Height: 1} + require.NoError(t, 
endBlockHook(ctx, app, block)) + waitForRunCount(t, stub, 1) + + // Should be skipped at height 5 (interval not met) + block = &common.BlockContext{Height: 5} + require.NoError(t, endBlockHook(ctx, app, block)) + time.Sleep(50 * time.Millisecond) + require.Len(t, stub.runsSnapshot(), 1, "should not run - interval not met") + + // Should run at height 11 (interval met) + block = &common.BlockContext{Height: 11} + require.NoError(t, endBlockHook(ctx, app, block)) + waitForRunCount(t, stub, 2) +} + +func TestEngineReadyLoadsPersistedState(t *testing.T) { + ctx := context.Background() + ResetForTest() + + stub := &stubMechanism{} + setMechanismFactoryForTest(func() Mechanism { return stub }) + defer resetMechanismFactory() + + store := &stubStateStore{ + loadState: runState{LastRunHeight: 12, LastRunAt: time.Unix(50, 0)}, + loadOK: true, + } + + svc := &common.Service{ + Logger: log.New(), + LocalConfig: &config.Config{ + DB: config.DBConfig{DBName: "kwild_test"}, + Extensions: map[string]map[string]string{ + ExtensionName: { + ConfigKeyEnabled: "true", + ConfigKeyBlockInterval: "5", + }, + }, + }, + } + + app := &common.App{Service: svc} + + ext := GetExtension() + ext.setLogger(log.New()) + ext.setStateStore(store) + + require.NoError(t, engineReadyHook(ctx, app)) + require.Equal(t, 1, store.ensureCountValue()) + require.Equal(t, 1, store.loadCountValue()) + + ext.mu.RLock() + require.Equal(t, int64(12), ext.state.LastRunHeight) + ext.mu.RUnlock() + + metricsStub := &stubMetricsRecorder{} + ext.mu.Lock() + ext.metrics = metricsStub + ext.mu.Unlock() + + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 14})) + time.Sleep(50 * time.Millisecond) + require.Len(t, stub.runsSnapshot(), 0, "should not run before interval is met") + + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 18})) + waitForRunCount(t, stub, 1) + waitForCondition(t, time.Second, func() bool { return store.saveCountValue() == 1 }) + require.Equal(t, 
int64(18), metricsStub.snapshot().lastHeight) +} + +func TestSuccessfulRunPersistsState(t *testing.T) { + ctx := context.Background() + ResetForTest() + + stub := &stubMechanism{} + setMechanismFactoryForTest(func() Mechanism { return stub }) + defer resetMechanismFactory() + + store := &stubStateStore{} + + svc := &common.Service{ + Logger: log.New(), + LocalConfig: &config.Config{ + DB: config.DBConfig{DBName: "kwild_test"}, + Extensions: map[string]map[string]string{ + ExtensionName: { + ConfigKeyEnabled: "true", + ConfigKeyBlockInterval: "1", + }, + }, + }, + } + + app := &common.App{Service: svc} + + ext := GetExtension() + ext.setLogger(log.New()) + ext.setStateStore(store) + + now := time.Unix(100, 0) + ext.setNowFunc(func() time.Time { return now }) + + require.NoError(t, engineReadyHook(ctx, app)) + require.Equal(t, 1, store.ensureCountValue()) + + metricsStub := &stubMetricsRecorder{} + ext.mu.Lock() + ext.metrics = metricsStub + ext.mu.Unlock() + + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 5})) + waitForRunCount(t, stub, 1) + waitForCondition(t, time.Second, func() bool { return store.saveCountValue() == 1 }) + lastState := store.lastSavedState() + require.Equal(t, int64(5), lastState.LastRunHeight) + require.Equal(t, now.UTC(), lastState.LastRunAt) + snap := metricsStub.snapshot() + require.Equal(t, 1, snap.completeCount) + require.Equal(t, int64(5), snap.lastHeight) + + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 5})) + time.Sleep(50 * time.Millisecond) + require.Equal(t, 1, store.saveCountValue(), "duplicate height should not persist again") +} + +func TestRunnerHandlesNilReport(t *testing.T) { + ctx := context.Background() + runner := &Runner{logger: log.New()} + metricsStub := &stubMetricsRecorder{} + + require.NoError(t, runner.Execute(ctx, RunnerArgs{ + Mechanism: &nilReportMechanism{}, + Logger: log.New(), + Reason: "test", + Metrics: metricsStub, + })) + + snapshot := metricsStub.snapshot() 
+ require.Equal(t, 1, snapshot.startCount) + require.Equal(t, 1, snapshot.completeCount) + require.Zero(t, snapshot.lastDuration) + require.Zero(t, snapshot.lastTables) + require.Equal(t, "nil_report", snapshot.lastMechanism) +} + +func TestMaybeRunRecordsErrorOnce(t *testing.T) { + ctx := context.Background() + ResetForTest() + + setMechanismFactoryForTest(func() Mechanism { return &errorRunMechanism{} }) + defer resetMechanismFactory() + + svc := &common.Service{ + Logger: log.New(), + LocalConfig: &config.Config{ + DB: config.DBConfig{DBName: "kwild_test"}, + Extensions: map[string]map[string]string{ + ExtensionName: { + ConfigKeyEnabled: "true", + ConfigKeyBlockInterval: "1", + }, + }, + }, + } + + ext := GetExtension() + ext.setStateStore(&stubStateStore{}) + + app := &common.App{Service: svc} + require.NoError(t, engineReadyHook(ctx, app)) + + metricsStub := &stubMetricsRecorder{} + ext.mu.Lock() + ext.metrics = metricsStub + ext.mu.Unlock() + + require.NoError(t, endBlockHook(ctx, app, &common.BlockContext{Height: 1})) + waitForCondition(t, time.Second, func() bool { return metricsStub.snapshot().errorCount == 1 }) + errorSnapshot := metricsStub.snapshot() + require.Equal(t, 1, errorSnapshot.startCount) + require.Equal(t, "error_run", errorSnapshot.lastErrorMechanism) +} + +func TestEnqueueRunBusy(t *testing.T) { + ctx := context.Background() + ResetForTest() + + ext := GetExtension() + ext.setLogger(log.New()) + ext.mu.Lock() + ext.runQueue = make(chan runRequest, 1) + ext.runInProgress = true + ext.mu.Unlock() + + req := runRequest{height: 1, reason: "test"} + require.False(t, ext.enqueueRun(ctx, req)) +} + +type stubMetricsRecorder struct { + mu sync.RWMutex + startCount int + completeCount int + errorCount int + skippedCount int + lastDuration time.Duration + lastTables int + lastMechanism string + lastErrorType string + lastErrorMechanism string + lastSkipReason string + lastHeight int64 +} + +func (s *stubMetricsRecorder) RecordVacuumStart(ctx 
context.Context, mechanism string) { + s.mu.Lock() + s.startCount++ + s.lastMechanism = mechanism + s.mu.Unlock() +} + +func (s *stubMetricsRecorder) RecordVacuumComplete(ctx context.Context, mechanism string, duration time.Duration, tablesProcessed int) { + s.mu.Lock() + s.completeCount++ + s.lastMechanism = mechanism + s.lastDuration = duration + s.lastTables = tablesProcessed + s.mu.Unlock() +} + +func (s *stubMetricsRecorder) RecordVacuumError(ctx context.Context, mechanism string, errType string) { + s.mu.Lock() + s.errorCount++ + s.lastErrorMechanism = mechanism + s.lastErrorType = errType + s.mu.Unlock() +} + +func (s *stubMetricsRecorder) RecordVacuumSkipped(ctx context.Context, reason string) { + s.mu.Lock() + s.skippedCount++ + s.lastSkipReason = reason + s.mu.Unlock() +} + +func (s *stubMetricsRecorder) RecordLastRunHeight(ctx context.Context, height int64) { + s.mu.Lock() + s.lastHeight = height + s.mu.Unlock() +} + +func (s *stubMetricsRecorder) snapshot() stubMetricsSnapshot { + s.mu.RLock() + defer s.mu.RUnlock() + return stubMetricsSnapshot{ + startCount: s.startCount, + completeCount: s.completeCount, + errorCount: s.errorCount, + lastDuration: s.lastDuration, + lastTables: s.lastTables, + lastMechanism: s.lastMechanism, + lastErrorMechanism: s.lastErrorMechanism, + lastHeight: s.lastHeight, + } +} + +type stubMetricsSnapshot struct { + startCount int + completeCount int + errorCount int + lastDuration time.Duration + lastTables int + lastMechanism string + lastErrorMechanism string + lastHeight int64 +} + +func waitForCondition(t *testing.T, timeout time.Duration, fn func() bool) { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if fn() { + return + } + time.Sleep(10 * time.Millisecond) + } + if fn() { + return + } + t.Fatalf("condition not met within %s", timeout) +} + +func waitForRunCount(t *testing.T, stub *stubMechanism, count int) { + waitForCondition(t, time.Second, func() bool { + return 
stub.runCount() >= count + }) +} diff --git a/go.mod b/go.mod index 97b511e85..90e70c245 100644 --- a/go.mod +++ b/go.mod @@ -19,8 +19,8 @@ require ( github.com/spf13/cobra v1.9.1 github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.37.0 - github.com/trufnetwork/kwil-db v0.10.3-0.20250926181531-88158eb10b64 - github.com/trufnetwork/kwil-db/core v0.4.3-0.20250926181531-88158eb10b64 + github.com/trufnetwork/kwil-db v0.10.3-0.20250930151143-372c7bcfcb2c + github.com/trufnetwork/kwil-db/core v0.4.3-0.20250930151143-372c7bcfcb2c github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa diff --git a/go.sum b/go.sum index f00599225..ff19fad08 100644 --- a/go.sum +++ b/go.sum @@ -1214,8 +1214,16 @@ github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPD github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI= github.com/trufnetwork/kwil-db v0.10.3-0.20250926181531-88158eb10b64 h1:FGv9XArb0LrzNBFVhL250+mp30jnnsZkPJChEM2zHgk= github.com/trufnetwork/kwil-db v0.10.3-0.20250926181531-88158eb10b64/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250929173952-120a6dd2189e h1:31DihFBWOrV02Y59DsiHUT3fs0yEr5rnNs/zDZX3EVE= +github.com/trufnetwork/kwil-db v0.10.3-0.20250929173952-120a6dd2189e/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= +github.com/trufnetwork/kwil-db v0.10.3-0.20250930151143-372c7bcfcb2c h1:k1nUWyzL16z3gObbvNqXJevhPsYKKdU59JDquYnoIdY= +github.com/trufnetwork/kwil-db v0.10.3-0.20250930151143-372c7bcfcb2c/go.mod h1:LiBAC48uZl2B0IiLtD2hpOce7RNfpuDdghVAOc3u1Qo= github.com/trufnetwork/kwil-db/core v0.4.3-0.20250926181531-88158eb10b64 h1:+HCpXbJ8sNcoADmBpgzz2ceqFc4JbKvGrVF4G7velsU= github.com/trufnetwork/kwil-db/core v0.4.3-0.20250926181531-88158eb10b64/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core 
v0.4.3-0.20250929173952-120a6dd2189e h1:ruVB/uBGMVhX7G31Dp2CyNE9XpmJtzX0+3Csw6XTq6s= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250929173952-120a6dd2189e/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250930151143-372c7bcfcb2c h1:n7s2AehSgLYUpTZi7GfhkoCebB0ClkMBr8+p0iYd2vI= +github.com/trufnetwork/kwil-db/core v0.4.3-0.20250930151143-372c7bcfcb2c/go.mod h1:HnOsh9+BN13LJCjiH0+XKaJzyjWKf+H9AofFFp90KwQ= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2 h1:DCq8MzbWH0wZmICNmMVsSzUHUPl+2vqRhluEABjxl88= github.com/trufnetwork/openzeppelin-merkle-tree-go v0.0.2/go.mod h1:Y0MJpPp9QXU5vC6Gpoilql2NkgmGNcbHm9HYC2v2N8s= github.com/trufnetwork/sdk-go v0.3.2-0.20250630062504-841b40cdb709 h1:d9EqPXIjbq/atzEncK5dM3Z9oStx1BxCGuL/sjefeCw= diff --git a/scripts/ci-cleanup.sh b/scripts/ci-cleanup.sh index ff5b19187..096a904dc 100644 --- a/scripts/ci-cleanup.sh +++ b/scripts/ci-cleanup.sh @@ -11,7 +11,7 @@ fi # Common container names/images names=("tn-db" "kwil-postgres" "kwild" "postgres") -images=("kwildb/postgres" "ghcr.io/trufnetwork/node:local" "kwildb/postgres:latest" "kwildb/postgres:16.8-1") +images=("ghcr.io/trufnetwork/kwil-postgres" "ghcr.io/trufnetwork/node:local" "ghcr.io/trufnetwork/kwil-postgres:latest" "ghcr.io/trufnetwork/kwil-postgres:16.8-1") echo "[ci-cleanup] Stopping/removing lingering containers by name..." for n in "${names[@]}"; do diff --git a/scripts/test-ami.sh b/scripts/test-ami.sh index 25f531aff..d31794147 100755 --- a/scripts/test-ami.sh +++ b/scripts/test-ami.sh @@ -365,11 +365,11 @@ echo "Checking if required Docker images can be pulled..." DOCKER_IMAGES_AVAILABLE=true -echo "Checking kwildb/postgres:16.8-1..." -if docker manifest inspect kwildb/postgres:16.8-1 >/dev/null 2>&1; then - echo -e "${GREEN}✓ kwildb/postgres:16.8-1 available${NC}" +echo "Checking ghcr.io/trufnetwork/kwil-postgres:16.8-1..." 
+if docker manifest inspect ghcr.io/trufnetwork/kwil-postgres:16.8-1 >/dev/null 2>&1; then + echo -e "${GREEN}✓ ghcr.io/trufnetwork/kwil-postgres:16.8-1 available${NC}" else - echo -e "${RED}❌ kwildb/postgres:16.8-1 not available${NC}" + echo -e "${RED}❌ ghcr.io/trufnetwork/kwil-postgres:16.8-1 not available${NC}" DOCKER_IMAGES_AVAILABLE=false fi @@ -454,7 +454,7 @@ fi echo "✓ Using $COMPOSE" echo "✓ Simulated pulling ghcr.io/trufnetwork/node:latest" -echo "✓ Simulated pulling kwildb/postgres:16.8-1" +echo "✓ Simulated pulling ghcr.io/trufnetwork/kwil-postgres:16.8-1" echo "✓ Simulated pulling ghcr.io/trufnetwork/postgres-mcp:latest" echo "🔄 Restarting services..." @@ -506,4 +506,4 @@ else echo "📊 Test Results: ${TESTS_PASSED}/${TOTAL_TESTS} tests passed, ${TESTS_FAILED} failed" echo -e "${RED}Please fix the issues before deployment.${NC}" exit 1 -fi \ No newline at end of file +fi diff --git a/tests/extensions/erc20/erc20_bridge_admin_authz_test.go b/tests/extensions/erc20/erc20_bridge_admin_authz_test.go index 70d0cf67b..12f9f9801 100644 --- a/tests/extensions/erc20/erc20_bridge_admin_authz_test.go +++ b/tests/extensions/erc20/erc20_bridge_admin_authz_test.go @@ -32,7 +32,7 @@ func TestERC20BridgeAdminAuthz(t *testing.T) { require.NoError(t, err) // Step 1: Inject deposit so user has balance - err = testerc20.InjectERC20Transfer(ctx, platform, TestChain, TestEscrowA, TestERC20, TestUserA, TestEscrowA, TestAmount2, 10, nil) + err = testerc20.InjectERC20Transfer(ctx, platform, TestChain, TestEscrowA, TestERC20, TestUserA, TestUserA, TestAmount2, 10, nil) require.NoError(t, err) // Verify user has the balance diff --git a/tests/extensions/erc20/erc20_bridge_end_to_end_test.go b/tests/extensions/erc20/erc20_bridge_end_to_end_test.go index 8a172d2f5..a7517928e 100644 --- a/tests/extensions/erc20/erc20_bridge_end_to_end_test.go +++ b/tests/extensions/erc20/erc20_bridge_end_to_end_test.go @@ -52,7 +52,7 @@ func TestERC20BridgeEndToEnd(t *testing.T) { require.True(t, 
enabledResult, "instance should be enabled before bridge") // Step 1: Inject deposit to give user a balance - err := testerc20.InjectERC20Transfer(ctx, platform, TestChain, TestEscrowA, TestERC20, TestUserA, TestEscrowA, TestAmount1, 10, nil) + err := testerc20.InjectERC20Transfer(ctx, platform, TestChain, TestEscrowA, TestERC20, TestUserA, TestUserA, TestAmount1, 10, nil) require.NoError(t, err) // Verify user has the balance @@ -134,7 +134,7 @@ func TestERC20BridgeCustomRecipient(t *testing.T) { require.NoError(t, erc20shim.ForTestingSeedAndActivateInstance(ctx, platform, TestChain, TestEscrowA, TestERC20, 18, 1, TestExtensionAlias)) // Give user A balance to bridge - require.NoError(t, testerc20.InjectERC20Transfer(ctx, platform, TestChain, TestEscrowA, TestERC20, TestUserA, TestEscrowA, TestAmount1, 10, nil)) + require.NoError(t, testerc20.InjectERC20Transfer(ctx, platform, TestChain, TestEscrowA, TestERC20, TestUserA, TestUserA, TestAmount1, 10, nil)) engineCtx := engCtx(ctx, platform, TestUserA, 2, false) amtDec, err := types.ParseDecimalExplicit(TestAmount1, 78, 0) diff --git a/tests/extensions/erc20/erc20_bridge_epoch_test.go b/tests/extensions/erc20/erc20_bridge_epoch_test.go index 94f17f85b..24a51d7f3 100644 --- a/tests/extensions/erc20/erc20_bridge_epoch_test.go +++ b/tests/extensions/erc20/erc20_bridge_epoch_test.go @@ -31,7 +31,7 @@ func TestERC20BridgeEpochFlow(t *testing.T) { require.NoError(t, erc20shim.ForTestingSeedAndActivateInstance(ctx, platform, chain, escrow, erc20, 18, 1, TestExtensionAlias)) // Credit balance via injected transfer (simulates inbound deposit) - require.NoError(t, testerc20.InjectERC20Transfer(ctx, platform, chain, escrow, erc20, user, escrow, value, 10, nil)) + require.NoError(t, testerc20.InjectERC20Transfer(ctx, platform, chain, escrow, erc20, user, user, value, 10, nil)) // Lock and issue directly into epoch (simulate bridge request) require.NoError(t, erc20shim.ForTestingLockAndIssueDirect(ctx, platform, chain, escrow, 
user, value)) diff --git a/tests/extensions/erc20/erc20_bridge_injection_test.go b/tests/extensions/erc20/erc20_bridge_injection_test.go index f22d4962c..b25cdc906 100644 --- a/tests/extensions/erc20/erc20_bridge_injection_test.go +++ b/tests/extensions/erc20/erc20_bridge_injection_test.go @@ -35,7 +35,7 @@ func TestERC20BridgeInjectedTransferAffectsBalance(t *testing.T) { }) // Inject a transfer: from user to escrow (lock/credit path) - err = testerc20.InjectERC20Transfer(ctx, platform, chain, escrow, erc20, user, escrow, value, 1, nil) + err = testerc20.InjectERC20Transfer(ctx, platform, chain, escrow, erc20, user, user, value, 1, nil) require.NoError(t, err) // Query balance via the test alias @@ -66,3 +66,32 @@ func TestERC20BridgeInjectedTransferAffectsBalance(t *testing.T) { return nil }) } + +// TestERC20BridgeInjectedDepositCreditsRecipient ensures deposits can credit a recipient distinct from the depositor. +func TestERC20BridgeInjectedDepositCreditsRecipient(t *testing.T) { + seedAndRun(t, "erc20_bridge_injected_deposit_recipient", func(ctx context.Context, platform *kwilTesting.Platform) error { + chain := "sepolia" + escrow := "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee" + erc20 := "0x2222222222222222222222222222222222222222" + depositor := "0xabc0000000000000000000000000000000000004" + recipient := "0xabc0000000000000000000000000000000000005" + value := "1500000000000000000" + + require.NoError(t, erc20shim.ForTestingSeedAndActivateInstance(ctx, platform, chain, escrow, erc20, 18, 60, TestExtensionAlias)) + t.Cleanup(func() { + erc20shim.ForTestingDisableInstance(ctx, platform, chain, escrow, TestExtensionAlias) + }) + + require.NoError(t, testerc20.InjectERC20Transfer(ctx, platform, chain, escrow, erc20, depositor, recipient, value, 2, nil)) + + balanceRecipient, err := testerc20.GetUserBalance(ctx, platform, TestExtensionAlias, recipient) + require.NoError(t, err) + require.Equal(t, value, balanceRecipient, "recipient should receive credited 
deposit") + + balanceDepositor, err := testerc20.GetUserBalance(ctx, platform, TestExtensionAlias, depositor) + require.NoError(t, err) + require.Equal(t, "0", balanceDepositor, "depositor should not be credited by deposit") + + return nil + }) +} diff --git a/tests/extensions/erc20/erc20_bridge_multi_instance_test.go b/tests/extensions/erc20/erc20_bridge_multi_instance_test.go index a4783116a..ca5d7a0b6 100644 --- a/tests/extensions/erc20/erc20_bridge_multi_instance_test.go +++ b/tests/extensions/erc20/erc20_bridge_multi_instance_test.go @@ -40,7 +40,7 @@ func TestERC20BridgeMultiInstanceIsolation(t *testing.T) { // Step 1: Inject deposit only for escrowA, userX // This simulates a deposit to escrowA only - err = testerc20.InjectERC20Transfer(ctx, platform, TestChain, escrowA, TestERC20, TestUserA, escrowA, TestAmount1, 10, nil) + err = testerc20.InjectERC20Transfer(ctx, platform, TestChain, escrowA, TestERC20, TestUserA, TestUserA, TestAmount1, 10, nil) require.NoError(t, err) // Step 2: Verify isolation - aliasA should have balance, aliasB should not diff --git a/tests/extensions/erc20/erc20_bridge_transfer_actions_test.go b/tests/extensions/erc20/erc20_bridge_transfer_actions_test.go index 08009b997..39d9b4ca5 100644 --- a/tests/extensions/erc20/erc20_bridge_transfer_actions_test.go +++ b/tests/extensions/erc20/erc20_bridge_transfer_actions_test.go @@ -31,7 +31,7 @@ func TestSepoliaTransferActions(t *testing.T) { // Credit initial balance to TestUserA using configured escrow err = testerc20.InjectERC20Transfer(ctx, platform, - TestChain, configuredEscrow, TestERC20, TestUserA, configuredEscrow, TestAmount2, 10, nil) + TestChain, configuredEscrow, TestERC20, TestUserA, TestUserA, TestAmount2, 10, nil) require.NoError(t, err) // Verify initial balance via sepolia_wallet_balance action @@ -122,7 +122,7 @@ func TestTransferActionValidation(t *testing.T) { // Give TestUserA a small balance (half of what they'll try to transfer) smallAmount := "500000000000000000" // 
0.5 tokens (half of TestAmount1 which is 1.0) err = testerc20.InjectERC20Transfer(ctx, platform, - TestChain, configuredEscrow, TestERC20, TestUserA, configuredEscrow, smallAmount, 10, nil) + TestChain, configuredEscrow, TestERC20, TestUserA, TestUserA, smallAmount, 10, nil) require.NoError(t, err) // Verify they have the small balance @@ -171,7 +171,7 @@ func TestMultipleTransferActions(t *testing.T) { // Credit large initial balance to userA initialAmount := "10000000000000000000" // 10.0 tokens err = testerc20.InjectERC20Transfer(ctx, platform, - TestChain, configuredEscrow, TestERC20, userA, configuredEscrow, initialAmount, 10, nil) + TestChain, configuredEscrow, TestERC20, userA, userA, initialAmount, 10, nil) require.NoError(t, err) // Transfer A -> B (3 tokens) diff --git a/tests/extensions/erc20/erc20_bridge_transfer_test.go b/tests/extensions/erc20/erc20_bridge_transfer_test.go index d771c88ae..87e76e1a3 100644 --- a/tests/extensions/erc20/erc20_bridge_transfer_test.go +++ b/tests/extensions/erc20/erc20_bridge_transfer_test.go @@ -31,7 +31,7 @@ func TestERC20BridgeTransferBalances(t *testing.T) { require.NoError(t, err) // Step 1: Inject deposit for userA - err = testerc20.InjectERC20Transfer(ctx, platform, TestChain, TestEscrowA, TestERC20, TestUserA, TestEscrowA, TestAmount2, 10, nil) + err = testerc20.InjectERC20Transfer(ctx, platform, TestChain, TestEscrowA, TestERC20, TestUserA, TestUserA, TestAmount2, 10, nil) require.NoError(t, err) // Verify userA received the full deposit diff --git a/tests/extensions/tn_cache_metrics/docker-compose.yml b/tests/extensions/tn_cache_metrics/docker-compose.yml index 39f6cdcf1..8199ebad0 100644 --- a/tests/extensions/tn_cache_metrics/docker-compose.yml +++ b/tests/extensions/tn_cache_metrics/docker-compose.yml @@ -2,7 +2,7 @@ version: '3.8' services: postgres: - image: kwildb/postgres:16.8-1 + image: ghcr.io/trufnetwork/kwil-postgres:16.8-1 environment: - POSTGRES_HOST_AUTH_METHOD=trust ports: diff --git 
a/tests/extensions/tn_digest/docker-compose.yml b/tests/extensions/tn_digest/docker-compose.yml index 3a9f17820..85cd07c91 100644 --- a/tests/extensions/tn_digest/docker-compose.yml +++ b/tests/extensions/tn_digest/docker-compose.yml @@ -2,7 +2,7 @@ version: '3.8' services: postgres: - image: kwildb/postgres:16.8-1 + image: ghcr.io/trufnetwork/kwil-postgres:16.8-1 environment: - POSTGRES_HOST_AUTH_METHOD=trust ports: diff --git a/tests/setup/simple_node.go b/tests/setup/simple_node.go index bc3a4f0dc..a92e5f147 100644 --- a/tests/setup/simple_node.go +++ b/tests/setup/simple_node.go @@ -116,7 +116,7 @@ func (f *SimpleNodeFixture) Setup(ctx context.Context, image string, config *Kwi // startPostgres starts a PostgreSQL container func (f *SimpleNodeFixture) startPostgres(ctx context.Context, networkName string) (testcontainers.Container, error) { req := testcontainers.ContainerRequest{ - Image: "kwildb/postgres:16.8-1", + Image: "ghcr.io/trufnetwork/kwil-postgres:16.8-1", ExposedPorts: []string{"5432/tcp"}, Name: "postgres", Networks: []string{networkName}, diff --git a/tests/streams/utils/erc20/helper.go b/tests/streams/utils/erc20/helper.go index 0e8f394da..5ba6e3c40 100644 --- a/tests/streams/utils/erc20/helper.go +++ b/tests/streams/utils/erc20/helper.go @@ -37,12 +37,12 @@ func WithERC20TestSetup(chain, alias string, escrowAddr string) func(t *testing. } } -// CreditUserBalance injects a realistic ERC-20 transfer to credit the user's balance. -// This simulates a user depositing tokens into the bridge. +// CreditUserBalance injects a synthetic escrow deposit to credit the user's balance. +// This simulates a user depositing tokens into the bridge via the RewardDistributor contract. 
func CreditUserBalance(ctx context.Context, platform *kwilTesting.Platform, extensionAlias, escrowAddr, userAddr, amount string) error { // Use the platform's DB and Engine (could be transaction-scoped) return InjectERC20Transfer( - ctx, platform, extensionAlias, escrowAddr, "0x2222222222222222222222222222222222222222", userAddr, escrowAddr, amount, 10, nil) + ctx, platform, extensionAlias, escrowAddr, "0x2222222222222222222222222222222222222222", userAddr, userAddr, amount, 10, nil) } // GetUserBalance queries the user's current balance via the extension. diff --git a/tests/streams/utils/erc20/inject.go b/tests/streams/utils/erc20/inject.go index 94d6d5938..356969b9e 100644 --- a/tests/streams/utils/erc20/inject.go +++ b/tests/streams/utils/erc20/inject.go @@ -20,7 +20,7 @@ import ( kwilTesting "github.com/trufnetwork/kwil-db/testing" ) -// InjectERC20Transfer forces an instance synced and injects a synthetic Transfer log that credits balance. +// InjectERC20Transfer forces an instance synced and injects a synthetic Deposit log that credits balance. 
func InjectERC20Transfer(ctx context.Context, platform *kwilTesting.Platform, chain, escrow, erc20Addr, fromHex, toHex string, valueStr string, point int64, prev *int64) error { // 1) Ensure instance exists and is synced id, err := erc20bridge.ForTestingForceSyncInstance(ctx, platform, chain, escrow, erc20Addr, 18) @@ -31,26 +31,32 @@ func InjectERC20Transfer(ctx context.Context, platform *kwilTesting.Platform, ch // 2) Compute ordered-sync topic topic := erc20bridge.ForTestingTransferListenerTopic(*id) - // 3) Build a synthetic transfer log - from := ethcommon.HexToAddress(fromHex) + // 3) Build a synthetic deposit log + if !ethcommon.IsHexAddress(fromHex) { + return fmt.Errorf("invalid address: %s", fromHex) + } + if !ethcommon.IsHexAddress(toHex) { + return fmt.Errorf("invalid address: %s", toHex) + } to := ethcommon.HexToAddress(toHex) - erc20Address := ethcommon.HexToAddress(erc20Addr) + escrowAddress := ethcommon.HexToAddress(escrow) var bn big.Int if _, ok := bn.SetString(valueStr, 10); !ok { return fmt.Errorf("invalid value: %s", valueStr) } - // topics: signature + from + to + // topics: only signature (no indexed params) topics := []ethcommon.Hash{ - ethcommon.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"), - ethcommon.BytesToHash(from.Bytes()), - ethcommon.BytesToHash(to.Bytes()), + ethcommon.HexToHash("0xe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c"), } - // data: 32-byte big-endian value + + var recipientWord [32]byte + copy(recipientWord[32-len(to.Bytes()):], to.Bytes()) val32 := types.BigIntToHash32(&bn) + data := append(recipientWord[:], val32[:]...) 
lg := ðtypes.Log{ - Address: erc20Address, + Address: escrowAddress, Topics: topics, - Data: val32[:], + Data: data, BlockNumber: uint64(point), TxHash: ethcommon.Hash{}, TxIndex: 0, @@ -58,7 +64,7 @@ func InjectERC20Transfer(ctx context.Context, platform *kwilTesting.Platform, ch Index: 0, Removed: false, } - ethLog := &evmsync.EthLog{Metadata: []byte("e20trsnfr"), Log: lg} + ethLog := &evmsync.EthLog{Metadata: []byte("rcpdepst"), Log: lg} // 4) Serialize like production and store via ordered-sync logsData, err := serializeEthLogsLocal([]*evmsync.EthLog{ethLog}) diff --git a/tests/streams/utils/utils.go b/tests/streams/utils/utils.go index 040673152..927092823 100644 --- a/tests/streams/utils/utils.go +++ b/tests/streams/utils/utils.go @@ -1,3 +1,5 @@ +//go:build kwiltest + // Package testutils provides utilities for testing Kwil schemas and extensions. // This package maintains backward compatibility while organizing functionality into focused subpackages. //