From 8f3f1d0954ddc8aa69edc95dcc18c18de9ae9d61 Mon Sep 17 00:00:00 2001 From: Daniel Kronovet Date: Thu, 16 Apr 2026 16:02:27 -0400 Subject: [PATCH 1/4] feat(sqlite): add hex2int UDF for converting hex strings to integers Enables built-in SQLite aggregation functions (MAX, MIN, SUM, etc.) on hex-encoded integer columns. Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 2 + crates/sqlite/Cargo.toml | 4 ++ crates/sqlite/src/lib.rs | 2 + crates/sqlite/src/udf.rs | 149 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 157 insertions(+) create mode 100644 crates/sqlite/src/udf.rs diff --git a/Cargo.lock b/Cargo.lock index 16dbbecd..fe34f79a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5403,7 +5403,9 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", + "libsqlite3-sys", "sqlx", + "tokio", "torii-common", ] diff --git a/crates/sqlite/Cargo.toml b/crates/sqlite/Cargo.toml index 08f62825..e0a1b34a 100644 --- a/crates/sqlite/Cargo.toml +++ b/crates/sqlite/Cargo.toml @@ -7,6 +7,7 @@ description = "SQLite connection helpers for Torii storage crates" [dependencies] async-trait.workspace = true futures.workspace = true +libsqlite3-sys = { version = "0.30", features = ["bundled"] } sqlx = { workspace = true, features = [ "sqlite", "runtime-tokio-rustls", @@ -15,5 +16,8 @@ sqlx = { workspace = true, features = [ torii-common.workspace = true +[dev-dependencies] +tokio = { version = "1", features = ["rt", "macros"] } + [lints] workspace = true diff --git a/crates/sqlite/src/lib.rs b/crates/sqlite/src/lib.rs index e0e1dc65..67e86a51 100644 --- a/crates/sqlite/src/lib.rs +++ b/crates/sqlite/src/lib.rs @@ -1,4 +1,6 @@ pub mod db; pub mod migration; +pub mod udf; pub use db::{is_sqlite_memory_path, sqlite_connect_options, SqliteConnection}; +pub use udf::register_udfs; diff --git a/crates/sqlite/src/udf.rs b/crates/sqlite/src/udf.rs new file mode 100644 index 00000000..7ece1310 --- /dev/null +++ b/crates/sqlite/src/udf.rs @@ -0,0 +1,149 @@ +use std::ffi::CString; +use std::os::raw::c_int; +use std::ptr; + +use libsqlite3_sys::{ + sqlite3, sqlite3_create_function_v2, sqlite3_result_error, sqlite3_result_int64, + sqlite3_result_null, sqlite3_value, sqlite3_value_text, sqlite3_value_type, SQLITE_OK, + SQLITE_TEXT, SQLITE_UTF8, +}; +use sqlx::sqlite::SqliteConnection; + +/// Register all custom UDFs on the given connection. +#[allow(unsafe_code)] +pub async fn register_udfs(conn: &mut SqliteConnection) -> Result<(), sqlx::Error> { + let mut handle = conn.lock_handle().await?; + let raw = handle.as_raw_handle().as_ptr(); + + // SAFETY: raw is a valid sqlite3* handle, and we hold the lock. + unsafe { + register_hex2int(raw); + } + + Ok(()) +} + +/// Register the `hex2int` scalar function on a raw sqlite3 handle. +/// +/// `hex2int(hex_string)` converts a hex-encoded integer (with or without `0x` prefix) +/// to an i64. Returns NULL for NULL input. +#[allow(unsafe_code)] +unsafe fn register_hex2int(db: *mut sqlite3) { + let name = CString::new("hex2int").unwrap(); + let rc = sqlite3_create_function_v2( + db, + name.as_ptr(), + 1, // nArg + SQLITE_UTF8, // eTextRep + ptr::null_mut(), // pApp + Some(hex2int_fn), // xFunc + None, // xStep + None, // xFinal + None, // xDestroy + ); + assert_eq!(rc, SQLITE_OK, "failed to register hex2int UDF"); +} + +/// The C callback implementing hex2int. +#[allow(unsafe_code)] +unsafe extern "C" fn hex2int_fn( + ctx: *mut libsqlite3_sys::sqlite3_context, + argc: c_int, + argv: *mut *mut sqlite3_value, +) { + debug_assert_eq!(argc, 1); + let val = *argv; + + // NULL in → NULL out + if sqlite3_value_type(val) == libsqlite3_sys::SQLITE_NULL { + sqlite3_result_null(ctx); + return; + } + + // Must be text + if sqlite3_value_type(val) != SQLITE_TEXT { + let msg = CString::new("hex2int: expected text argument").unwrap(); + sqlite3_result_error(ctx, msg.as_ptr(), -1); + return; + } + + let text_ptr = sqlite3_value_text(val); + if text_ptr.is_null() { + sqlite3_result_null(ctx); + return; + } + + let text = std::ffi::CStr::from_ptr(text_ptr as *const _) + .to_str() + .unwrap_or(""); + let hex = text.strip_prefix("0x").or_else(|| text.strip_prefix("0X")).unwrap_or(text); + + match u64::from_str_radix(hex, 16) { + Ok(n) => sqlite3_result_int64(ctx, n as i64), + Err(_) => { + let msg = CString::new(format!("hex2int: invalid hex string '{text}'")).unwrap(); + sqlite3_result_error(ctx, msg.as_ptr(), -1); + } + } +} + +#[cfg(test)] +mod tests { + use sqlx::sqlite::SqlitePoolOptions; + use sqlx::Row; + + use super::*; + + #[tokio::test] + async fn test_hex2int() { + let pool = SqlitePoolOptions::new() + .max_connections(1) + .connect("sqlite::memory:") + .await + .unwrap(); + let mut conn = pool.acquire().await.unwrap(); + register_udfs(conn.as_mut()).await.unwrap(); + + // Basic hex conversion + let row = sqlx::query("SELECT hex2int('0xff') AS v") + .fetch_one(conn.as_mut()) + .await + .unwrap(); + assert_eq!(row.get::("v"), 255); + + // Without prefix + let row = sqlx::query("SELECT hex2int('ff') AS v") + .fetch_one(conn.as_mut()) + .await + .unwrap(); + assert_eq!(row.get::("v"), 255); + + // Large value + let row = sqlx::query("SELECT hex2int('0xffffffffffffffff') AS v") + .fetch_one(conn.as_mut()) + .await + .unwrap(); + assert_eq!(row.get::("v"), -1); // u64::MAX as i64 + + // NULL passthrough + let row = sqlx::query("SELECT hex2int(NULL) AS v") + .fetch_one(conn.as_mut()) + .await + .unwrap(); + assert!(row.try_get::("v").is_err() || row.get::, _>("v").is_none()); + + // Zero + let row = sqlx::query("SELECT hex2int('0x0') AS v") + .fetch_one(conn.as_mut()) + .await + .unwrap(); + assert_eq!(row.get::("v"), 0); + + // Uppercase prefix + let row = sqlx::query("SELECT hex2int('0XAB') AS v") + .fetch_one(conn.as_mut()) + .await + .unwrap(); + assert_eq!(row.get::("v"), 171); + } +} From 97557a86cc8bd17aaf66537f2a835c5c90d2cf7c Mon Sep 17 00:00:00 2001 From: Daniel Kronovet Date: Thu, 16 Apr 2026 17:04:14 -0400 Subject: [PATCH 2/4] feat(hex2int): auto-install UDF and add Postgres equivalent Switch the SQLite UDF to sqlite3_auto_extension so every connection in-process (including the /sql endpoint's AnyPool) gets hex2int without per-connection wiring. Add a matching public.hex2int() in the introspect-postgres-sink migrations, handling hex strings up to 256 bits by keeping only the lower 64 bits. Wire install_udfs() into EcsService::new, and add a Postgres integration test plus a postgres:16 service to the CI test job. Co-Authored-By: Claude Opus 4.7 --- .github/workflows/ci.yml | 19 ++++ Cargo.lock | 1 + .../migrations/004_hex2int_function.sql | 25 +++++ .../introspect-postgres-sink/tests/hex2int.rs | 69 ++++++++++++ crates/sqlite/src/lib.rs | 2 +- crates/sqlite/src/udf.rs | 100 +++++++++++++----- crates/torii-ecs-sink/Cargo.toml | 1 + crates/torii-ecs-sink/src/grpc_service.rs | 1 + 8 files changed, 190 insertions(+), 28 deletions(-) create mode 100644 crates/introspect-postgres-sink/migrations/004_hex2int_function.sql create mode 100644 crates/introspect-postgres-sink/tests/hex2int.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7031c948..1986ea91 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,6 +49,25 @@ jobs: test: name: Test runs-on: ubuntu-latest + + services: + postgres: + image: postgres:16 + env: + POSTGRES_USER: torii + POSTGRES_PASSWORD: torii + POSTGRES_DB: torii + ports: + - 5432:5432 + options: >- + --health-cmd "pg_isready -U torii" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + + env: + DATABASE_URL: postgres://torii:torii@localhost:5432/torii + steps: - uses: actions/checkout@v4 diff --git a/Cargo.lock b/Cargo.lock index fe34f79a..e9e3f967 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5038,6 +5038,7 @@ dependencies = [ "torii-dojo", "torii-introspect", "torii-runtime-common", + "torii-sqlite", "tracing", ] diff --git a/crates/introspect-postgres-sink/migrations/004_hex2int_function.sql b/crates/introspect-postgres-sink/migrations/004_hex2int_function.sql new file mode 100644 index 00000000..a1eca0a7 --- /dev/null +++ b/crates/introspect-postgres-sink/migrations/004_hex2int_function.sql @@ -0,0 +1,25 @@ +CREATE OR REPLACE FUNCTION public.hex2int(hex_text TEXT) RETURNS BIGINT AS $$ +DECLARE + stripped TEXT; +BEGIN + IF hex_text IS NULL THEN + RETURN NULL; + END IF; + + -- Strip a single leading 0x / 0X prefix (not all occurrences). + IF left(hex_text, 2) IN ('0x', '0X') THEN + stripped := substr(hex_text, 3); + ELSE + stripped := hex_text; + END IF; + + -- Take the rightmost 16 hex chars (lower 64 bits). Hex strings may be up + -- to 256 bits; values that fit in u64 are preserved exactly, larger values + -- are truncated to their low 64 bits. + IF length(stripped) > 16 THEN + stripped := right(stripped, 16); + END IF; + + RETURN ('x' || lpad(stripped, 16, '0'))::bit(64)::bigint; +END; +$$ LANGUAGE plpgsql IMMUTABLE STRICT; diff --git a/crates/introspect-postgres-sink/tests/hex2int.rs b/crates/introspect-postgres-sink/tests/hex2int.rs new file mode 100644 index 00000000..538567ce --- /dev/null +++ b/crates/introspect-postgres-sink/tests/hex2int.rs @@ -0,0 +1,69 @@ +//! Integration test for the `hex2int` Postgres function shipped via migration +//! `004_hex2int_function.sql`. Requires a running Postgres reachable via +//! `DATABASE_URL`; skipped otherwise so local `cargo test` still passes. + +use sqlx::postgres::PgPoolOptions; +use sqlx::Row; +use torii_introspect_postgres_sink::INTROSPECT_PG_SINK_MIGRATIONS; + +async fn get_pool() -> Option { + let url = std::env::var("DATABASE_URL").ok()?; + let pool = PgPoolOptions::new() + .max_connections(1) + .connect(&url) + .await + .expect("failed to connect to DATABASE_URL"); + INTROSPECT_PG_SINK_MIGRATIONS + .run(&pool) + .await + .expect("failed to run migrations"); + Some(pool) +} + +async fn hex2int(pool: &sqlx::PgPool, input: Option<&str>) -> Option { + let row = sqlx::query("SELECT hex2int($1) AS v") + .bind(input) + .fetch_one(pool) + .await + .unwrap(); + row.try_get::, _>("v").unwrap() +} + +#[tokio::test] +async fn test_hex2int_postgres() { + let Some(pool) = get_pool().await else { + eprintln!("DATABASE_URL not set; skipping hex2int Postgres test"); + return; + }; + + assert_eq!(hex2int(&pool, Some("0xff")).await, Some(255)); + assert_eq!(hex2int(&pool, Some("ff")).await, Some(255)); + assert_eq!(hex2int(&pool, Some("0x0")).await, Some(0)); + assert_eq!(hex2int(&pool, Some("0XAB")).await, Some(171)); + + // u64::MAX → -1 as i64 + assert_eq!(hex2int(&pool, Some("0xffffffffffffffff")).await, Some(-1)); + + // NULL passthrough + assert_eq!(hex2int(&pool, None).await, None); + + // 256-bit value zero-padded: only the lower 64 bits are kept + assert_eq!( + hex2int( + &pool, + Some("0x00000000000000000000000000000000000000000000000000000000000000ff"), + ) + .await, + Some(255), + ); + + // 256-bit value with non-zero high bits: high bits ignored, lower 64 bits returned + assert_eq!( + hex2int( + &pool, + Some("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef1234567890abcdef"), + ) + .await, + Some(0x1234567890abcdef_i64), + ); +} diff --git a/crates/sqlite/src/lib.rs b/crates/sqlite/src/lib.rs index 67e86a51..7bbd03c7 100644 --- a/crates/sqlite/src/lib.rs +++ b/crates/sqlite/src/lib.rs @@ -3,4 +3,4 @@ pub mod migration; pub mod udf; pub use db::{is_sqlite_memory_path, sqlite_connect_options, SqliteConnection}; -pub use udf::register_udfs; +pub use udf::install_udfs; diff --git a/crates/sqlite/src/udf.rs b/crates/sqlite/src/udf.rs index 7ece1310..362b1aeb 100644 --- a/crates/sqlite/src/udf.rs +++ b/crates/sqlite/src/udf.rs @@ -1,26 +1,43 @@ use std::ffi::CString; -use std::os::raw::c_int; +use std::os::raw::{c_char, c_int}; use std::ptr; +use std::sync::Once; use libsqlite3_sys::{ - sqlite3, sqlite3_create_function_v2, sqlite3_result_error, sqlite3_result_int64, - sqlite3_result_null, sqlite3_value, sqlite3_value_text, sqlite3_value_type, SQLITE_OK, - SQLITE_TEXT, SQLITE_UTF8, + sqlite3, sqlite3_api_routines, sqlite3_auto_extension, sqlite3_create_function_v2, + sqlite3_result_error, sqlite3_result_int64, sqlite3_result_null, sqlite3_value, + sqlite3_value_text, sqlite3_value_type, SQLITE_OK, SQLITE_TEXT, SQLITE_UTF8, }; -use sqlx::sqlite::SqliteConnection; -/// Register all custom UDFs on the given connection. -#[allow(unsafe_code)] -pub async fn register_udfs(conn: &mut SqliteConnection) -> Result<(), sqlx::Error> { - let mut handle = conn.lock_handle().await?; - let raw = handle.as_raw_handle().as_ptr(); +static INIT: Once = Once::new(); - // SAFETY: raw is a valid sqlite3* handle, and we hold the lock. - unsafe { - register_hex2int(raw); - } +/// Register all custom UDFs as auto-extensions. +/// +/// After calling this, every new SQLite connection in the process will +/// automatically have the UDFs available. Safe to call multiple times; +/// only the first call has any effect. +#[allow(unsafe_code)] +pub fn install_udfs() { + INIT.call_once(|| { + // SAFETY: sqlite3_auto_extension expects an extension entry point cast to + // Option c_int>. + // `udfs_init` matches this signature. + unsafe { + let rc = sqlite3_auto_extension(Some(udfs_init)); + assert_eq!(rc, SQLITE_OK, "failed to install SQLite UDF auto-extension"); + } + }); +} - Ok(()) +/// Auto-extension entry point called by SQLite for each new connection. +#[allow(unsafe_code)] +unsafe extern "C" fn udfs_init( + db: *mut sqlite3, + _pz_err_msg: *mut *mut c_char, + _p_thunk: *const sqlite3_api_routines, +) -> c_int { + register_hex2int(db); + SQLITE_OK } /// Register the `hex2int` scalar function on a raw sqlite3 handle. @@ -76,9 +93,18 @@ unsafe extern "C" fn hex2int_fn( let text = std::ffi::CStr::from_ptr(text_ptr as *const _) .to_str() .unwrap_or(""); - let hex = text.strip_prefix("0x").or_else(|| text.strip_prefix("0X")).unwrap_or(text); - - match u64::from_str_radix(hex, 16) { + let stripped = text + .strip_prefix("0x") + .or_else(|| text.strip_prefix("0X")) + .unwrap_or(text); + // Take the rightmost 16 hex chars (lower 64 bits). Hex strings may be up + // to 256 bits; values that fit in u64 are preserved exactly, larger values + // are truncated to their low 64 bits. + let lower = stripped + .get(stripped.len().saturating_sub(16)..) + .unwrap_or(stripped); + + match u64::from_str_radix(lower, 16) { Ok(n) => sqlite3_result_int64(ctx, n as i64), Err(_) => { let msg = CString::new(format!("hex2int: invalid hex string '{text}'")).unwrap(); @@ -96,54 +122,74 @@ mod tests { #[tokio::test] async fn test_hex2int() { + install_udfs(); + let pool = SqlitePoolOptions::new() .max_connections(1) .connect("sqlite::memory:") .await .unwrap(); - let mut conn = pool.acquire().await.unwrap(); - register_udfs(conn.as_mut()).await.unwrap(); - // Basic hex conversion + // UDFs are automatically available — no per-connection registration needed let row = sqlx::query("SELECT hex2int('0xff') AS v") - .fetch_one(conn.as_mut()) + .fetch_one(&pool) .await .unwrap(); assert_eq!(row.get::("v"), 255); // Without prefix let row = sqlx::query("SELECT hex2int('ff') AS v") - .fetch_one(conn.as_mut()) + .fetch_one(&pool) .await .unwrap(); assert_eq!(row.get::("v"), 255); // Large value let row = sqlx::query("SELECT hex2int('0xffffffffffffffff') AS v") - .fetch_one(conn.as_mut()) + .fetch_one(&pool) .await .unwrap(); assert_eq!(row.get::("v"), -1); // u64::MAX as i64 // NULL passthrough let row = sqlx::query("SELECT hex2int(NULL) AS v") - .fetch_one(conn.as_mut()) + .fetch_one(&pool) .await .unwrap(); assert!(row.try_get::("v").is_err() || row.get::, _>("v").is_none()); // Zero let row = sqlx::query("SELECT hex2int('0x0') AS v") - .fetch_one(conn.as_mut()) + .fetch_one(&pool) .await .unwrap(); assert_eq!(row.get::("v"), 0); // Uppercase prefix let row = sqlx::query("SELECT hex2int('0XAB') AS v") - .fetch_one(conn.as_mut()) + .fetch_one(&pool) .await .unwrap(); assert_eq!(row.get::("v"), 171); + + // 256-bit value: only the lower 64 bits are kept + // 0x0000...0000_00000000000000ff → 255 + let row = sqlx::query( + "SELECT hex2int('0x00000000000000000000000000000000000000000000000000000000000000ff') AS v", + ) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(row.get::("v"), 255); + + // 256-bit value where high bits are non-zero but ignored + // high 192 bits: 0xdead...; low 64 bits: 0x1234567890abcdef + let row = sqlx::query( + "SELECT hex2int('0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef1234567890abcdef') AS v", + ) + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(row.get::("v"), 0x1234567890abcdef_i64); } } diff --git a/crates/torii-ecs-sink/Cargo.toml b/crates/torii-ecs-sink/Cargo.toml index 3598c8c9..1edc7cf8 100644 --- a/crates/torii-ecs-sink/Cargo.toml +++ b/crates/torii-ecs-sink/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" torii = { path = "../.." } torii-dojo = { path = "../dojo" } torii-introspect = { path = "../introspect" } +torii-sqlite.workspace = true torii-runtime-common.workspace = true dojo-introspect.workspace = true introspect-types.workspace = true diff --git a/crates/torii-ecs-sink/src/grpc_service.rs b/crates/torii-ecs-sink/src/grpc_service.rs index 565828cf..80b92e4c 100644 --- a/crates/torii-ecs-sink/src/grpc_service.rs +++ b/crates/torii-ecs-sink/src/grpc_service.rs @@ -645,6 +645,7 @@ impl EcsService { erc1155_url: Option<&str>, ) -> Result { sqlx::any::install_default_drivers(); + torii_sqlite::install_udfs(); let backend = DbBackend::detect(database_url); let database_url = match backend { From 9623aa5b361d4c2d76737871a77e85b87c8776eb Mon Sep 17 00:00:00 2001 From: Daniel Kronovet Date: Thu, 16 Apr 2026 17:20:12 -0400 Subject: [PATCH 3/4] fix(sqlite): satisfy clippy in hex2int UDF Use pointer::cast() instead of `as *const _`, and replace match-on-Result with if-let/else to address ptr_as_ptr and single_match_else lints failing CI. Co-Authored-By: Claude Opus 4.7 --- crates/sqlite/src/udf.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/crates/sqlite/src/udf.rs b/crates/sqlite/src/udf.rs index 362b1aeb..243fd9e2 100644 --- a/crates/sqlite/src/udf.rs +++ b/crates/sqlite/src/udf.rs @@ -90,7 +90,7 @@ unsafe extern "C" fn hex2int_fn( return; } - let text = std::ffi::CStr::from_ptr(text_ptr as *const _) + let text = std::ffi::CStr::from_ptr(text_ptr.cast::()) .to_str() .unwrap_or(""); let stripped = text @@ -104,12 +104,11 @@ unsafe extern "C" fn hex2int_fn( .get(stripped.len().saturating_sub(16)..) .unwrap_or(stripped); - match u64::from_str_radix(lower, 16) { - Ok(n) => sqlite3_result_int64(ctx, n as i64), - Err(_) => { - let msg = CString::new(format!("hex2int: invalid hex string '{text}'")).unwrap(); - sqlite3_result_error(ctx, msg.as_ptr(), -1); - } + if let Ok(n) = u64::from_str_radix(lower, 16) { + sqlite3_result_int64(ctx, n as i64); + } else { + let msg = CString::new(format!("hex2int: invalid hex string '{text}'")).unwrap(); + sqlite3_result_error(ctx, msg.as_ptr(), -1); } } From eca14a8ddaf92ee3c7b464f3086313c95c7f9e18 Mon Sep 17 00:00:00 2001 From: Daniel Kronovet Date: Thu, 16 Apr 2026 17:23:54 -0400 Subject: [PATCH 4/4] fix(introspect-postgres-sink): run hex2int test migrations via SchemaMigrator The raw Migrator::run doesn't create the `introspect` schema that migration 002 writes into, so the test panicked with "schema \"introspect\" does not exist". Use SchemaMigrator, which matches how the sink itself runs these migrations in processor.rs. Co-Authored-By: Claude Opus 4.7 --- crates/introspect-postgres-sink/tests/hex2int.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/introspect-postgres-sink/tests/hex2int.rs b/crates/introspect-postgres-sink/tests/hex2int.rs index 538567ce..fd1e3cc8 100644 --- a/crates/introspect-postgres-sink/tests/hex2int.rs +++ b/crates/introspect-postgres-sink/tests/hex2int.rs @@ -5,6 +5,7 @@ use sqlx::postgres::PgPoolOptions; use sqlx::Row; use torii_introspect_postgres_sink::INTROSPECT_PG_SINK_MIGRATIONS; +use torii_postgres::migration::SchemaMigrator; async fn get_pool() -> Option { let url = std::env::var("DATABASE_URL").ok()?; @@ -13,7 +14,7 @@ async fn get_pool() -> Option { .connect(&url) .await .expect("failed to connect to DATABASE_URL"); - INTROSPECT_PG_SINK_MIGRATIONS + SchemaMigrator::new("introspect", INTROSPECT_PG_SINK_MIGRATIONS) .run(&pool) .await .expect("failed to run migrations");