feat: add wrapper for reading table data using Storage API (#431)
Add support for easily reading tables using the BigQuery Storage API instead of the BigQuery API. This provides increased performance and reduced memory usage for most use cases, and lets users either keep the same interface they are used to from our main library or fetch data directly via a new veneer on the BigQuery Storage Read API (see the usage sketch below).
alvarowolfx authored Sep 23, 2024
1 parent 2ff0553 commit 03f2b1f
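
For illustration, a minimal sketch of the new veneer in use (assuming the reader module is consumed through the published @google-cloud/bigquery-storage package, that createArrowTableReader accepts a {projectId, datasetId, tableId} table reference as the JSDoc in arrow_reader.ts suggests, and with hypothetical project/dataset/table names):

import {reader} from '@google-cloud/bigquery-storage';
const {ReadClient} = reader;

async function main() {
  const client = new ReadClient();
  // Hypothetical table coordinates; substitute your own.
  const tableReader = await client.createArrowTableReader({
    projectId: 'my-project',
    datasetId: 'my_dataset',
    tableId: 'my_table',
  });
  // Stream parsed Arrow RecordBatch objects straight off the Storage Read API.
  const stream = await tableReader.getRecordBatchStream();
  for await (const batch of stream) {
    console.log(`received a batch with ${batch.numRows} rows`);
  }
  tableReader.close();
}
main();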
Showing 13 changed files with 1,679 additions and 5 deletions.
13 changes: 8 additions & 5 deletions package.json
@@ -27,18 +27,21 @@
     "precompile": "gts clean"
   },
   "dependencies": {
+    "@google-cloud/paginator": "^5.0.0",
+    "apache-arrow": "^14.0.2",
     "core-js": "^3.37.1",
     "extend": "^3.0.2",
-    "google-gax": "^4.3.1",
-    "google-auth-library": "^9.6.3"
+    "google-auth-library": "^9.6.3",
+    "google-gax": "^4.3.1"
   },
   "peerDependencies": {
     "protobufjs": "^7.2.4"
   },
   "devDependencies": {
-    "@google-cloud/bigquery": "^7.0.0",
+    "@google-cloud/bigquery": "^7.5.2",
     "@types/extend": "^3.0.4",
     "@types/mocha": "^9.0.0",
-    "@types/node": "^20.0.0",
+    "@types/node": "^20.16.5",
     "@types/sinon": "^17.0.0",
     "@types/uuid": "^9.0.1",
     "c8": "^9.0.0",
@@ -55,7 +58,7 @@
     "nise": "6.0.0",
     "path-to-regexp": "6.3.0",
     "ts-loader": "^9.0.0",
-    "typescript": "^5.1.6",
+    "typescript": "^5.5.3",
     "uuid": "^9.0.0",
     "webpack": "^5.0.0",
     "webpack-cli": "^5.0.0"
7 changes: 7 additions & 0 deletions src/index.ts
@@ -20,6 +20,7 @@ import * as v1 from './v1';
 import * as v1beta1 from './v1beta1';
 import * as v1alpha from './v1alpha';
 import * as managedwriter from './managedwriter';
+import * as reader from './reader';
 const BigQueryReadClient = v1.BigQueryReadClient;
 type BigQueryReadClient = v1.BigQueryReadClient;
 const BigQueryWriteClient = v1.BigQueryWriteClient;
@@ -28,6 +29,8 @@ const BigQueryStorageClient = v1beta1.BigQueryStorageClient;
 type BigQueryStorageClient = v1beta1.BigQueryStorageClient;
 const WriterClient = managedwriter.WriterClient;
 type WriterClient = managedwriter.WriterClient;
+const ReadClient = reader.ReadClient;
+type ReadClient = reader.ReadClient;
 export {
   v1,
   BigQueryReadClient,
@@ -37,6 +40,8 @@ export {
   BigQueryWriteClient,
   managedwriter,
   WriterClient,
+  reader,
+  ReadClient,
 };
 // For compatibility with JavaScript libraries we need to provide this default export:
 // tslint:disable-next-line no-default-export
@@ -46,6 +51,8 @@ export default {
   BigQueryWriteClient,
   managedwriter,
   WriterClient,
+  reader,
+  ReadClient,
 };
 import * as protos from '../protos/protos';
 export {protos};
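
Given these exports, the new reader is reachable both as a named export and via the default export (a sketch using the library's published package name):

// Named exports (TypeScript / ES modules):
import {reader, ReadClient} from '@google-cloud/bigquery-storage';

// Or through the default export kept for JavaScript compatibility:
import bigqueryStorage from '@google-cloud/bigquery-storage';
const client = new bigqueryStorage.ReadClient();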
101 changes: 101 additions & 0 deletions src/reader/arrow_reader.ts
@@ -0,0 +1,101 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {ResourceStream} from '@google-cloud/paginator';
import {RecordBatch} from 'apache-arrow';

import * as protos from '../../protos/protos';
import {TableReference, ReadClient} from './read_client';
import {logger} from '../util/logger';
import {
ArrowRawTransform,
ArrowRecordBatchTransform,
ArrowRecordReaderTransform,
} from './arrow_transform';
import {ReadSession, GetStreamOptions} from './read_session';
import {ArrowFormat} from './data_format';

type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession;

/**
* A BigQuery Storage API Reader that can be used to read data
* from BigQuery Tables using the Storage API in Arrow format.
*
* @class
* @memberof reader
*/
export class ArrowTableReader {
private _tableRef: TableReference;
private _session: ReadSession;

/**
* Creates a new ArrowTableReader instance. Usually created via
* ReadClient.createArrowTableReader().
*
* @param {ReadClient} readClient - Storage Read Client.
* @param {TableReference} tableRef - target table to read data from.
*/
constructor(readClient: ReadClient, tableRef: TableReference) {
this._tableRef = tableRef;
this._session = new ReadSession(readClient, tableRef, ArrowFormat);
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
private trace(msg: string, ...otherArgs: any[]) {
logger(
'arrow_table_reader',
`[table: ${this._tableRef.tableId}]`,
msg,
...otherArgs
);
}

getSessionInfo(): ReadSessionInfo | undefined | null {
return this._session.getSessionInfo();
}

/**
* Get a byte stream of serialized Arrow Record Batches.
*
* @param {GetStreamOptions} options
*/
async getStream(
options?: GetStreamOptions
): Promise<ResourceStream<Uint8Array>> {
this.trace('getStream', options);
const stream = await this._session.getStream(options);
return stream.pipe(new ArrowRawTransform()) as ResourceStream<Uint8Array>;
}

/**
* Get a stream of Arrow RecordBatch objects.
*
* @param {GetStreamOptions} options
*/
async getRecordBatchStream(
options?: GetStreamOptions
): Promise<ResourceStream<RecordBatch>> {
this.trace('getRecordBatchStream', options);
const stream = await this._session.getStream(options);
const info = this._session.getSessionInfo();
return stream
.pipe(new ArrowRawTransform())
.pipe(new ArrowRecordReaderTransform(info!))
.pipe(new ArrowRecordBatchTransform()) as ResourceStream<RecordBatch>;
}

close() {
this._session.close();
}
}
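
As a usage note, the RecordBatch stream above composes naturally with apache-arrow: the batches can be collected into a Table (a sketch, assuming apache-arrow's Table constructor accepts an array of record batches, as it does in the ^14.0.2 release pinned in package.json):

import {Table, RecordBatch} from 'apache-arrow';
import {ArrowTableReader} from './arrow_reader';

async function readAllAsArrowTable(tableReader: ArrowTableReader): Promise<Table> {
  const stream = await tableReader.getRecordBatchStream();
  const batches: RecordBatch[] = [];
  for await (const batch of stream) {
    batches.push(batch);
  }
  tableReader.close();
  return new Table(batches);
}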
176 changes: 176 additions & 0 deletions src/reader/arrow_transform.ts
@@ -0,0 +1,176 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {Transform, TransformCallback} from 'stream';
import {
RecordBatchReader,
RecordBatch,
RecordBatchStreamReader,
Vector,
} from 'apache-arrow';
import * as protos from '../../protos/protos';

type ReadRowsResponse =
protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;

interface TableCell {
  v?: any;
}
interface TableRow {
f?: Array<TableCell>;
}

/**
 * ArrowRawTransform implements a node stream Transform that reads
 * ReadRowsResponse messages from the BigQuery Storage Read API and
 * converts them into raw serialized Arrow Record Batch bytes.
 */
export class ArrowRawTransform extends Transform {
constructor() {
super({
readableObjectMode: false,
writableObjectMode: true,
});
}

_transform(
response: ReadRowsResponse,
_: BufferEncoding,
callback: TransformCallback
): void {
if (
!(
response.arrowRecordBatch &&
response.arrowRecordBatch.serializedRecordBatch
)
) {
callback(null);
return;
}
callback(null, response.arrowRecordBatch?.serializedRecordBatch);
}
}

/**
 * ArrowRecordReaderTransform implements a node stream Transform that reads
 * a byte stream of raw Arrow Record Batches and converts it into a stream
 * of Arrow RecordBatchStreamReaders.
 */
export class ArrowRecordReaderTransform extends Transform {
private session: ReadSession;

constructor(session: ReadSession) {
super({
objectMode: true,
});
this.session = session;
}

_transform(
serializedRecordBatch: Uint8Array,
_: BufferEncoding,
callback: TransformCallback
): void {
const buf = Buffer.concat([
this.session.arrowSchema?.serializedSchema as Uint8Array,
serializedRecordBatch,
]);
const reader = RecordBatchReader.from(buf);
callback(null, reader);
}
}

/**
 * ArrowRecordBatchTransform implements a node stream Transform that reads
 * a RecordBatchStreamReader and converts it into a stream of Arrow
 * RecordBatches.
 */
export class ArrowRecordBatchTransform extends Transform {
constructor() {
super({
objectMode: true,
});
}

_transform(
reader: RecordBatchStreamReader,
_: BufferEncoding,
callback: TransformCallback
): void {
const batches = reader.readAll();
for (const row of batches) {
this.push(row);
}
callback(null);
}
}

/**
 * ArrowRecordBatchTableRowTransform implements a node stream Transform
 * that reads an Arrow RecordBatch and converts it into a stream of
 * BigQuery TableRows.
 */
export class ArrowRecordBatchTableRowTransform extends Transform {
constructor() {
super({
objectMode: true,
});
}

_transform(
batch: RecordBatch,
_: BufferEncoding,
callback: TransformCallback
): void {
const rows = new Array(batch.numRows);
for (let i = 0; i < batch.numRows; i++) {
rows[i] = {
f: new Array(batch.numCols),
};
}
for (let j = 0; j < batch.numCols; j++) {
const column = batch.selectAt([j]);
const columnName = column.schema.fields[0].name;
for (let i = 0; i < batch.numRows; i++) {
const fieldData = column.get(i);
const fieldValue = fieldData?.toJSON()[columnName];
rows[i].f[j] = {
v: convertArrowValue(fieldValue),
};
}
}
for (let i = 0; i < batch.numRows; i++) {
this.push(rows[i]);
}
callback(null);
}
}

function convertArrowValue(fieldValue: any): any {
  // Guard against null: typeof null === 'object', but Object.keys(null) throws.
  if (fieldValue !== null && typeof fieldValue === 'object') {
if (fieldValue instanceof Vector) {
const arr = fieldValue.toJSON();
      return arr.map((v: any) => {
return {v: convertArrowValue(v)};
});
}
const tableRow: TableRow = {f: []};
Object.keys(fieldValue).forEach(key => {
tableRow.f?.push({
v: convertArrowValue(fieldValue[key]),
});
});
return tableRow;
}
return fieldValue;
}
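
Taken together, the transforms are designed to be chained. A sketch of the full pipeline from raw read responses to tabledata.list-style rows, mirroring the piping in ArrowTableReader.getRecordBatchStream with the table-row stage appended (rawResponseStream and session are assumed to come from an already-established read session):

const rowStream = rawResponseStream
  .pipe(new ArrowRawTransform()) // IReadRowsResponse -> Uint8Array
  .pipe(new ArrowRecordReaderTransform(session)) // Uint8Array -> RecordBatchStreamReader
  .pipe(new ArrowRecordBatchTransform()) // reader -> RecordBatch
  .pipe(new ArrowRecordBatchTableRowTransform()); // RecordBatch -> TableRow

Note how convertArrowValue recursively maps Arrow values into the {v: ...} cell shape: list Vectors become arrays of cells and structs become nested {f: [...]} rows, matching the row format returned by the BigQuery tabledata.list API.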
33 changes: 33 additions & 0 deletions src/reader/data_format.ts
@@ -0,0 +1,33 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import * as protos from '../../protos/protos';

export type DataFormat =
protos.google.cloud.bigquery.storage.v1.IReadSession['dataFormat'];
const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat;

/**
* Return data in Apache Arrow format.
*
* @memberof reader
*/
export const ArrowFormat: DataFormat = 'ARROW';

/**
* Return data in Apache Avro format.
*
* @memberof reader
*/
export const AvroFormat: DataFormat = 'AVRO';
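
These constants are what a read session is created with; ArrowTableReader, for example, pins ArrowFormat in its constructor above. A sketch of the equivalent Avro selection, assuming ReadSession's constructor signature matches its usage in arrow_reader.ts:

import {AvroFormat} from './data_format';
import {ReadClient, TableReference} from './read_client';
import {ReadSession} from './read_session';

// Hypothetical helper: open a session that yields Avro rows instead of Arrow.
function createAvroSession(readClient: ReadClient, tableRef: TableReference): ReadSession {
  return new ReadSession(readClient, tableRef, AvroFormat);
}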