diff --git a/packages/cubejs-backend-native/src/stream.rs b/packages/cubejs-backend-native/src/stream.rs index 4a496b3537cca..9bc270ed1d01a 100644 --- a/packages/cubejs-backend-native/src/stream.rs +++ b/packages/cubejs-backend-native/src/stream.rs @@ -261,6 +261,27 @@ pub struct JsValueObject<'a> { pub handle: Handle<'a, JsArray>, } +fn js_value_to_json_string<'a, C: Context<'a>>( + cx: &mut C, + value: Handle<'a, JsValue>, +) -> Result { + let global = cx.global_object(); + let json = global + .get::(cx, "JSON") + .map_err(|e| CubeError::internal(format!("Can't get JSON global: {}", e)))?; + let stringify = json + .get::(cx, "stringify") + .map_err(|e| CubeError::internal(format!("Can't get JSON.stringify: {}", e)))?; + let undefined = cx.undefined().upcast::(); + let result = stringify + .call(cx, undefined, [value]) + .map_err(|e| CubeError::internal(format!("JSON.stringify failed: {}", e)))?; + let s = result.downcast::(cx).map_err(|e| { + CubeError::internal(format!("JSON.stringify did not return a string: {}", e)) + })?; + Ok(s.value(cx)) +} + impl ValueObject for JsValueObject<'_> { fn len(&mut self) -> Result { Ok(self.handle.len(&mut self.cx) as usize) @@ -287,17 +308,22 @@ impl ValueObject for JsValueObject<'_> { || value.downcast::(&mut self.cx).is_ok() { Ok(FieldValue::Null) - } else if let Ok(b) = value.downcast::(&mut self.cx) { - Err(CubeError::internal(format!( - "Expected primitive value but found JsArray({:?})", - b - ))) + } else if value.is_a::(&mut self.cx) { + Ok(FieldValue::String(Cow::Owned(js_value_to_json_string( + &mut self.cx, + value, + )?))) } else if let Ok(b) = value.downcast::(&mut self.cx) { // TODO: Support it? Err(CubeError::internal(format!( "Expected primitive value but found JsDate({:?})", b ))) + } else if value.is_a::(&mut self.cx) { + Ok(FieldValue::String(Cow::Owned(js_value_to_json_string( + &mut self.cx, + value, + )?))) } else { Err(CubeError::internal(format!( "Expected primitive value but found: {:?}", @@ -321,7 +347,10 @@ fn js_stream_push_chunk(mut cx: FunctionContext) -> JsResult { handle: chunk_array, }; let value = - transform_response(&mut value_object, this.schema.clone(), &this.member_fields).unwrap(); + match transform_response(&mut value_object, this.schema.clone(), &this.member_fields) { + Ok(value) => value, + Err(e) => return value_object.cx.throw_error(e.message), + }; let future = this.push_chunk(value); wait_for_future_and_execute_callback( this.tokio_handle.clone(), diff --git a/packages/cubejs-postgres-driver/src/PostgresDriver.ts b/packages/cubejs-postgres-driver/src/PostgresDriver.ts index f0b11bd31a2a8..e083b5d0104a4 100644 --- a/packages/cubejs-postgres-driver/src/PostgresDriver.ts +++ b/packages/cubejs-postgres-driver/src/PostgresDriver.ts @@ -296,6 +296,8 @@ export class PostgresDriver { } }); + test('stream (array-typed columns)', async () => { + // Streaming must not fail when a query returns array-typed columns. + // Array types are reported as `text` and node-postgres parses them into + // JS arrays. See CORE-522. + const tableData = await driver.stream( + `SELECT + ARRAY['oops', 'test']::text[] as text_array, + ARRAY[1, 2, 3]::int[] as int_array`, + [], + { + highWaterMark: 1000, + } + ); + + try { + expect(await tableData.types).toEqual([ + { + name: 'text_array', + type: 'text' + }, + { + name: 'int_array', + type: 'text' + }, + ]); + expect(await streamToArray(tableData.rowStream)).toEqual([ + { text_array: ['oops', 'test'], int_array: [1, 2, 3] }, + ]); + } finally { + await ( tableData).release(); + } + }); + test('stream (exception)', async () => { try { await driver.stream('select * from test.random_name_for_table_that_doesnot_exist_sql_must_fail', [], { diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index 8f7ba0704bd10..5a498b4fd220b 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -350,11 +350,10 @@ fn json_value_to_field_value(value: &Value) -> std::result::Result FieldValue::Bool(*b), Value::Null => FieldValue::Null, - x => { - return Err(CubeError::user(format!( - "Expected primitive value but found: {:?}", - x - ))); + x @ (Value::Array(_) | Value::Object(_)) => { + FieldValue::String(Cow::Owned(serde_json::to_string(x).map_err(|e| { + CubeError::internal(format!("Can't serialize non-scalar value to JSON: {}", e)) + })?)) } }) } @@ -1380,7 +1379,8 @@ mod tests { use datafusion::{ arrow::{ array::{ - BooleanArray, Date32Array, Float64Array, StringArray, TimestampNanosecondArray, + Array, BooleanArray, Date32Array, Float64Array, StringArray, + TimestampNanosecondArray, }, datatypes::{Field, Schema}, }, @@ -1479,6 +1479,52 @@ mod tests { ); } + #[test] + fn convert_transport_response_serializes_non_scalar_values_to_json_strings() { + let raw = r#" + { + "results": [{ + "annotation": { + "measures": [], + "dimensions": [], + "segments": [], + "timeDimensions": [] + }, + "data": [ + {"c": ["a", "b"], "d": {"k": 1}}, + {"c": null, "d": "plain"} + ] + }] + } + "#; + let response: V1LoadResponse = serde_json::from_str(raw).unwrap(); + let schema = Arc::new(Schema::new(vec![ + Field::new("c", DataType::Utf8, true), + Field::new("d", DataType::Utf8, true), + ])); + let member_fields = vec![ + MemberField::regular("c".to_string()), + MemberField::regular("d".to_string()), + ]; + let batches = convert_transport_response(response, schema, member_fields).unwrap(); + + let c = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(c.value(0), r#"["a","b"]"#); + assert!(c.is_null(1)); + + let d = batches[0] + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(d.value(0), r#"{"k":1}"#); + assert_eq!(d.value(1), "plain"); + } + fn get_test_load_meta(protocol: DatabaseProtocol) -> LoadRequestMeta { LoadRequestMeta::new( protocol.get_name().to_string(),