diff --git a/Cargo.lock b/Cargo.lock index 5885517..2115c80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -207,7 +207,7 @@ dependencies = [ "base64 0.22.0", "chrono", "half", - "lexical-core 1.0.5", + "lexical-core", "num", "ryu", ] @@ -246,8 +246,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07884ea216994cdc32a2d5f8274a8bee979cfe90274b83f86f440866ee3132c7" dependencies = [ "planus", - "prost", - "prost-derive", "serde", ] @@ -278,7 +276,7 @@ dependencies = [ "chrono", "half", "indexmap 2.10.0", - "lexical-core 1.0.5", + "lexical-core", "memchr", "num", "serde", @@ -349,59 +347,6 @@ dependencies = [ "regex-syntax 0.8.2", ] -[[package]] -name = "arrow2" -version = "0.17.4" -source = "git+https://github.com/WallarooLabs/arrow2?branch=fmurphy%2Fbugfix-records-parsing#634b8c8a9d0461dba4c0ec308d76158d6f707cf1" -dependencies = [ - "ahash", - "arrow-format", - "async-stream", - "base64 0.21.7", - "bytemuck", - "chrono", - "dyn-clone", - "either", - "ethnum", - "fallible-streaming-iterator", - "foreign_vec", - "futures", - "getrandom 0.2.12", - "hash_hasher", - "hashbrown 0.13.2", - "indexmap 1.9.3", - "json-deserializer", - "lexical-core 0.8.5", - "multiversion", - "num-traits", - "parquet2", - "rustc_version", - "simdutf8", - "streaming-iterator", -] - -[[package]] -name = "async-stream" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.104", -] - [[package]] name = "async-trait" version = "0.1.81" @@ -572,18 +517,22 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" [[package]] name = "bench" -version = "0.5.15" +version = "0.6.0" dependencies = [ "anyhow", + "arrow-array", + "arrow-ipc", + "arrow-schema", "async-trait", "futures", "hdrhistogram", "humantime-serde", "plateau-client", "plateau-server", + "plateau-transport", "rand 0.8.5", "reqwest", - "sample-arrow2", + "sample-arrow-rs", "sample-std", "serde", "serde_json", @@ -1260,12 +1209,6 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "fallible-streaming-iterator" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" - [[package]] name = "fast-float" version = "0.2.0" @@ -1544,12 +1487,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "hash_hasher" -version = "2.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74721d007512d0cb3338cd20f0654ac913920061a4c4d0d8708edb3f2a698c0c" - [[package]] name = "hashbrown" version = "0.12.3" @@ -2051,15 +1988,6 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" -[[package]] -name = "itertools" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.12.1" @@ -2109,15 +2037,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "json-deserializer" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f63b421e16eb4100beb677af56f0b4f3a4f08bab74ef2af079ce5bb92c2683f" -dependencies = [ - "indexmap 1.9.3", -] - [[package]] name = "json5" version = "0.4.1" @@ -2144,41 +2063,17 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "lexical-core" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cde5de06e8d4c2faabc400238f9ae1c74d5412d03a7bd067645ccbc47070e46" -dependencies = [ - "lexical-parse-float 0.8.5", - "lexical-parse-integer 0.8.6", - "lexical-util 0.8.5", - "lexical-write-float 0.8.5", - "lexical-write-integer 0.8.5", -] - [[package]] name = "lexical-core" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b765c31809609075565a70b4b71402281283aeda7ecaf4818ac14a7b2ade8958" dependencies = [ - "lexical-parse-float 1.0.5", - "lexical-parse-integer 1.0.5", - "lexical-util 1.0.6", - "lexical-write-float 1.0.5", - "lexical-write-integer 1.0.5", -] - -[[package]] -name = "lexical-parse-float" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "683b3a5ebd0130b8fb52ba0bdc718cc56815b6a097e28ae5a6997d0ad17dc05f" -dependencies = [ - "lexical-parse-integer 0.8.6", - "lexical-util 0.8.5", - "static_assertions", + "lexical-parse-float", + "lexical-parse-integer", + "lexical-util", + "lexical-write-float", + "lexical-write-integer", ] [[package]] @@ -2187,18 +2082,8 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de6f9cb01fb0b08060209a057c048fcbab8717b4c1ecd2eac66ebfe39a65b0f2" dependencies = [ - "lexical-parse-integer 1.0.5", - "lexical-util 1.0.6", - "static_assertions", -] - -[[package]] -name = "lexical-parse-integer" -version = "0.8.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0994485ed0c312f6d965766754ea177d07f9c00c9b82a5ee62ed5b47945ee9" -dependencies = [ - "lexical-util 0.8.5", + "lexical-parse-integer", + "lexical-util", "static_assertions", ] @@ -2208,16 +2093,7 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72207aae22fc0a121ba7b6d479e42cbfea549af1479c3f3a4f12c70dd66df12e" dependencies = [ - "lexical-util 1.0.6", - "static_assertions", -] - -[[package]] -name = "lexical-util" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5255b9ff16ff898710eb9eb63cb39248ea8a5bb036bea8085b1a767ff6c4e3fc" -dependencies = [ + "lexical-util", "static_assertions", ] @@ -2230,35 +2106,14 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "lexical-write-float" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accabaa1c4581f05a3923d1b4cfd124c329352288b7b9da09e766b0668116862" -dependencies = [ - "lexical-util 0.8.5", - "lexical-write-integer 0.8.5", - "static_assertions", -] - [[package]] name = "lexical-write-float" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5afc668a27f460fb45a81a757b6bf2f43c2d7e30cb5a2dcd3abf294c78d62bd" dependencies = [ - "lexical-util 1.0.6", - "lexical-write-integer 1.0.5", - "static_assertions", -] - -[[package]] -name = "lexical-write-integer" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1b6f3d1f4422866b68192d62f77bc5c700bee84f3069f2469d7bc8c77852446" -dependencies = [ - "lexical-util 0.8.5", + "lexical-util", + "lexical-write-integer", "static_assertions", ] @@ -2268,7 +2123,7 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "629ddff1a914a836fb245616a7888b62903aae58fa771e1d83943035efa0f978" dependencies = [ - "lexical-util 1.0.6", + "lexical-util", "static_assertions", ] @@ -2779,28 +2634,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "parquet-format-safe" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1131c54b167dd4e4799ce762e1ab01549ebb94d5bdd13e6ec1b467491c378e1f" -dependencies = [ - "async-trait", - "futures", -] - -[[package]] -name = "parquet2" -version = "0.17.1" -source = "git+https://github.com/WallarooLabs/parquet2?branch=more-writer-details#431fd7ab06e89eedab0e86e4601329de92624ce0" -dependencies = [ - "async-stream", - "futures", - "parquet-format-safe", - "seq-macro", - "streaming-decompression", -] - [[package]] name = "paste" version = "1.0.14" @@ -2943,7 +2776,7 @@ dependencies = [ [[package]] name = "plateau" -version = "0.5.15" +version = "0.6.0" dependencies = [ "chrono", "plateau-server", @@ -2953,40 +2786,7 @@ dependencies = [ [[package]] name = "plateau-catalog" -version = "0.5.15" -dependencies = [ - "anyhow", - "axum", - "bytes", - "bytesize", - "chrono", - "config", - "futures", - "humantime-serde", - "metrics", - "metrics-exporter-prometheus", - "plateau-client", - "plateau-data", - "plateau-test", - "plateau-transport", - "rand 0.9.2", - "reqwest", - "serde", - "serde_json", - "sqlx", - "systemstat", - "tempfile", - "test-log", - "thiserror 2.0.12", - "tokio", - "tokio-stream", - "tracing", - "uuid", -] - -[[package]] -name = "plateau-catalog-arrow-rs" -version = "0.5.15" +version = "0.6.0" dependencies = [ "anyhow", "arrow", @@ -3007,10 +2807,10 @@ dependencies = [ "humantime-serde", "metrics", "metrics-exporter-prometheus", - "plateau-client-arrow-rs", - "plateau-data-arrow-rs", - "plateau-test-arrow-rs", - "plateau-transport-arrow-rs", + "plateau-client", + "plateau-data", + "plateau-test", + "plateau-transport", "rand 0.9.2", "reqwest", "serde", @@ -3028,7 +2828,7 @@ dependencies = [ [[package]] name = "plateau-cli" -version = "0.5.15" +version = "0.6.0" dependencies = [ "anyhow", "clap", @@ -3043,7 +2843,7 @@ dependencies = [ [[package]] name = "plateau-client" -version = "0.5.15" +version = "0.6.0" dependencies = [ "anyhow", "async-trait", @@ -3061,33 +2861,6 @@ dependencies = [ "serde", "serde_json", "serde_qs", - "thiserror 2.0.12", - "tokio", - "tokio-util", - "tracing", - "tracing-subscriber", - "url", -] - -[[package]] -name = "plateau-client-arrow-rs" -version = "0.5.15" -dependencies = [ - "anyhow", - "async-trait", - "backoff", - "bytes", - "futures", - "httptest", - "parking_lot 0.11.2", - "plateau-server", - "plateau-test", - "plateau-transport-arrow-rs", - "polars", - "reqwest", - "serde", - "serde_json", - "serde_qs", "thiserror 1.0.63", "tokio", "tokio-util", @@ -3098,39 +2871,7 @@ dependencies = [ [[package]] name = "plateau-data" -version = "0.5.15" -dependencies = [ - "anyhow", - "bytes", - "bytesize", - "chrono", - "config", - "futures", - "humantime-serde", - "itertools 0.13.0", - "parquet-format-safe", - "parquet2", - "plateau-client", - "plateau-test", - "plateau-transport", - "reqwest", - "sample-arrow2", - "sample-std", - "sample-test", - "serde", - "serde_json", - "tempfile", - "test-log", - "thiserror 2.0.12", - "tokio", - "tokio-stream", - "tracing", - "uuid", -] - -[[package]] -name = "plateau-data-arrow-rs" -version = "0.5.15" +version = "0.6.0" dependencies = [ "anyhow", "arrow", @@ -3149,9 +2890,9 @@ dependencies = [ "futures", "humantime-serde", "itertools 0.13.0", - "plateau-client-arrow-rs", - "plateau-test-arrow-rs", - "plateau-transport-arrow-rs", + "plateau-client", + "plateau-test", + "plateau-transport", "reqwest", "sample-arrow-rs", "sample-std", @@ -3169,43 +2910,7 @@ dependencies = [ [[package]] name = "plateau-server" -version = "0.5.15" -dependencies = [ - "anyhow", - "axum", - "bytes", - "bytesize", - "chrono", - "config", - "futures", - "humantime-serde", - "metrics", - "metrics-exporter-prometheus", - "plateau-catalog", - "plateau-client", - "plateau-data", - "plateau-test", - "plateau-transport", - "reqwest", - "serde", - "serde_json", - "serde_qs", - "tempfile", - "test-log", - "thiserror 2.0.12", - "tokio", - "tokio-stream", - "toml 0.7.8", - "tower-http", - "tracing", - "utoipa", - "utoipa-swagger-ui", - "uuid", -] - -[[package]] -name = "plateau-server-arrow-rs" -version = "0.5.15" +version = "0.6.0" dependencies = [ "anyhow", "arrow", @@ -3226,11 +2931,11 @@ dependencies = [ "humantime-serde", "metrics", "metrics-exporter-prometheus", - "plateau-catalog-arrow-rs", - "plateau-client-arrow-rs", - "plateau-data-arrow-rs", - "plateau-test-arrow-rs", - "plateau-transport-arrow-rs", + "plateau-catalog", + "plateau-client", + "plateau-data", + "plateau-test", + "plateau-transport", "reqwest", "serde", "serde_json", @@ -3250,7 +2955,7 @@ dependencies = [ [[package]] name = "plateau-test" -version = "0.5.15" +version = "0.6.0" dependencies = [ "anyhow", "chrono", @@ -3258,19 +2963,6 @@ dependencies = [ "plateau-server", "plateau-transport", "tempfile", - "tokio", -] - -[[package]] -name = "plateau-test-arrow-rs" -version = "0.5.15" -dependencies = [ - "anyhow", - "chrono", - "plateau-client-arrow-rs", - "plateau-server-arrow-rs", - "plateau-transport-arrow-rs", - "tempfile", "test-log", "tokio", "tracing", @@ -3278,24 +2970,7 @@ dependencies = [ [[package]] name = "plateau-transport" -version = "0.5.15" -dependencies = [ - "anyhow", - "arrow2", - "chrono", - "clap", - "regex", - "rweb", - "serde", - "serde_with", - "strum 0.26.2", - "thiserror 2.0.12", - "utoipa", -] - -[[package]] -name = "plateau-transport-arrow-rs" -version = "0.5.15" +version = "0.6.0" dependencies = [ "anyhow", "arrow", @@ -3688,29 +3363,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "prost" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" -dependencies = [ - "bytes", - "prost-derive", -] - -[[package]] -name = "prost-derive" -version = "0.11.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" -dependencies = [ - "anyhow", - "itertools 0.10.5", - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "quanta" version = "0.12.2" @@ -4293,16 +3945,6 @@ dependencies = [ "sample-std", ] -[[package]] -name = "sample-arrow2" -version = "0.17.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "502b30097ae5cc57ee8359bb59d8af349db022492de04596119d83f561ab8977" -dependencies = [ - "arrow2", - "sample-std", -] - [[package]] name = "sample-std" version = "0.2.1" @@ -4399,12 +4041,6 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" -[[package]] -name = "seq-macro" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" - [[package]] name = "serde" version = "1.0.228" @@ -4887,15 +4523,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" -[[package]] -name = "streaming-decompression" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf6cc3b19bfb128a8ad11026086e31d3ce9ad23f8ea37354b31383a187c44cf3" -dependencies = [ - "fallible-streaming-iterator", -] - [[package]] name = "streaming-iterator" version = "0.1.9" diff --git a/Cargo.toml b/Cargo.toml index b922bef..bccb2b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,17 +10,11 @@ members = [ "bench", "plateau", "test", - "arrow-rs/transport", - "arrow-rs/client", - "arrow-rs/test", - "arrow-rs/data", - "arrow-rs/catalog", - "arrow-rs/server" ] [workspace.package] -version = "0.5.15" +version = "0.6.0" edition = "2021" authors = ["Wallaroo Labs"] repository = "https://github.com/WallarooLabs/plateau" @@ -49,22 +43,11 @@ plateau-test = { path = "./test" } plateau-transport = { path = "./transport" } plateau = { path = "./plateau" } -plateau-catalog-arrow-rs = { path = "./arrow-rs/catalog" } -plateau-client-arrow-rs = { path = "./arrow-rs/client" } -plateau-data-arrow-rs = { path = "./arrow-rs/data" } -plateau-server-arrow-rs = { path = "./arrow-rs/server" } -plateau-test-arrow-rs = { path = "./arrow-rs/test" } -plateau-transport-arrow-rs = { path = "./arrow-rs/transport" } - [profile.bench] debug = true -[patch.crates-io] -arrow2 = { git = "https://github.com/WallarooLabs/arrow2", branch = "fmurphy/bugfix-records-parsing" } -parquet2 = { git = "https://github.com/WallarooLabs/parquet2", branch = "more-writer-details" } - [workspace.lints.clippy] use_self = "warn" diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 1c55a2b..597d7ce 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -14,7 +14,13 @@ serde = { version = "1", features = ["derive"] } humantime-serde = "1" sample-std = "0.2.1" -sample-arrow2 = "0.17.2" +sample-arrow-rs = "55.2.0" + +arrow-ipc = "55.2.0" +arrow-array = "55.2.0" +arrow-schema = "55.2.0" + +plateau-transport.workspace = true futures = "0.3" hdrhistogram = "7.5" diff --git a/bench/src/load.rs b/bench/src/load.rs index d6a95a4..79c2e51 100644 --- a/bench/src/load.rs +++ b/bench/src/load.rs @@ -1,14 +1,17 @@ use std::fs::File; use std::path::Path; +use std::sync::Arc; use std::time::{Duration, SystemTime}; -use arrow2::io::ipc; -use plateau_client::arrow2::chunk::Chunk; -use plateau_client::arrow2::datatypes::{DataType, Field}; -use plateau_client::{arrow2, ArrowSchema, MultiChunk}; -use sample_arrow2::array::sampler_from_example; -use sample_arrow2::primitive::primitive_len_sampler; -use sample_arrow2::{AlwaysValid, SetLen}; +use arrow_array::types::Int64Type; +use arrow_array::RecordBatch; +use arrow_ipc::reader::FileReader; +use arrow_schema::{DataType, Field}; +use plateau_client::MultiChunk; +use plateau_transport::arrow_schema::SchemaRef; +use sample_arrow_rs::array::sampler_from_example; +use sample_arrow_rs::primitive::primitive_len_sampler; +use sample_arrow_rs::{AlwaysValid, SetLen}; use sample_std::{sample_all, Random, Sample, TryConvert}; use crate::{HealthCheckJob, IteratorJob, SummarizerJob, WorkerConfig, WorkerTask, WriterJob}; @@ -37,35 +40,59 @@ impl Sample for Now { } pub fn build_sampler(path: &Path) -> anyhow::Result { - let mut file = File::open(path)?; + let file = File::open(path)?; + let mut reader = FileReader::try_new(file, None)?; + let schema: SchemaRef = reader.schema().clone(); - let metadata = ipc::read::read_file_metadata(&mut file)?; - let schema = metadata.schema.clone(); - let mut reader = ipc::read::FileReader::new(&mut file, metadata, None, None); - let result = reader.next().ok_or_else(|| anyhow::anyhow!("no data"))?; - let chunk = result?; + let batch: RecordBatch = reader.next().ok_or_else(|| anyhow::anyhow!("no data"))??; - let metadata = schema.metadata; + let metadata = schema.metadata().clone(); - let mut samplers = vec![primitive_len_sampler(Now, AlwaysValid)]; + let mut samplers = vec![primitive_len_sampler::<_, _, Int64Type>(Now, AlwaysValid)]; let without_time = schema - .fields + .fields() .iter() - .zip(chunk.into_arrays()) - .filter_map(|(f, array)| if f.name == "time" { None } else { Some(array) }); + .zip(batch.columns().iter()) + .filter_map(|(f, array)| { + if f.name() == "time" { + None + } else { + Some(array.clone()) + } + }); samplers.extend(without_time.map(|array| sampler_from_example(array.as_ref()))); - let mut fields = vec![Field::new("time", DataType::Int64, false)]; - fields.extend(schema.fields.into_iter().filter(|f| f.name != "time")); + let mut fields: Vec> = vec![Arc::new(Field::new("time", DataType::Int64, false))]; + fields.extend( + schema + .fields() + .iter() + .filter(|f| f.name() != "time") + .cloned(), + ); - let schema = ArrowSchema { fields, metadata }; + let schema_metadata = metadata; + let field_names: Vec = fields.iter().map(|f| f.name().to_string()).collect(); Ok(Box::new(sample_all(samplers).try_convert( - move |arrays| MultiChunk { - schema: schema.clone(), - chunks: [Chunk::new(arrays)].into(), + move |arrays| { + // Build schema from actual array types to handle nullability differences + let actual_fields: Vec> = field_names + .iter() + .zip(arrays.iter()) + .map(|(name, array)| Arc::new(Field::new(name, array.data_type().clone(), false))) + .collect(); + let schema: SchemaRef = Arc::new(arrow_schema::Schema::new_with_metadata( + arrow_schema::Fields::from(actual_fields), + schema_metadata.clone(), + )); + let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap(); + MultiChunk { + schema, + chunks: [batch].into(), + } }, |_| None, ))) diff --git a/catalog/Cargo.toml b/catalog/Cargo.toml index af81453..b0ef97a 100644 --- a/catalog/Cargo.toml +++ b/catalog/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "plateau-catalog-arrow-rs" +name = "plateau-catalog" description = "Index of all stored segments in plateau" version.workspace = true @@ -51,9 +51,9 @@ arrow-json = "55.2.0" arrow-ipc = "55.2.0" # Use arrow-rs versions of dependencies -plateau-data-arrow-rs = { path = "../data" } -plateau-client-arrow-rs = { path = "../client" } -plateau-transport-arrow-rs = { path = "../transport" } +plateau-data.workspace = true +plateau-client.workspace = true +plateau-transport.workspace = true [dev-dependencies] @@ -64,8 +64,8 @@ uuid = { version = "1.10", features = ["v4"] } reqwest.workspace = true # Use arrow-rs versions for testing -plateau-client-arrow-rs = { path = "../client" } -plateau-test-arrow-rs = { path = "../test" } +plateau-client.workspace = true +plateau-test.workspace = true [lints] diff --git a/catalog/src/lib.rs b/catalog/src/lib.rs index 5b37bc6..838423d 100644 --- a/catalog/src/lib.rs +++ b/catalog/src/lib.rs @@ -24,12 +24,12 @@ pub mod partition; pub mod slog; pub mod topic; -pub use plateau_client_arrow_rs as client; -pub use plateau_data_arrow_rs as data; -pub use plateau_transport_arrow_rs as transport; +pub use plateau_client as client; +pub use plateau_data as data; +pub use plateau_transport as transport; #[cfg(test)] -pub use plateau_test_arrow_rs as test; +pub use plateau_test as test; pub use catalog::{Catalog, Config}; pub use reconcile::{ReconcileConfig, ReconcileJob, ReconcileStats}; diff --git a/cli/src/display.rs b/cli/src/display.rs index ee1b615..6e7feb9 100644 --- a/cli/src/display.rs +++ b/cli/src/display.rs @@ -2,7 +2,7 @@ use std::{collections::HashMap, fmt::Display}; -use plateau_client::{ArrowSchema, SchemaChunk}; +use plateau_client::{SchemaChunk, SchemaRef}; use plateau_transport::{Inserted, Partitions, RecordStatus, Records, TopicIterationReply, Topics}; pub trait CliDisplay { @@ -116,11 +116,11 @@ impl CliDisplay for Inserted { } } -impl CliDisplay for SchemaChunk { +impl CliDisplay for SchemaChunk { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { for field in &self.schema.fields { - write!(f, "\x1b[1m{}:\x1b[0m ", field.name)?; - match self.get_array([field.name.as_ref()]) { + write!(f, "\x1b[1m{}:\x1b[0m ", field.name())?; + match self.get_array([field.name().as_ref()]) { Ok(arr) => writeln!(f, "{arr:?}"), Err(_) => writeln!(f, "unknown type (not an array)"), }?; diff --git a/cli/src/main.rs b/cli/src/main.rs index 90a742b..9f1521c 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -11,7 +11,7 @@ use tokio::{fs::File, io::AsyncWriteExt}; use tokio_util::io::ReaderStream; use plateau_client::{ - localhost, ArrowSchema, ArrowStream, Client, Iterate, Retrieve, SchemaChunk, SizedArrowStream, + localhost, ArrowStream, Client, Iterate, Retrieve, SchemaChunk, SchemaRef, SizedArrowStream, }; mod display; @@ -223,7 +223,7 @@ async fn make_request(client: &Client, cmd: Command) -> Result<(), Error> { } OutputFormat::ArrowStdout => { params.data_focus.dataset_separator = Some(".".to_owned()); - let mut response: Vec> = client + let mut response: Vec> = client .get_records(topic_name, partition_name, ¶ms) .await?; diff --git a/client/Cargo.toml b/client/Cargo.toml index 99a30a2..168b0aa 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "plateau-client-arrow-rs" +name = "plateau-client" version.workspace = true edition.workspace = true @@ -29,8 +29,9 @@ polars = { version = "0.36.2", optional = true, features = [ "ipc", "dtype-full", ] } +hashbrown = { version = "0.14", optional = true, features = ["raw"] } -plateau-transport-arrow-rs = { path = "../../arrow-rs/transport" } +plateau-transport.workspace = true [dev-dependencies] @@ -49,9 +50,9 @@ plateau-test.workspace = true [features] batch = ["dep:tokio"] health = ["dep:tokio"] -polars = ["dep:polars"] +polars = ["dep:polars", "dep:hashbrown"] replicate = ["dep:backoff", "dep:tokio"] -rweb = ["plateau-transport-arrow-rs/rweb"] +rweb = ["plateau-transport/rweb"] [lints] diff --git a/client/src/batch.rs b/client/src/batch.rs index b317d3b..bc17e4e 100644 --- a/client/src/batch.rs +++ b/client/src/batch.rs @@ -17,7 +17,7 @@ use tokio::sync::mpsc; use tokio::task; use tracing::{error, warn}; -use plateau_transport_arrow_rs as transport; +use plateau_transport as transport; use transport::InsertQuery; use crate::{Client, Error as ClientError, Insertion, MaxRequestSize}; diff --git a/client/src/lib.rs b/client/src/lib.rs index 9621133..6b19fc3 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -7,7 +7,7 @@ use std::{pin::Pin, str::FromStr}; use async_trait::async_trait; use bytes::Bytes; use futures::{TryStream, TryStreamExt}; -use plateau_transport_arrow_rs as transport; +use plateau_transport as transport; pub use reqwest; use reqwest::Body; use reqwest::{ @@ -16,16 +16,19 @@ use reqwest::{ }; use thiserror::Error; use tracing::{trace, warn}; +pub use transport::arrow_schema::SchemaRef; use transport::headers::ITERATION_STATUS_HEADER; #[cfg(feature = "polars")] use transport::RecordStatus; use transport::CONTENT_TYPE_JSON; use transport::{ arrow_ipc, - arrow_schema::{ArrowError, Schema, SchemaRef}, - Insert, InsertQuery, Inserted, MultiChunk, Partitions, RecordQuery, Records, SchemaChunk, - TopicIterationQuery, TopicIterationReply, TopicIterationStatus, TopicIterator, Topics, - CONTENT_TYPE_ARROW, + arrow_schema::{ArrowError, Schema}, + Insert, Inserted, Partitions, RecordQuery, Records, TopicIterator, Topics, CONTENT_TYPE_ARROW, +}; +pub use transport::{ + InsertQuery, MultiChunk, SchemaChunk, TopicIterationQuery, TopicIterationReply, + TopicIterationStatus, }; #[cfg(feature = "health")] @@ -93,10 +96,10 @@ pub const DEFAULT_MAX_BATCH_BYTES: usize = 10240000; /// Plateau client. Creation options: /// ``` /// // Client pointed at 'localhost:3030'. -/// let client = plateau_client_arrow_rs::Client::default(); +/// let client = plateau_client::Client::default(); /// /// // Client pointed at an alternate URL. -/// let client = plateau_client_arrow_rs::Client::new("plateau.my-wallaroo-cluster.dev:1234"); +/// let client = plateau_client::Client::new("plateau.my-wallaroo-cluster.dev:1234"); /// ``` #[derive(Debug, Clone)] pub struct Client { @@ -993,7 +996,7 @@ mod tests { Expectation, Server, }; use plateau_server::{http, Config as PlateauConfig}; - use plateau_transport_arrow_rs::{DataFocus, RecordStatus, Span, Topic, TopicIterationOrder}; + use plateau_transport::{DataFocus, RecordStatus, Span, Topic, TopicIterationOrder}; use tokio_util::io::ReaderStream; use super::*; diff --git a/client/src/replicate.rs b/client/src/replicate.rs index 3f48191..0fdcbc0 100644 --- a/client/src/replicate.rs +++ b/client/src/replicate.rs @@ -13,7 +13,7 @@ use std::collections::{BTreeMap, HashMap}; use std::fmt; use std::sync::Arc; -use plateau_transport_arrow_rs as transport; +use plateau_transport as transport; use serde::{Deserialize, Serialize}; use transport::{MultiChunk, PartitionId, RecordQuery}; diff --git a/data/Cargo.toml b/data/Cargo.toml index 7b53d4b..40e0c89 100644 --- a/data/Cargo.toml +++ b/data/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "plateau-data-arrow-rs" +name = "plateau-data" description = "Data processing for the plateau server" version.workspace = true @@ -44,8 +44,8 @@ arrow-ipc = "55.2.0" thiserror.workspace = true # Use arrow-rs versions of transport and client -plateau-transport-arrow-rs = { path = "../transport" } -plateau-client-arrow-rs = { path = "../client" } +plateau-transport.workspace = true +plateau-client.workspace = true [dev-dependencies] @@ -61,8 +61,8 @@ sample-arrow-rs = "55.2.0" reqwest.workspace = true # Use arrow-rs versions for testing -plateau-client-arrow-rs = { path = "../client" } -plateau-test-arrow-rs = { path = "../test" } +plateau-client.workspace = true +plateau-test.workspace = true [lints] diff --git a/data/src/chunk.rs b/data/src/chunk.rs index 06b23ee..d3efd92 100644 --- a/data/src/chunk.rs +++ b/data/src/chunk.rs @@ -3,7 +3,7 @@ use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, Int64Array, RecordB pub use arrow_schema::Schema; use arrow_select::filter::filter_record_batch; use chrono::{DateTime, TimeZone, Utc}; -use plateau_transport_arrow_rs::{ChunkError, SchemaChunk, SegmentChunk}; +use plateau_transport::{ChunkError, SchemaChunk, SegmentChunk}; use std::borrow::Borrow; use std::ops::RangeInclusive; use std::sync::Arc; @@ -294,7 +294,7 @@ pub fn slice(chunk: SegmentChunk, offset: usize, len: usize) -> SegmentChunk { pub mod test { use super::*; use crate::transport::estimate_size; - use plateau_test_arrow_rs as test_arrow_rs; + use plateau_test as test_arrow_rs; use test_arrow_rs::{inferences_nested, inferences_schema_a, inferences_schema_b}; /* diff --git a/data/src/index.rs b/data/src/index.rs index 802752e..069e93c 100644 --- a/data/src/index.rs +++ b/data/src/index.rs @@ -1,6 +1,6 @@ use std::ops; -use plateau_transport_arrow_rs::TopicIterationOrder; +use plateau_transport::TopicIterationOrder; /// Each record also has a global unique sequential index #[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd)] diff --git a/data/src/lib.rs b/data/src/lib.rs index 7abf5a0..0ec4cab 100644 --- a/data/src/lib.rs +++ b/data/src/lib.rs @@ -9,7 +9,7 @@ pub mod records; pub mod segment; // Use the arrow-rs versions of plateau crates -pub use plateau_client_arrow_rs as client; +pub use plateau_client as client; // Import Arrow modules pub use arrow; pub use arrow_array; @@ -20,7 +20,7 @@ pub use arrow_ipc; pub use arrow_json; pub use arrow_schema; pub use arrow_select; -pub use plateau_transport_arrow_rs as transport; +pub use plateau_transport as transport; // Adding explicit type re-exports for arrow crates to make migration easier pub use arrow_array::Array; @@ -33,7 +33,7 @@ pub use arrow_schema::Schema; pub use arrow_schema::SchemaRef; #[cfg(test)] -pub use plateau_test_arrow_rs as test; +pub use plateau_test as test; pub use chunk::IndexedChunk; pub use index::{Ordering, RecordIndex}; diff --git a/data/src/segment.rs b/data/src/segment.rs index 9d99f3b..a776be4 100644 --- a/data/src/segment.rs +++ b/data/src/segment.rs @@ -25,7 +25,7 @@ use tracing::{error, trace, warn}; // Use arrow-rs Schema instead of arrow2 Schema use arrow_schema::Schema; -use plateau_transport_arrow_rs::SegmentChunk; +use plateau_transport::SegmentChunk; #[allow(dead_code)] mod arrow; @@ -383,7 +383,7 @@ pub mod test { use super::*; use crate::test::inferences_schema_a; // Use arrow-rs transport - use plateau_transport_arrow_rs as transport; + use plateau_transport as transport; use sample_arrow_rs::{ array::ArbitraryArray, chunk::ArbitraryChunk, diff --git a/data/src/segment/arrow.rs b/data/src/segment/arrow.rs index ee7a01b..e7e9f7d 100644 --- a/data/src/segment/arrow.rs +++ b/data/src/segment/arrow.rs @@ -20,7 +20,7 @@ use arrow_ipc::{ writer::{FileWriter, IpcWriteOptions}, }; use arrow_schema::Schema; -use plateau_transport_arrow_rs::SegmentChunk; +use plateau_transport::SegmentChunk; use tracing::{error, trace, warn}; use super::{cache, SegmentIterator}; @@ -301,7 +301,7 @@ impl SegmentIterator for Reader { pub mod test { use std::collections::HashMap; // Fix imports to use arrow-rs versions - use plateau_transport_arrow_rs as transport; + use plateau_transport as transport; use transport::SchemaChunk; // Use sample-arrow-rs for property-based testing use crate::segment::test::deep_chunk; diff --git a/data/src/segment/cache.rs b/data/src/segment/cache.rs index f63253b..33ddd12 100644 --- a/data/src/segment/cache.rs +++ b/data/src/segment/cache.rs @@ -20,7 +20,7 @@ use arrow_ipc::{ writer::{IpcWriteOptions, StreamWriter}, }; use arrow_schema::Schema; -use plateau_transport_arrow_rs::{SchemaChunk, SegmentChunk}; +use plateau_transport::{SchemaChunk, SegmentChunk}; use tracing::{debug, error, trace, warn}; use super::{validate_header, PLATEAU_HEADER}; diff --git a/server/Cargo.toml b/server/Cargo.toml index 45bfd87..ef22650 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "plateau-server-arrow-rs" +name = "plateau-server" description = "A low-profile event and log aggregator" version.workspace = true @@ -33,10 +33,10 @@ utoipa-swagger-ui = { version = "4", features = ["axum"] } chrono.workspace = true thiserror.workspace = true -plateau-catalog-arrow-rs.workspace = true -plateau-client-arrow-rs = { workspace = true, features = ["replicate"] } -plateau-data-arrow-rs.workspace = true -plateau-transport-arrow-rs.workspace = true +plateau-catalog.workspace = true +plateau-client = { workspace = true, features = ["replicate"] } +plateau-data.workspace = true +plateau-transport.workspace = true # Arrow-rs dependencies arrow = "55.2.0" @@ -57,8 +57,8 @@ uuid = { version = "1.10", features = ["v4"] } reqwest.workspace = true -plateau-client-arrow-rs.workspace = true -plateau-test-arrow-rs.workspace = true +plateau-client.workspace = true +plateau-test.workspace = true [lints] diff --git a/server/src/lib.rs b/server/src/lib.rs index 74b5d82..74e5537 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -14,11 +14,11 @@ pub mod metrics; pub mod replication; // Re-export plateau modules at the top level -pub use plateau_catalog_arrow_rs as catalog; -pub use plateau_client_arrow_rs as client; -pub use plateau_data_arrow_rs as data; -pub use plateau_transport_arrow_rs as transport; -// Re-export arrow from plateau_transport_arrow_rs +pub use plateau_catalog as catalog; +pub use plateau_client as client; +pub use plateau_data as data; +pub use plateau_transport as transport; +// Re-export arrow from plateau_transport pub use transport::arrow; // Re-export commonly used types from the modules @@ -27,7 +27,7 @@ pub use catalog::Catalog; pub use data::DEFAULT_BYTE_LIMIT; #[cfg(test)] -pub use plateau_test_arrow_rs as test; +pub use plateau_test as test; /// Future that resolves when an exit signal (SIGINT / SIGTERM / SIGQUIT) is /// received. diff --git a/server/tests/server.rs b/server/tests/server.rs index 1e6b8bd..4f9033a 100644 --- a/server/tests/server.rs +++ b/server/tests/server.rs @@ -11,7 +11,7 @@ use serde_json as json; use test_log::tracing_subscriber::{fmt, EnvFilter}; use tracing::trace; -use plateau_server_arrow_rs as plateau; +use plateau_server as plateau; use plateau::client::{Error as ClientError, Iterate, PandasRecordIteration, Retrieve}; use plateau::data::chunk::{RecordBatchExt, Schema}; @@ -27,9 +27,9 @@ use plateau::transport::{ use plateau::Config as PlateauConfig; use plateau::{catalog, catalog::partition, data, data::limit, http}; -use plateau_test_arrow_rs::http::TestServer; -use plateau_test_arrow_rs::inferences_large_extension; -use plateau_test_arrow_rs::{inferences_schema_a, inferences_schema_b}; +use plateau_test::http::TestServer; +use plateau_test::inferences_large_extension; +use plateau_test::{inferences_schema_a, inferences_schema_b}; #[allow(clippy::manual_repeat_n)] async fn repeat_append(client: &Client, url: &str, body: &str, count: usize) { diff --git a/test/Cargo.toml b/test/Cargo.toml index 8c4c0c1..240dffc 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "plateau-test-arrow-rs" +name = "plateau-test" version.workspace = true edition.workspace = true authors.workspace = true @@ -13,9 +13,9 @@ tokio = "1.38" chrono.workspace = true -plateau-client-arrow-rs = { path = "../client" } -plateau-server-arrow-rs = { path = "../server" } -plateau-transport-arrow-rs = { path = "../transport" } +plateau-client.workspace = true +plateau-server.workspace = true +plateau-transport.workspace = true [dev-dependencies] diff --git a/test/src/http.rs b/test/src/http.rs index b1a7f10..2f03109 100644 --- a/test/src/http.rs +++ b/test/src/http.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use tempfile::tempdir; use tokio::sync::oneshot; -use plateau_server_arrow_rs::{http, Catalog, Config}; +use plateau_server::{http, Catalog, Config}; /// A RAII wrapper around a full plateau test server. /// @@ -51,7 +51,7 @@ impl TestServer { tokio::spawn(server); if let Some(replication) = replication { - tokio::spawn(plateau_server_arrow_rs::replication::run(replication, addr)); + tokio::spawn(plateau_server::replication::run(replication, addr)); } Ok(Self { @@ -79,7 +79,7 @@ impl TestServer { Catalog::close_arc(self.stop().await).await; } - pub fn client(&self) -> anyhow::Result { - plateau_client_arrow_rs::Client::new(&self.base()).map_err(Into::into) + pub fn client(&self) -> anyhow::Result { + plateau_client::Client::new(&self.base()).map_err(Into::into) } } diff --git a/test/src/lib.rs b/test/src/lib.rs index d211c91..537d416 100644 --- a/test/src/lib.rs +++ b/test/src/lib.rs @@ -1,6 +1,6 @@ -use plateau_transport_arrow_rs as transport; -use plateau_transport_arrow_rs::arrow_schema::extension::EXTENSION_TYPE_METADATA_KEY; -use plateau_transport_arrow_rs::arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; +use plateau_transport as transport; +use plateau_transport::arrow_schema::extension::EXTENSION_TYPE_METADATA_KEY; +use plateau_transport::arrow_schema::extension::EXTENSION_TYPE_NAME_KEY; use std::sync::Arc; use transport::arrow_array::types::*; use transport::arrow_array::Array; diff --git a/transport/Cargo.toml b/transport/Cargo.toml index 6dd2acc..3370944 100644 --- a/transport/Cargo.toml +++ b/transport/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "plateau-transport-arrow-rs" +name = "plateau-transport" version.workspace = true edition.workspace = true