Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
451 changes: 39 additions & 412 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 1 addition & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,11 @@ members = [
"bench",
"plateau",
"test",
"arrow-rs/transport",
"arrow-rs/client",
"arrow-rs/test",
"arrow-rs/data",
"arrow-rs/catalog",
"arrow-rs/server"
]


[workspace.package]
version = "0.5.15"
version = "0.6.0"
edition = "2021"
authors = ["Wallaroo Labs"]
repository = "https://github.com/WallarooLabs/plateau"
Expand Down Expand Up @@ -49,22 +43,11 @@ plateau-test = { path = "./test" }
plateau-transport = { path = "./transport" }
plateau = { path = "./plateau" }

plateau-catalog-arrow-rs = { path = "./arrow-rs/catalog" }
plateau-client-arrow-rs = { path = "./arrow-rs/client" }
plateau-data-arrow-rs = { path = "./arrow-rs/data" }
plateau-server-arrow-rs = { path = "./arrow-rs/server" }
plateau-test-arrow-rs = { path = "./arrow-rs/test" }
plateau-transport-arrow-rs = { path = "./arrow-rs/transport" }


[profile.bench]
debug = true


[patch.crates-io]
arrow2 = { git = "https://github.com/WallarooLabs/arrow2", branch = "fmurphy/bugfix-records-parsing" }
parquet2 = { git = "https://github.com/WallarooLabs/parquet2", branch = "more-writer-details" }


[workspace.lints.clippy]
use_self = "warn"
Expand Down
8 changes: 7 additions & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ serde = { version = "1", features = ["derive"] }
humantime-serde = "1"

sample-std = "0.2.1"
sample-arrow2 = "0.17.2"
sample-arrow-rs = "55.2.0"

arrow-ipc = "55.2.0"
arrow-array = "55.2.0"
arrow-schema = "55.2.0"

plateau-transport.workspace = true

futures = "0.3"
hdrhistogram = "7.5"
Expand Down
75 changes: 51 additions & 24 deletions bench/src/load.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use std::fs::File;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use arrow2::io::ipc;
use plateau_client::arrow2::chunk::Chunk;
use plateau_client::arrow2::datatypes::{DataType, Field};
use plateau_client::{arrow2, ArrowSchema, MultiChunk};
use sample_arrow2::array::sampler_from_example;
use sample_arrow2::primitive::primitive_len_sampler;
use sample_arrow2::{AlwaysValid, SetLen};
use arrow_array::types::Int64Type;
use arrow_array::RecordBatch;
use arrow_ipc::reader::FileReader;
use arrow_schema::{DataType, Field};
use plateau_client::MultiChunk;
use plateau_transport::arrow_schema::SchemaRef;
use sample_arrow_rs::array::sampler_from_example;
use sample_arrow_rs::primitive::primitive_len_sampler;
use sample_arrow_rs::{AlwaysValid, SetLen};
use sample_std::{sample_all, Random, Sample, TryConvert};

use crate::{HealthCheckJob, IteratorJob, SummarizerJob, WorkerConfig, WorkerTask, WriterJob};
Expand Down Expand Up @@ -37,35 +40,59 @@ impl Sample for Now {
}

pub fn build_sampler(path: &Path) -> anyhow::Result<MultiChunkSampler> {
let mut file = File::open(path)?;
let file = File::open(path)?;
let mut reader = FileReader::try_new(file, None)?;
let schema: SchemaRef = reader.schema().clone();

let metadata = ipc::read::read_file_metadata(&mut file)?;
let schema = metadata.schema.clone();
let mut reader = ipc::read::FileReader::new(&mut file, metadata, None, None);
let result = reader.next().ok_or_else(|| anyhow::anyhow!("no data"))?;
let chunk = result?;
let batch: RecordBatch = reader.next().ok_or_else(|| anyhow::anyhow!("no data"))??;

let metadata = schema.metadata;
let metadata = schema.metadata().clone();

let mut samplers = vec![primitive_len_sampler(Now, AlwaysValid)];
let mut samplers = vec![primitive_len_sampler::<_, _, Int64Type>(Now, AlwaysValid)];

let without_time = schema
.fields
.fields()
.iter()
.zip(chunk.into_arrays())
.filter_map(|(f, array)| if f.name == "time" { None } else { Some(array) });
.zip(batch.columns().iter())
.filter_map(|(f, array)| {
if f.name() == "time" {
None
} else {
Some(array.clone())
}
});

samplers.extend(without_time.map(|array| sampler_from_example(array.as_ref())));

let mut fields = vec![Field::new("time", DataType::Int64, false)];
fields.extend(schema.fields.into_iter().filter(|f| f.name != "time"));
let mut fields: Vec<Arc<Field>> = vec![Arc::new(Field::new("time", DataType::Int64, false))];
fields.extend(
schema
.fields()
.iter()
.filter(|f| f.name() != "time")
.cloned(),
);

let schema = ArrowSchema { fields, metadata };
let schema_metadata = metadata;
let field_names: Vec<String> = fields.iter().map(|f| f.name().to_string()).collect();

Ok(Box::new(sample_all(samplers).try_convert(
move |arrays| MultiChunk {
schema: schema.clone(),
chunks: [Chunk::new(arrays)].into(),
move |arrays| {
// Build schema from actual array types to handle nullability differences
let actual_fields: Vec<Arc<Field>> = field_names
.iter()
.zip(arrays.iter())
.map(|(name, array)| Arc::new(Field::new(name, array.data_type().clone(), false)))
.collect();
let schema: SchemaRef = Arc::new(arrow_schema::Schema::new_with_metadata(
arrow_schema::Fields::from(actual_fields),
schema_metadata.clone(),
));
let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
MultiChunk {
schema,
chunks: [batch].into(),
}
},
|_| None,
)))
Expand Down
12 changes: 6 additions & 6 deletions catalog/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "plateau-catalog-arrow-rs"
name = "plateau-catalog"
description = "Index of all stored segments in plateau"

version.workspace = true
Expand Down Expand Up @@ -51,9 +51,9 @@ arrow-json = "55.2.0"
arrow-ipc = "55.2.0"

# Use arrow-rs versions of dependencies
plateau-data-arrow-rs = { path = "../data" }
plateau-client-arrow-rs = { path = "../client" }
plateau-transport-arrow-rs = { path = "../transport" }
plateau-data.workspace = true
plateau-client.workspace = true
plateau-transport.workspace = true


[dev-dependencies]
Expand All @@ -64,8 +64,8 @@ uuid = { version = "1.10", features = ["v4"] }
reqwest.workspace = true

# Use arrow-rs versions for testing
plateau-client-arrow-rs = { path = "../client" }
plateau-test-arrow-rs = { path = "../test" }
plateau-client.workspace = true
plateau-test.workspace = true


[lints]
Expand Down
8 changes: 4 additions & 4 deletions catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ pub mod partition;
pub mod slog;
pub mod topic;

pub use plateau_client_arrow_rs as client;
pub use plateau_data_arrow_rs as data;
pub use plateau_transport_arrow_rs as transport;
pub use plateau_client as client;
pub use plateau_data as data;
pub use plateau_transport as transport;

#[cfg(test)]
pub use plateau_test_arrow_rs as test;
pub use plateau_test as test;

pub use catalog::{Catalog, Config};
pub use reconcile::{ReconcileConfig, ReconcileJob, ReconcileStats};
8 changes: 4 additions & 4 deletions cli/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::{collections::HashMap, fmt::Display};

use plateau_client::{ArrowSchema, SchemaChunk};
use plateau_client::{SchemaChunk, SchemaRef};
use plateau_transport::{Inserted, Partitions, RecordStatus, Records, TopicIterationReply, Topics};

pub trait CliDisplay {
Expand Down Expand Up @@ -116,11 +116,11 @@ impl CliDisplay for Inserted {
}
}

impl CliDisplay for SchemaChunk<ArrowSchema> {
impl CliDisplay for SchemaChunk<SchemaRef> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for field in &self.schema.fields {
write!(f, "\x1b[1m{}:\x1b[0m ", field.name)?;
match self.get_array([field.name.as_ref()]) {
write!(f, "\x1b[1m{}:\x1b[0m ", field.name())?;
match self.get_array([field.name().as_ref()]) {
Ok(arr) => writeln!(f, "{arr:?}"),
Err(_) => writeln!(f, "unknown type (not an array)"),
}?;
Expand Down
4 changes: 2 additions & 2 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::{fs::File, io::AsyncWriteExt};
use tokio_util::io::ReaderStream;

use plateau_client::{
localhost, ArrowSchema, ArrowStream, Client, Iterate, Retrieve, SchemaChunk, SizedArrowStream,
localhost, ArrowStream, Client, Iterate, Retrieve, SchemaChunk, SchemaRef, SizedArrowStream,
};

mod display;
Expand Down Expand Up @@ -223,7 +223,7 @@ async fn make_request(client: &Client, cmd: Command) -> Result<(), Error> {
}
OutputFormat::ArrowStdout => {
params.data_focus.dataset_separator = Some(".".to_owned());
let mut response: Vec<SchemaChunk<ArrowSchema>> = client
let mut response: Vec<SchemaChunk<SchemaRef>> = client
.get_records(topic_name, partition_name, &params)
.await?;

Expand Down
9 changes: 5 additions & 4 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "plateau-client-arrow-rs"
name = "plateau-client"

version.workspace = true
edition.workspace = true
Expand Down Expand Up @@ -29,8 +29,9 @@ polars = { version = "0.36.2", optional = true, features = [
"ipc",
"dtype-full",
] }
hashbrown = { version = "0.14", optional = true, features = ["raw"] }

plateau-transport-arrow-rs = { path = "../../arrow-rs/transport" }
plateau-transport.workspace = true


[dev-dependencies]
Expand All @@ -49,9 +50,9 @@ plateau-test.workspace = true
[features]
batch = ["dep:tokio"]
health = ["dep:tokio"]
polars = ["dep:polars"]
polars = ["dep:polars", "dep:hashbrown"]
replicate = ["dep:backoff", "dep:tokio"]
rweb = ["plateau-transport-arrow-rs/rweb"]
rweb = ["plateau-transport/rweb"]


[lints]
Expand Down
2 changes: 1 addition & 1 deletion client/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio::sync::mpsc;
use tokio::task;
use tracing::{error, warn};

use plateau_transport_arrow_rs as transport;
use plateau_transport as transport;
use transport::InsertQuery;

use crate::{Client, Error as ClientError, Insertion, MaxRequestSize};
Expand Down
19 changes: 11 additions & 8 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{pin::Pin, str::FromStr};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{TryStream, TryStreamExt};
use plateau_transport_arrow_rs as transport;
use plateau_transport as transport;
pub use reqwest;
use reqwest::Body;
use reqwest::{
Expand All @@ -16,16 +16,19 @@ use reqwest::{
};
use thiserror::Error;
use tracing::{trace, warn};
pub use transport::arrow_schema::SchemaRef;
use transport::headers::ITERATION_STATUS_HEADER;
#[cfg(feature = "polars")]
use transport::RecordStatus;
use transport::CONTENT_TYPE_JSON;
use transport::{
arrow_ipc,
arrow_schema::{ArrowError, Schema, SchemaRef},
Insert, InsertQuery, Inserted, MultiChunk, Partitions, RecordQuery, Records, SchemaChunk,
TopicIterationQuery, TopicIterationReply, TopicIterationStatus, TopicIterator, Topics,
CONTENT_TYPE_ARROW,
arrow_schema::{ArrowError, Schema},
Insert, Inserted, Partitions, RecordQuery, Records, TopicIterator, Topics, CONTENT_TYPE_ARROW,
};
pub use transport::{
InsertQuery, MultiChunk, SchemaChunk, TopicIterationQuery, TopicIterationReply,
TopicIterationStatus,
};

#[cfg(feature = "health")]
Expand Down Expand Up @@ -93,10 +96,10 @@ pub const DEFAULT_MAX_BATCH_BYTES: usize = 10240000;
/// Plateau client. Creation options:
/// ```
/// // Client pointed at 'localhost:3030'.
/// let client = plateau_client_arrow_rs::Client::default();
/// let client = plateau_client::Client::default();
///
/// // Client pointed at an alternate URL.
/// let client = plateau_client_arrow_rs::Client::new("plateau.my-wallaroo-cluster.dev:1234");
/// let client = plateau_client::Client::new("plateau.my-wallaroo-cluster.dev:1234");
/// ```
#[derive(Debug, Clone)]
pub struct Client {
Expand Down Expand Up @@ -993,7 +996,7 @@ mod tests {
Expectation, Server,
};
use plateau_server::{http, Config as PlateauConfig};
use plateau_transport_arrow_rs::{DataFocus, RecordStatus, Span, Topic, TopicIterationOrder};
use plateau_transport::{DataFocus, RecordStatus, Span, Topic, TopicIterationOrder};
use tokio_util::io::ReaderStream;

use super::*;
Expand Down
2 changes: 1 addition & 1 deletion client/src/replicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt;
use std::sync::Arc;

use plateau_transport_arrow_rs as transport;
use plateau_transport as transport;
use serde::{Deserialize, Serialize};
use transport::{MultiChunk, PartitionId, RecordQuery};

Expand Down
10 changes: 5 additions & 5 deletions data/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "plateau-data-arrow-rs"
name = "plateau-data"
description = "Data processing for the plateau server"

version.workspace = true
Expand Down Expand Up @@ -44,8 +44,8 @@ arrow-ipc = "55.2.0"
thiserror.workspace = true

# Use arrow-rs versions of transport and client
plateau-transport-arrow-rs = { path = "../transport" }
plateau-client-arrow-rs = { path = "../client" }
plateau-transport.workspace = true
plateau-client.workspace = true


[dev-dependencies]
Expand All @@ -61,8 +61,8 @@ sample-arrow-rs = "55.2.0"
reqwest.workspace = true

# Use arrow-rs versions for testing
plateau-client-arrow-rs = { path = "../client" }
plateau-test-arrow-rs = { path = "../test" }
plateau-client.workspace = true
plateau-test.workspace = true


[lints]
Expand Down
4 changes: 2 additions & 2 deletions data/src/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use arrow_array::{Array, ArrayRef, BooleanArray, Int32Array, Int64Array, RecordB
pub use arrow_schema::Schema;
use arrow_select::filter::filter_record_batch;
use chrono::{DateTime, TimeZone, Utc};
use plateau_transport_arrow_rs::{ChunkError, SchemaChunk, SegmentChunk};
use plateau_transport::{ChunkError, SchemaChunk, SegmentChunk};
use std::borrow::Borrow;
use std::ops::RangeInclusive;
use std::sync::Arc;
Expand Down Expand Up @@ -294,7 +294,7 @@ pub fn slice(chunk: SegmentChunk, offset: usize, len: usize) -> SegmentChunk {
pub mod test {
use super::*;
use crate::transport::estimate_size;
use plateau_test_arrow_rs as test_arrow_rs;
use plateau_test as test_arrow_rs;
use test_arrow_rs::{inferences_nested, inferences_schema_a, inferences_schema_b};

/*
Expand Down
Loading
Loading