feat: add wrapper for reading table data using Storage API #431
Conversation
Some early results:
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Got it. Re the third bullet point: something like this test I'm working on in gax. In that test I have two streams piped together (using pipeline): attemptStream is the stream making the request to the server, and userStream is the consuming stream. A minimal sketch of how they are wired together follows the test code below.
Some things to note:
- I don't know if you'd need the error checking I have here. I'm specifically writing a retry test, so the .on('error') stuff may be irrelevant; I'm also firing errors from the showcase server at very specific points in the data stream (mainly after the 85th item).
- This test has data listeners on both attemptStream and userStream, and each appends the data content to an array. At the end, the two arrays should be the same length. A test of yours could use pipeline to connect streams and check that the lengths agree at the end.
- If you experiment with this test and can't find a way to ever make it fail, or can't be confident it's doing anything, don't sweat it. It may be more important for the retries case; when I originally wrote this comment I was very much in the weeds with that CBT stuff.
- The pausing happens in the user stream, not in the attemptStream, which forces data to build up in the buffer. You could also change the userStream.on('data') handler to only pause when results2 reaches a certain length, but pausing every time is a more surefire way to force that buffer to fill.
import * as assert from 'assert';
import {GoogleError} from 'google-gax';

const results: string[] = [];
const results2: string[] = [];
// Collect everything the request-side stream emits.
attemptStream.on('data', (data: {content: string}) => {
  results.push(data.content);
});
attemptStream.on('end', () => {
  assert.strictEqual(results.length, 100);
});
attemptStream.on('error', (e: GoogleError) => {
  assert.strictEqual(e.code, 13);
});
// Collect everything the user-facing stream emits, pausing on every chunk
// so data builds up in the internal buffer.
userStream.on('data', (data: {content: string}) => {
  results2.push(data.content);
  userStream.pause();
  setTimeout(() => {
    userStream.resume();
  }, 100);
});
userStream.on('end', () => {
  // Both sides should have seen the same number of items.
  assert.strictEqual(results.length, 100);
  assert.strictEqual(results2.length, 100);
});
userStream.on('error', (e: GoogleError) => {
  // The showcase server injects an error after the 85th item.
  // assert.strictEqual(results2.length, 85)
  assert.strictEqual(results.length, 85);
  assert.strictEqual(e.code, 14);
});
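A minimal sketch of the wiring mentioned above, assuming Node's callback-style pipeline; the PassThrough streams stand in for the request-side and user-facing streams in the real test:

import {pipeline, PassThrough} from 'stream';

// Stand-ins for the real streams: in the gax test these are the stream
// making the server request and the stream handed to the user.
const attemptStream = new PassThrough({objectMode: true});
const userStream = new PassThrough({objectMode: true});

pipeline(attemptStream, userStream, err => {
  if (err) {
    // Any error raised along the chain ends up here.
    console.error('pipeline failed:', err);
  }
});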
Added some more comments, addressed some minor issues, and pushed a new test using CTA and generate_array to test it with more data. cc @shollyman @leahecole
return stream
  .pipe(new ArrowRawTransform())
  .pipe(new ArrowRecordReaderTransform(info!))
  .pipe(new ArrowRecordBatchTransform()) as ResourceStream<RecordBatch>;
Tried to use pipeline, but it is meant to be used when we have a destination. In this case we are just applying a bunch of transforms and don't know the destination beforehand.
The error that I got:
TypeError [ERR_INVALID_ARG_TYPE]: The "streams[stream.length - 1]" property must be of type function. Received an instance of ArrowRecordBatchTransform
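For context, a rough sketch of the difference (the chainTransforms helper name is illustrative, not part of this PR). Chaining with .pipe() returns the last stream in the chain, so it can be handed back to the caller for consumption; the callback form of pipeline() expects its final argument to be a completion callback, which is why the last transform gets rejected with ERR_INVALID_ARG_TYPE above.

import {Transform} from 'stream';

// Illustrative helper: apply a list of transforms and return the tail of
// the chain so the caller can keep consuming it.
function chainTransforms(
  source: NodeJS.ReadableStream,
  transforms: Transform[]
): NodeJS.ReadableStream {
  return transforms.reduce<NodeJS.ReadableStream>(
    (stream, transform) => stream.pipe(transform),
    source
  );
}

// With pipeline() the chain must end with a callback (or use the
// stream/promises variant), e.g.:
// pipeline(source, transformA, transformB, destination, err => { /* ... */ });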
return stream.pipe(
  new ArrowRecordBatchTableRowTransform()
) as ResourceStream<TableRow>;
Errors are handled by the consumer of the stream; when the stream is used internally, as it is here, we handle the errors ourselves.
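A minimal sketch of what consumer-side handling might look like, assuming the wrapper hands back a readable stream of rows (the rowStream name and row shape are placeholders, not the actual API in this PR):

import {Readable} from 'stream';

// Hypothetical: rowStream is whatever readable stream the wrapper returns.
declare const rowStream: Readable;

rowStream
  .on('data', row => {
    // Consume each row as it arrives.
    console.log(row);
  })
  .on('error', err => {
    // Errors from the underlying transforms surface here, on the stream
    // handed to the consumer.
    console.error('read failed:', err);
  })
  .on('end', () => {
    console.log('done reading');
  });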
Add support for easily reading tables using the BigQuery Storage API instead of the BigQuery API. This provides increased performance and reduced memory usage for most use cases, and lets users keep using the same interface they are used to from our main library, or fetch data directly via a new veneer on the BigQuery Storage Read API.
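A hypothetical usage sketch of the two paths described above; the actual class and method names introduced by this PR may differ.

import {BigQuery} from '@google-cloud/bigquery';

async function readViaExistingInterface(): Promise<void> {
  // Path 1: keep using the familiar Table interface from the main library;
  // the intent is that rows are fetched through the Storage Read API under
  // the hood (whether opting in requires an option is an assumption here).
  const bigquery = new BigQuery();
  const table = bigquery.dataset('my_dataset').table('my_table');
  const [rows] = await table.getRows();
  console.log(`read ${rows.length} rows`);
}

// Path 2 (hypothetical names): a dedicated reader veneer on the Storage
// Read API that streams rows directly, e.g.:
// const reader = new TableReader({table: 'projects/p/datasets/d/tables/t'});
// const rowStream = await reader.getRowStream();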