Skip to content
Merged
3 changes: 3 additions & 0 deletions changelog.d/24667_clickhouse_arrow_default_columns.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The ClickHouse sink's ArrowStream format now correctly handles MATERIALIZED, ALIAS, EPHEMERAL, and DEFAULT columns. MATERIALIZED, ALIAS, and EPHEMERAL columns are excluded from the fetched schema since they cannot receive INSERT data. DEFAULT columns are kept but marked nullable so events are not rejected when the server-computed value is omitted.

authors: benjamin-awd
157 changes: 106 additions & 51 deletions src/sinks/clickhouse/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use arrow::datatypes::{Field, Schema};
use async_trait::async_trait;
use http::{Request, StatusCode};
use hyper::Body;
use itertools::Itertools;
use serde::Deserialize;
use url::form_urlencoded;
use vector_lib::codecs::encoding::format::{ArrowEncodingError, SchemaProvider};
Expand All @@ -15,11 +14,16 @@ use crate::http::{Auth, HttpClient};

use super::parser::ClickHouseType;

/// String constants for ClickHouse column `default_kind` values.
///
/// In `system.columns`, `default_kind` is the empty string for regular
/// columns and the keyword name (e.g. "DEFAULT") for columns declared with
/// a default expression.
const COLUMN_KIND_REGULAR: &str = "";
const COLUMN_KIND_DEFAULT: &str = "DEFAULT";

/// One row of the `system.columns` JSONEachRow response.
#[derive(Debug, Deserialize)]
struct ColumnInfo {
    // Column name as declared in the table.
    name: String,
    // ClickHouse type string, e.g. "Nullable(String)" or "DateTime64(3)".
    // `type` is a Rust keyword, hence the serde rename.
    #[serde(rename = "type")]
    column_type: String,
    // "" for regular columns, "DEFAULT" for columns with a DEFAULT
    // expression (matches the COLUMN_KIND_* constants above).
    default_kind: String,
}

impl TryFrom<ColumnInfo> for Field {
Expand All @@ -30,6 +34,8 @@ impl TryFrom<ColumnInfo> for Field {
let (dt, nullable) = (&ch_type)
.try_into()
.map_err(|e| format!("Failed to convert column '{}': {e}", column.name))?;
// DEFAULT columns have server-side defaults, so users don't need to provide them.
let nullable = nullable || column.default_kind == COLUMN_KIND_DEFAULT;
Ok(Field::new(column.name, dt, nullable))
}
}
Expand All @@ -42,15 +48,18 @@ pub async fn fetch_table_schema(
table: &str,
auth: Option<&Auth>,
) -> crate::Result<Schema> {
let query = "SELECT name, type \
FROM system.columns \
WHERE database = {db:String} AND table = {tbl:String} \
ORDER BY position \
FORMAT JSONEachRow";
let query = format!(
"SELECT name, type, default_kind \
FROM system.columns \
WHERE database = {{db:String}} AND table = {{tbl:String}} \
AND default_kind IN ('{COLUMN_KIND_REGULAR}', '{COLUMN_KIND_DEFAULT}') \
ORDER BY position \
FORMAT JSONEachRow"
);

// Build URI with query and parameters
let query_string = form_urlencoded::Serializer::new(String::new())
.append_pair("query", query)
.append_pair("query", &query)
.append_pair("param_db", database)
.append_pair("param_tbl", table)
.finish();
Expand All @@ -65,34 +74,34 @@ pub async fn fetch_table_schema(

let response = client.send(request).await?;

match response.status() {
StatusCode::OK => {
let body_bytes = http_body::Body::collect(response.into_body())
.await?
.to_bytes();
let body_str = String::from_utf8(body_bytes.into())
.map_err(|e| format!("Failed to parse response as UTF-8: {e}"))?;

parse_schema_from_response(&body_str)
}
status => Err(format!("Failed to fetch schema from ClickHouse: HTTP {status}").into()),
if response.status() != StatusCode::OK {
return Err(format!(
"Failed to fetch schema from ClickHouse: HTTP {}",
response.status()
)
.into());
}

let body_bytes = http_body::Body::collect(response.into_body())
.await?
.to_bytes();

// Pass bytes directly instead of converting to a UTF-8 String first
parse_schema_from_response(&body_bytes)
}

/// Parses the JSON response from ClickHouse and builds an Arrow schema.
fn parse_schema_from_response(response: &str) -> crate::Result<Schema> {
let mut lines = response.lines().filter(|line| !line.is_empty()).peekable();
/// Parses the JSONEachRow response from ClickHouse and builds an Arrow schema.
fn parse_schema_from_response(response: &[u8]) -> crate::Result<Schema> {
let fields = serde_json::Deserializer::from_slice(response)
.into_iter::<ColumnInfo>()
.map(|res| -> crate::Result<Field> { res?.try_into() })
.collect::<crate::Result<Vec<Field>>>()?;

if lines.peek().is_none() {
if fields.is_empty() {
return Err("Table does not exist or has no columns".into());
}

lines
.map(|line| -> crate::Result<Field> {
serde_json::from_str::<ColumnInfo>(line)?.try_into()
})
.try_collect::<_, Vec<Field>, _>()
.map(Schema::new)
Ok(Schema::new(fields))
}

/// Schema provider implementation for ClickHouse tables.
Expand Down Expand Up @@ -145,15 +154,17 @@ impl SchemaProvider for ClickHouseSchemaProvider {
mod tests {
use super::*;
use arrow::datatypes::{DataType, TimeUnit};
use indoc::indoc;

#[test]
fn test_parse_schema() {
let response = r#"{"name":"id","type":"Int64"}
{"name":"message","type":"String"}
{"name":"timestamp","type":"DateTime"}
"#;
let response = indoc! {r#"
{"name":"id","type":"Int64","default_kind":""}
{"name":"message","type":"String","default_kind":""}
{"name":"timestamp","type":"DateTime","default_kind":""}
"#};

let schema = parse_schema_from_response(response).unwrap();
let schema = parse_schema_from_response(response.as_bytes()).unwrap();
assert_eq!(schema.fields().len(), 3);
assert_eq!(schema.field(0).name(), "id");
assert_eq!(schema.field(0).data_type(), &DataType::Int64);
Expand All @@ -168,43 +179,41 @@ mod tests {

/// Tests that parameterized ClickHouse type strings (precision/scale and
/// sub-second units) map to the expected Arrow data types.
#[test]
fn test_parse_schema_with_type_parameters() {
    let response = indoc! {r#"
        {"name":"bytes_sent","type":"Decimal(18, 2)","default_kind":""}
        {"name":"timestamp","type":"DateTime64(6)","default_kind":""}
        {"name":"duration_ms","type":"Decimal32(4)","default_kind":""}
    "#};

    let schema = parse_schema_from_response(response.as_bytes()).unwrap();
    assert_eq!(schema.fields().len(), 3);

    // Check Decimal parsed from type string
    assert_eq!(schema.field(0).name(), "bytes_sent");
    assert_eq!(schema.field(0).data_type(), &DataType::Decimal128(18, 2));

    // Check DateTime64 parsed from type string
    assert_eq!(schema.field(1).name(), "timestamp");
    assert_eq!(
        schema.field(1).data_type(),
        &DataType::Timestamp(TimeUnit::Microsecond, None)
    );

    // Check Decimal32 parsed from type string
    assert_eq!(schema.field(2).name(), "duration_ms");
    assert_eq!(schema.field(2).data_type(), &DataType::Decimal128(9, 4));
}

#[test]
fn test_schema_field_ordering() {
let response = r#"{"name":"timestamp","type":"DateTime64(3)"}
{"name":"host","type":"String"}
{"name":"message","type":"String"}
{"name":"id","type":"Int64"}
{"name":"score","type":"Float64"}
{"name":"active","type":"Bool"}
{"name":"name","type":"String"}
"#;
let response = indoc! {r#"
{"name":"timestamp","type":"DateTime64(3)","default_kind":""}
{"name":"host","type":"String","default_kind":""}
{"name":"message","type":"String","default_kind":""}
{"name":"id","type":"Int64","default_kind":""}
{"name":"score","type":"Float64","default_kind":""}
{"name":"active","type":"Bool","default_kind":""}
{"name":"name","type":"String","default_kind":""}
"#};

let schema = parse_schema_from_response(response).unwrap();
let schema = parse_schema_from_response(response.as_bytes()).unwrap();
assert_eq!(schema.fields().len(), 7);

assert_eq!(schema.field(0).name(), "timestamp");
Expand All @@ -224,4 +233,50 @@ mod tests {
assert_eq!(schema.field(4).data_type(), &DataType::Float64);
assert_eq!(schema.field(5).data_type(), &DataType::Boolean);
}

/// Tests that DEFAULT columns are marked nullable in the parsed schema,
/// since ClickHouse fills them with server-side defaults when omitted.
#[test]
fn test_default_columns_marked_nullable() {
    // Only regular ("") and DEFAULT kinds reach the parser: the SQL query
    // already excludes MATERIALIZED/ALIAS/EPHEMERAL columns.
    let response = indoc! {r#"
        {"name":"id","type":"Int64","default_kind":""}
        {"name":"status","type":"String","default_kind":"DEFAULT"}
        {"name":"message","type":"String","default_kind":""}
    "#};

    let schema = parse_schema_from_response(response.as_bytes()).unwrap();
    assert_eq!(schema.fields().len(), 3);

    // Only the DEFAULT column (index 1) is forced nullable; the regular
    // non-nullable columns around it stay non-nullable.
    let nullability: Vec<bool> = schema.fields().iter().map(|f| f.is_nullable()).collect();
    assert_eq!(nullability, vec![false, true, false]);
    assert_eq!(schema.field(1).name(), "status");
}

/// Simulates the response after the SQL query has filtered out
/// MATERIALIZED/ALIAS/EPHEMERAL columns, leaving only regular and DEFAULT.
#[test]
fn test_post_filter_schema_with_default() {
    // Use indoc! for the fixture, matching every other test in this module.
    let response = indoc! {r#"
        {"name":"id","type":"Int64","default_kind":""}
        {"name":"created_at","type":"DateTime64(3)","default_kind":"DEFAULT"}
        {"name":"message","type":"Nullable(String)","default_kind":""}
    "#};

    let schema = parse_schema_from_response(response.as_bytes()).unwrap();
    assert_eq!(schema.fields().len(), 3);

    // Regular non-nullable column stays non-nullable.
    assert_eq!(schema.field(0).name(), "id");
    assert!(!schema.field(0).is_nullable());

    assert_eq!(schema.field(1).name(), "created_at");
    assert!(schema.field(1).is_nullable()); // DEFAULT → nullable

    assert_eq!(schema.field(2).name(), "message");
    assert!(schema.field(2).is_nullable()); // Nullable(String) → nullable
}
}
Loading
Loading