2 changes: 1 addition & 1 deletion package.json
@@ -34,7 +34,6 @@
"extend": "^3.0.2",
"google-auth-library": "^10.0.0",
"google-gax": "^5.0.0"

},
"peerDependencies": {
"protobufjs": "^7.2.4 - 7.5.0"
@@ -46,6 +45,7 @@
"@types/node": "^22.13.14",
"@types/sinon": "^17.0.4",
"@types/uuid": "^10.0.0",
"avsc": "^5.7.9",
"c8": "^10.1.3",
"gapic-tools": "^1.0.1",
"gts": "^6.0.2",
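Note on the avsc addition: the new system test later in this diff decodes Avro-encoded rows by hand, which is why avsc joins the devDependencies. A minimal sketch of that decode step, assuming session comes from createReadSession and res is a single ReadRowsResponse from the row stream (ReadSession and ReadRowsResponse are the proto aliases used later in this diff):

// Sketch only: decode one Avro record batch carried by a ReadRowsResponse.
// Uses the same require() form as the system test below.
const avro = require('avsc');

function decodeAvroRows(session: ReadSession, res: ReadRowsResponse): unknown {
  // The read session carries the Avro schema as a JSON string.
  const schema = JSON.parse(session.avroSchema!.schema as string);
  const avroType = avro.Type.forSchema(schema);
  // decode() returns {value, offset}; value is the decoded row object.
  return avroType.decode(res.avroRows!.serializedBinaryRows as Buffer, 0).value;
}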
10 changes: 8 additions & 2 deletions src/reader/read_client.ts
@@ -22,10 +22,10 @@
import {ArrowTableReader} from './arrow_reader';
import {DataFormat} from './data_format';

type CreateReadSessionRequest =
protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest;
type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;
type ArrowSerializationOptions = {
type AvroArrowSerializationOptions = {
picosTimestampPrecision: protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.PicosTimestampPrecision;
};

@@ -131,7 +131,8 @@
table: string;
dataFormat: DataFormat;
selectedFields?: string[];
arrowSerializationOptions?: ArrowSerializationOptions;
arrowSerializationOptions?: AvroArrowSerializationOptions;
avroSerializationOptions?: AvroArrowSerializationOptions;
}): Promise<ReadSession> {
await this.initialize();
const {table, parent, dataFormat, selectedFields} = request;
@@ -154,6 +155,11 @@
arrowSerializationOptions: request.arrowSerializationOptions,
});
}
if (request.avroSerializationOptions) {
Object.assign(createReq.readSession.readOptions, {
avroSerializationOptions: request.avroSerializationOptions,
});
}
const [response] = await this._client.createReadSession(createReq);
if (response === undefined) {
throw new gax.GoogleError(`${response}`);
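For illustration, a caller could opt an Avro read session into picosecond timestamps roughly as below. This is a minimal sketch: the parent and table paths are placeholders, and AvroFormat stands for the Avro DataFormat constant used by the system test later in this diff.

// Sketch only: create an Avro read session with picosecond timestamp precision.
const session = await client.createReadSession({
  parent: 'projects/my-project', // placeholder
  table: 'projects/my-project/datasets/my_dataset/tables/my_table', // placeholder
  dataFormat: AvroFormat,
  avroSerializationOptions: {
    picosTimestampPrecision:
      protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions
        .PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS,
  },
});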
11 changes: 8 additions & 3 deletions src/reader/read_session.ts
@@ -25,7 +25,7 @@ type ReadRowsResponse =
protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession;
const ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.ReadSession;
type ArrowSerializationOptions = {
type AvroArrowSerializationOptions = {
picosTimestampPrecision: protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions.PicosTimestampPrecision;
};

@@ -39,9 +39,13 @@ export type GetStreamOptions = {
*/
selectedFields?: string;
/**
* Option to opt into higher precision timestamps.
* Option to opt into higher precision timestamps for Arrow readers.
*/
arrowSerializationOptions?: ArrowSerializationOptions;
arrowSerializationOptions?: AvroArrowSerializationOptions;
/**
* Option to opt into higher precision timestamps for Avro readers.
*/
avroSerializationOptions?: AvroArrowSerializationOptions;
};

/**
@@ -93,6 +97,7 @@ export class ReadSession {
dataFormat: this._format,
selectedFields: options?.selectedFields?.split(','),
arrowSerializationOptions: options?.arrowSerializationOptions,
avroSerializationOptions: options?.avroSerializationOptions,
});
this.trace(
'session created',
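A minimal sketch of filling in the widened GetStreamOptions; the field values are illustrative, and the enum path mirrors the one read_client.ts already uses for the Arrow option:

// Sketch only: GetStreamOptions with the new Avro picosecond-timestamp option.
const options: GetStreamOptions = {
  selectedFields: 'customer_name,created_at', // comma-separated, per the type above
  avroSerializationOptions: {
    picosTimestampPrecision:
      protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions
        .PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS,
  },
};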
128 changes: 128 additions & 0 deletions system-test/reader_client_test.ts
@@ -29,6 +29,8 @@ import {RecordBatch, Table, tableFromIPC} from 'apache-arrow';
import {ArrowRecordBatchTableRowTransform} from '../src/reader/arrow_transform';
import {ResourceStream} from '@google-cloud/paginator';
import {ArrowTableReader} from '../src/reader';
import {Transform, TransformCallback} from 'stream';
type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;

type ReadRowsResponse =
protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
@@ -230,6 +232,132 @@ describe('reader.ReaderClient', () => {
});
});

describe('AvroReader', () => {
it('should read high precision timestamps from an avro stream', async () => {
const avro = require('avsc');
class AvroRawTransform extends Transform {
private session: ReadSession;

constructor(session: ReadSession) {
super({
objectMode: true,
});
this.session = session;
}

_transform(
serializedRecordBatch: any,
_: BufferEncoding,
callback: TransformCallback,
): void {
const session = this.session;
const schema = JSON.parse(session?.avroSchema?.schema as string);
const avroType = avro.Type.forSchema(schema);
if (
!(
serializedRecordBatch.avroRows &&
serializedRecordBatch.avroRows.serializedBinaryRows
)
) {
callback(null);
return;
}
const decodedData = avroType.decode(
serializedRecordBatch.avroRows.serializedBinaryRows,
0,
);
callback(null, decodedData.value);
}
}

const picosTableId = generateUuid();
const picosSchema: any = {
fields: [
{
name: 'customer_name',
type: 'STRING',
mode: 'REQUIRED',
},
{
name: 'row_num',
type: 'INTEGER',
mode: 'REQUIRED',
},
{
name: 'created_at',
type: 'TIMESTAMP',
mode: 'NULLABLE',
timestampPrecision: 12,
},
],
};
const expectedTsValue = '2024-04-05T15:45:58.981123456789Z';
await bigquery
.dataset(datasetId)
.createTable(picosTableId, {schema: picosSchema});
await bigquery
.dataset(datasetId)
.table(picosTableId)
.insert([
{
customer_name: 'my-name',
row_num: 1,
created_at: expectedTsValue,
},
]);

bqReadClient.initialize().catch(err => {
throw err;
});
const client = new ReadClient();
client.setClient(bqReadClient);

try {
const session = await client.createReadSession({
parent: `projects/${projectId}`,
table: `projects/${projectId}/datasets/${datasetId}/tables/${picosTableId}`,
dataFormat: AvroFormat,
avroSerializationOptions: {
picosTimestampPrecision:
protos.google.cloud.bigquery.storage.v1.ArrowSerializationOptions
.PicosTimestampPrecision.TIMESTAMP_PRECISION_PICOS,
},
});

assert.equal(session.dataFormat, AvroFormat);
assert.notEqual(session.streams, null);
assert.notEqual(session.streams?.length, 0);

const readStream = session.streams![0];
const connection = await client.createReadStream({
session,
streamName: readStream.name!,
});

const myStream = connection
.getRowsStream()
.pipe(new AvroRawTransform(session!));
const responses: ReadRowsResponse[] = [];
await new Promise((resolve, reject) => {
myStream.on('data', (data: ReadRowsResponse) => {
responses.push(data);
});
myStream.on('error', reject);
myStream.on('end', () => {
resolve(null);
});
});

assert.equal(responses.length, 1);
assert.equal((responses[0] as any)['created_at'], expectedTsValue);

connection.close();
client.close();
} finally {
client.close();
}
});
});
describe('ArrowTableReader', () => {
it('should allow to read a table as an Arrow byte stream', async () => {
bqReadClient.initialize().catch(err => {