-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-50] Implement BigQueryIO.Write as a custom sink. #48
Conversation
R: @dhalperi |
467a924
to
d252f4c
Compare
@peihe Can you rebase? I don't want GitHub to lose all comments across the rename. |
@dhalperi |
@@ -778,6 +786,7 @@ private Bound(String name, TableReference ref, | |||
this.createDisposition = createDisposition; | |||
this.writeDisposition = writeDisposition; | |||
this.validate = validate; | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
drop?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Overall, looks pretty good. I'd like to see some more comments -- lots of things going on in this file -- and more sanity checks. I'd like to see some end-to-end tests of BigQueryIO.Write -- e.g., the write data is actually written. |
a7d6519
to
0141f7f
Compare
BackOff backoff = new AttemptBoundedExponentialBackOff( | ||
MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS); | ||
String retryingJobId = jobId + "-" + i; | ||
insertLoadJob(retryingJobId, loadConfig, Sleeper.DEFAULT, backoff); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confirming: this will block until the load job itself succeeds, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no
pollJobStatus() blocks until the job is done(SUCCEEDED or FAILED) or it exceeds max retries (UNKNOWN)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, I meant finishes. thanks,
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log job ID -- attempting to create BigQuery load job?
0141f7f
to
b70452b
Compare
CreateDisposition createDisposition, | ||
String tempFile, | ||
Coder<TableRow> coder) { | ||
super(tempFile, "" /* extension */); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not json as extension?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.avro, right?
done
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice
Ready for another look. |
CreateDisposition createDisposition, WriteDisposition writeDisposition, | ||
boolean validate, BigQueryServices testBigQueryServices) { | ||
super(name); | ||
this.jsonTableRef = jsonTableRef; | ||
this.tableRefFunction = tableRefFunction; | ||
this.schema = schema; | ||
this.jsonSchema = jsonSchema; | ||
this.createDisposition = createDisposition; | ||
this.writeDisposition = writeDisposition; | ||
this.validate = validate; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkNotNulls for all non-@Nullable
fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That the tests now fail is a symptom, I think, that we did not set the defaults in the initial constructor. But feel free to revert if it's not easy to fix, as this is only a minor user win.
I would, however, like you to manually disable the backend IO translation for Dataflow service and run the 1T import integration test (manually is fine for now). That reasonable? |
72c1fe7
to
24fc00e
Compare
Verified with 0M, 1M, 1G, 1T bq imports. Thanks @dhalperi |
This is back ported from: apache/beam#48
Create BigQueryServices interface and added BigQueryIO pipeline tests. Removed BigQueryIO.Write evaluator.
a63b339
to
26a4eb1
Compare
Yay! |
This is back ported from: apache/beam#48
This is back ported from: apache/beam#48
This is back ported from: apache/beam#48
This is back ported from: apache/beam#48
This is back ported from: apache/beam#48
Wire state request handler for streaming.
…active by default)
…active by default)
[BEAM-3900] Kryo - build and test fix
🤖 I have created a release \*beep\* \*boop\* --- ## [1.7.0](https://www.github.com/googleapis/python-firestore/compare/v1.6.2...v1.7.0) (2020-05-18) ### Features * Create CODEOWNERS ([apache#40](https://www.github.com/googleapis/python-firestore/issues/40)) ([a0cbf40](https://www.github.com/googleapis/python-firestore/commit/a0cbf403fe88f07c83bec81f275ac168be573e93)) ### Bug Fixes * **firestore:** fix get and getall method of transaction ([apache#16](https://www.github.com/googleapis/python-firestore/issues/16)) ([de3aca0](https://www.github.com/googleapis/python-firestore/commit/de3aca0e78b68f66eb76bc679c6e95b0746ad590)) * Update team to be in correct org ([apache#43](https://www.github.com/googleapis/python-firestore/issues/43)) ([bef5a3a](https://www.github.com/googleapis/python-firestore/commit/bef5a3af4613b5f9d753bb6f45275e480e4bb301)) * **firestore:** fix lint ([apache#48](https://www.github.com/googleapis/python-firestore/issues/48)) ([7fa00c4](https://www.github.com/googleapis/python-firestore/commit/7fa00c49dc3fab1d687fff9246f3e5ff0682cac0)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please).
commit: "Move tempLocation to PipelineOptions." is sent out as a separate pull request.
I am including it to have diff base, and will remove it once the other pull request is merged.