Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions packages/cubejs-backend-native/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,27 @@ pub struct JsValueObject<'a> {
pub handle: Handle<'a, JsArray>,
}

fn js_value_to_json_string<'a, C: Context<'a>>(
cx: &mut C,
value: Handle<'a, JsValue>,
) -> Result<String, CubeError> {
let global = cx.global_object();
let json = global
.get::<JsObject, _, _>(cx, "JSON")
.map_err(|e| CubeError::internal(format!("Can't get JSON global: {}", e)))?;
let stringify = json
.get::<JsFunction, _, _>(cx, "stringify")
.map_err(|e| CubeError::internal(format!("Can't get JSON.stringify: {}", e)))?;
let undefined = cx.undefined().upcast::<JsValue>();
let result = stringify
.call(cx, undefined, [value])
.map_err(|e| CubeError::internal(format!("JSON.stringify failed: {}", e)))?;
let s = result.downcast::<JsString, _>(cx).map_err(|e| {
CubeError::internal(format!("JSON.stringify did not return a string: {}", e))
})?;
Ok(s.value(cx))
}

impl ValueObject for JsValueObject<'_> {
fn len(&mut self) -> Result<usize, CubeError> {
Ok(self.handle.len(&mut self.cx) as usize)
Expand All @@ -287,17 +308,22 @@ impl ValueObject for JsValueObject<'_> {
|| value.downcast::<JsNull, _>(&mut self.cx).is_ok()
{
Ok(FieldValue::Null)
} else if let Ok(b) = value.downcast::<JsArray, _>(&mut self.cx) {
Err(CubeError::internal(format!(
"Expected primitive value but found JsArray({:?})",
b
)))
} else if value.is_a::<JsArray, _>(&mut self.cx) {
Ok(FieldValue::String(Cow::Owned(js_value_to_json_string(
&mut self.cx,
value,
)?)))
} else if let Ok(b) = value.downcast::<JsDate, _>(&mut self.cx) {
// TODO: Support it?
Err(CubeError::internal(format!(
"Expected primitive value but found JsDate({:?})",
b
)))
} else if value.is_a::<JsObject, _>(&mut self.cx) {
Ok(FieldValue::String(Cow::Owned(js_value_to_json_string(
&mut self.cx,
value,
)?)))
} else {
Err(CubeError::internal(format!(
"Expected primitive value but found: {:?}",
Expand All @@ -321,7 +347,10 @@ fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult<JsUndefined> {
handle: chunk_array,
};
let value =
transform_response(&mut value_object, this.schema.clone(), &this.member_fields).unwrap();
match transform_response(&mut value_object, this.schema.clone(), &this.member_fields) {
Ok(value) => value,
Err(e) => return value_object.cx.throw_error(e.message),
};
let future = this.push_chunk(value);
wait_for_future_and_execute_callback(
this.tokio_handle.clone(),
Expand Down
7 changes: 5 additions & 2 deletions packages/cubejs-postgres-driver/src/PostgresDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,22 @@ export class PostgresDriver<Config extends PostgresDriverConfiguration = Postgre
if (!this.userDefinedTypes) {
// Postgres enum types defined as typcategory = 'E' these can be assumed
// to be of type varchar for the drivers purposes.
// Postgres array types defined as typcategory = 'A' these can be assumed
// to be of type text for the drivers purposes.
// TODO: if full implmentation the constraints can be looked up via pg_enum
// https://www.postgresql.org/docs/9.1/catalog-pg-enum.html
const customTypes = await conn.query(
`SELECT
oid,
CASE
WHEN typcategory = 'E' THEN 'varchar'
WHEN typcategory = 'A' THEN 'text'
ELSE typname
END
END AS typname
FROM
pg_type
WHERE
typcategory in ('U', 'E')`,
typcategory in ('U', 'E', 'A')`,
[]
);

Expand Down
33 changes: 33 additions & 0 deletions packages/cubejs-postgres-driver/test/PostgresDriver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,39 @@ describe('PostgresDriver', () => {
}
});

test('stream (array-typed columns)', async () => {
// Streaming must not fail when a query returns array-typed columns.
// Array types are reported as `text` and node-postgres parses them into
// JS arrays. See CORE-522.
const tableData = await driver.stream(
`SELECT
ARRAY['oops', 'test']::text[] as text_array,
ARRAY[1, 2, 3]::int[] as int_array`,
[],
{
highWaterMark: 1000,
}
);

try {
expect(await tableData.types).toEqual([
{
name: 'text_array',
type: 'text'
},
{
name: 'int_array',
type: 'text'
},
]);
expect(await streamToArray(tableData.rowStream)).toEqual([
{ text_array: ['oops', 'test'], int_array: [1, 2, 3] },
]);
} finally {
await (<any> tableData).release();
}
});

test('stream (exception)', async () => {
try {
await driver.stream('select * from test.random_name_for_table_that_doesnot_exist_sql_must_fail', [], {
Expand Down
58 changes: 52 additions & 6 deletions rust/cubesql/cubesql/src/compile/engine/df/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,10 @@ fn json_value_to_field_value(value: &Value) -> std::result::Result<FieldValue<'_
})?),
Value::Bool(b) => FieldValue::Bool(*b),
Value::Null => FieldValue::Null,
x => {
return Err(CubeError::user(format!(
"Expected primitive value but found: {:?}",
x
)));
x @ (Value::Array(_) | Value::Object(_)) => {
FieldValue::String(Cow::Owned(serde_json::to_string(x).map_err(|e| {
CubeError::internal(format!("Can't serialize non-scalar value to JSON: {}", e))
})?))
}
})
}
Expand Down Expand Up @@ -1380,7 +1379,8 @@ mod tests {
use datafusion::{
arrow::{
array::{
BooleanArray, Date32Array, Float64Array, StringArray, TimestampNanosecondArray,
Array, BooleanArray, Date32Array, Float64Array, StringArray,
TimestampNanosecondArray,
},
datatypes::{Field, Schema},
},
Expand Down Expand Up @@ -1479,6 +1479,52 @@ mod tests {
);
}

#[test]
fn convert_transport_response_serializes_non_scalar_values_to_json_strings() {
let raw = r#"
{
"results": [{
"annotation": {
"measures": [],
"dimensions": [],
"segments": [],
"timeDimensions": []
},
"data": [
{"c": ["a", "b"], "d": {"k": 1}},
{"c": null, "d": "plain"}
]
}]
}
"#;
let response: V1LoadResponse = serde_json::from_str(raw).unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("c", DataType::Utf8, true),
Field::new("d", DataType::Utf8, true),
]));
let member_fields = vec![
MemberField::regular("c".to_string()),
MemberField::regular("d".to_string()),
];
let batches = convert_transport_response(response, schema, member_fields).unwrap();

let c = batches[0]
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(c.value(0), r#"["a","b"]"#);
assert!(c.is_null(1));

let d = batches[0]
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(d.value(0), r#"{"k":1}"#);
assert_eq!(d.value(1), "plain");
}

fn get_test_load_meta(protocol: DatabaseProtocol) -> LoadRequestMeta {
LoadRequestMeta::new(
protocol.get_name().to_string(),
Expand Down
Loading