
Refactor BQ to expose all beam's configurations #5456

Draft: wants to merge 4 commits into main


RustedBones (Contributor) commented Aug 20, 2024

Here are the main changes:

  • The BQ Table source has a single normalized definition with multiple constructors (from a string spec or a TableReference). It now includes an optional Table.Filter that can be used in the Storage Read API to project and filter.

  • Read API changes:

| API method | Read method | Returned type |
|---|---|---|
| bigQuerySelect | export | TableRow |
| bigQuerySelectFormat | export | T |
| bigQueryTable | export | TableRow |
| bigQueryTableFormat | export | T |
| bigQueryStorage | direct | TableRow |
| bigQueryStorageFormat | direct | T |
| typedBigQuery | export | T |
| typedBigQueryStorage | direct | T |

The *Format methods take a BigQueryIO.Format object that converts either from GenericRecord (preferred) or from TableRow.

  • The Storage API now accepts an ErrorHandler (fixes #5530, "Add ErrorHandling in BigQuery"). To preserve a flat structure, ScioContext.errorSink(): ErrorSink has been added. This allows the following:
val errorSink = sc.errorSink()
sc.bigQueryStorageFormat[MyType](
  table,
  format,
  errorHandler = errorSink.handler
)
val errors: SCollection[BadRecord] = errorSink.sink()

The same handler can be passed to multiple IOs before the sink is materialized; the sink then flattens the errors from all of those IOs into a single SCollection.
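The normalized table definition from the first bullet can be sketched in plain Scala. This is a dependency-free model, not the PR's code: TableRef is a hypothetical stand-in for Google's TableReference, and Filter (selected fields plus an optional row restriction, mirroring the projection and filtering that Beam exposes as withSelectedFields and withRowRestriction) only approximates what Table.Filter might carry. The spec parser accepts project:dataset.table, project.dataset.table and dataset.table.

```scala
object TableSketch {
  // Hypothetical stand-in for com.google.api.services.bigquery.model.TableReference.
  final case class TableRef(project: Option[String], dataset: String, table: String)

  // Hypothetical projection (selected fields) and row restriction for Storage Read API reads.
  final case class Filter(selectedFields: List[String] = Nil, rowRestriction: Option[String] = None)

  // One normalized definition: a reference plus an optional filter.
  final case class Table(ref: TableRef, filter: Option[Filter])

  object Table {
    // Accepts "project:dataset.table", "project.dataset.table" or "dataset.table".
    // Domain-scoped project IDs (containing ':' or '.') are not handled in this sketch.
    private val SpecPattern = """(?:([^:.]+)[:.])?([^:.]+)\.([^:.]+)""".r

    // Constructor from a string spec.
    def fromSpec(spec: String, filter: Option[Filter] = None): Table = spec match {
      case SpecPattern(project, dataset, table) =>
        // An unmatched optional group is null, hence Option(project).
        Table(TableRef(Option(project), dataset, table), filter)
      case _ =>
        throw new IllegalArgumentException(s"Invalid table spec: $spec")
    }

    // Constructor from a TableReference-like value.
    def fromRef(ref: TableRef, filter: Option[Filter] = None): Table = Table(ref, filter)
  }
}
```

For example, TableSketch.Table.fromSpec("p:d.t", Some(TableSketch.Filter(List("id", "name"), Some("id > 10")))) describes a projected, row-restricted read of p:d.t, and both constructors end up in the same Table value.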
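The Format parameter of the *Format read methods can be modeled the same way. In this hypothetical sketch, GenericRecordLike and TableRowLike are stand-ins for Avro's GenericRecord and BigQuery's TableRow, and Format has one case per intermediate representation; the actual BigQueryIO.Format in the PR may be shaped differently.

```scala
object FormatSketch {
  // Hypothetical stand-ins for Avro GenericRecord and BigQuery TableRow.
  final case class GenericRecordLike(values: Map[String, Any])
  final case class TableRowLike(values: Map[String, Any])

  // A format knows which intermediate representation it parses from.
  sealed trait Format[T] { def fromRaw(raw: Map[String, Any]): T }

  object Format {
    final case class FromGenericRecord[T](parse: GenericRecordLike => T) extends Format[T] {
      def fromRaw(raw: Map[String, Any]): T = parse(GenericRecordLike(raw))
    }
    final case class FromTableRow[T](parse: TableRowLike => T) extends Format[T] {
      def fromRaw(raw: Map[String, Any]): T = parse(TableRowLike(raw))
    }
  }

  final case class User(id: Long, name: String)

  // Preferred path per the PR description: convert from GenericRecord.
  val avroFormat: Format[User] = Format.FromGenericRecord[User] { r =>
    User(r.values("id").asInstanceOf[Long], r.values("name").toString)
  }

  // TableRow path: parse via toString so string-encoded numbers also work
  // (an assumption of this sketch).
  val rowFormat: Format[User] = Format.FromTableRow[User] { r =>
    User(r.values("id").toString.toLong, r.values("name").toString)
  }
}
```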
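The ErrorSink contract (one handler shared by several IOs, errors flattened by the sink, no handler use once the sink is materialized) can be illustrated without Beam. This is a hypothetical in-memory model: BadRecord is a simplified stand-in for Beam's class, Handler and parseInts are illustrative names, and sink() returns a List where the proposed API returns an SCollection[BadRecord].

```scala
import scala.collection.mutable.ListBuffer

object ErrorSinkSketch {
  // Simplified stand-in for Beam's BadRecord: the failing input and a reason.
  final case class BadRecord(record: String, reason: String)

  // Hypothetical handler that IOs report failures through.
  final class Handler private[ErrorSinkSketch] (owner: ErrorSink) {
    def report(r: BadRecord): Unit = owner.add(r)
  }

  final class ErrorSink {
    private val buffer = ListBuffer.empty[BadRecord]
    private var materialized = false

    // The same handler can be passed to several IOs.
    val handler: Handler = new Handler(this)

    private[ErrorSinkSketch] def add(r: BadRecord): Unit = {
      // The handler must not be used after sink() has been called.
      if (materialized) throw new IllegalStateException("handler used after sink() was materialized")
      buffer += r
    }

    // Flattens the errors reported by every IO that used the handler.
    def sink(): List[BadRecord] = {
      materialized = true
      buffer.toList
    }
  }

  // Hypothetical "IO": parses integers, routing failures to the handler.
  def parseInts(input: Seq[String], handler: Handler): Seq[Int] =
    input.flatMap { s =>
      s.toIntOption match {
        case Some(i) => Some(i)
        case None =>
          handler.report(BadRecord(s, "not an integer"))
          None
      }
    }
}
```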


codecov bot commented Aug 20, 2024

Codecov Report

Attention: Patch coverage is 47.58364% with 141 lines in your changes missing coverage. Please review.

Project coverage is 61.47%. Comparing base (d5d20ad) to head (37b545f).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...otify/scio/bigquery/syntax/ScioContextSyntax.scala | 30.15% | 44 Missing ⚠️ |
| ...n/scala/com/spotify/scio/bigquery/BigQueryIO.scala | 67.76% | 39 Missing ⚠️ |
| ...rc/main/scala/com/spotify/scio/bigquery/taps.scala | 0.00% | 22 Missing ⚠️ |
| ...cala/com/spotify/scio/bigquery/BigQueryTypes.scala | 41.93% | 18 Missing ⚠️ |
| ...otify/scio/bigquery/syntax/SCollectionSyntax.scala | 25.00% | 9 Missing ⚠️ |
| ...io/bigquery/dynamic/syntax/SCollectionSyntax.scala | 0.00% | 3 Missing ⚠️ |
| ...scala/com/spotify/scio/bigquery/MockBigQuery.scala | 0.00% | 2 Missing ⚠️ |
| ...la/com/spotify/scio/bigquery/client/BigQuery.scala | 0.00% | 2 Missing ⚠️ |
| .../src/main/scala/com/spotify/scio/ScioContext.scala | 50.00% | 1 Missing ⚠️ |
| ...la/com/spotify/scio/bigquery/client/QueryOps.scala | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5456      +/-   ##
==========================================
+ Coverage   61.29%   61.47%   +0.18%     
==========================================
  Files         314      315       +1     
  Lines       11250    11269      +19     
  Branches      793      823      +30     
==========================================
+ Hits         6896     6928      +32     
+ Misses       4354     4341      -13     
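A quick arithmetic check of the report: coverage is hits divided by lines, and the figures above are consistent with rounding down to two decimals (6928 / 11269 is about 61.478%, reported as 61.47%). The patch size of 269 lines is inferred from 141 missing lines at 47.58364% patch coverage; it is not stated in the report.

```scala
object CoverageCheck {
  // Coverage percentage = hits / lines * 100, rounded down to two decimals.
  def pct(hits: Int, lines: Int): BigDecimal =
    (BigDecimal(hits) * 100 / BigDecimal(lines)).setScale(2, BigDecimal.RoundingMode.DOWN)

  // Figures taken from the coverage diff above.
  val base: BigDecimal = pct(6896, 11250) // main
  val head: BigDecimal = pct(6928, 11269) // #5456
  // Inferred patch size: 141 missing lines out of 269.
  val patch: BigDecimal = pct(269 - 141, 269)
}
```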


import org.apache.beam.sdk.values.{PCollection, PCollectionTuple, TupleTag}

/**
* A sink for error records.
Contributor

A bit more explanation of error records could be helpful, maybe:

Suggested change
- * A sink for error records.
+ * A sink for error records.
+ *
+ * An error record is produced by certain PTransforms that catch processing exceptions and transform the resulting (element, exception) pair into a [[BadRecord]] instance.
+ * When an ErrorSink is configured (via ScioContext#errorSink), these BadRecords can be accessed as an SCollection by invoking the ErrorSink#sink method.
+ * An ErrorSink is useful if you'd like to set up special handling of exceptions (incrementing Counters, logging the exceptions in a database, etc.).

* Once the [[sink]] is materialized, the [[handler]] must not be used anymore.
*/
sealed trait ErrorSink {
def handler: ErrorHandler[BadRecord, _]
Contributor

Could def handler be private[scio]? I'm not sure when a user would need to access it.

Contributor Author

This is the API exposed by Beam. As mentioned in the description, we do not pass the ErrorSink directly; its handler is passed to the IO instead:

sc.bigQueryStorageFormat[MyType](
  table,
  format,
  errorHandler = errorSink.handler
)

I was also thinking of adding a Beam Java-like API to ScioContext:

def registerBadRecordErrorHandler[OutputT <: POutput](sinkTransform: PTransform[PCollection[BadRecord], OutputT]): BadRecordErrorHandler[OutputT]
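For comparison, the Beam Java-style registration fixes the sink transform up front, whereas errorSink() lets the caller materialize the errors later. Below is a hypothetical in-memory model of the register style: the sink transform is a plain function over a List of simplified BadRecord values, and close() applies it once. Only the name registerBadRecordErrorHandler comes from the comment above; everything else is illustrative and omits Beam types.

```scala
import scala.collection.mutable.ListBuffer

object RegisterSketch {
  // Simplified stand-in for Beam's BadRecord.
  final case class BadRecord(record: String, reason: String)

  // Hypothetical handler whose sink transform is fixed at registration time.
  final class BadRecordHandler[Out] private[RegisterSketch] (sinkTransform: List[BadRecord] => Out) {
    private val buffer = ListBuffer.empty[BadRecord]
    private var output: Option[Out] = None

    def report(r: BadRecord): Unit = {
      if (output.isDefined) throw new IllegalStateException("handler already closed")
      buffer += r
    }

    // Applies the sink transform once, to everything reported so far.
    def close(): Out = output.getOrElse {
      val out = sinkTransform(buffer.toList)
      output = Some(out)
      out
    }
  }

  // Mirrors the shape of the proposed registerBadRecordErrorHandler, minus Beam types.
  def registerBadRecordErrorHandler[Out](sinkTransform: List[BadRecord] => Out): BadRecordHandler[Out] =
    new BadRecordHandler(sinkTransform)
}
```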

Development

Successfully merging this pull request may close these issues.

Add ErrorHandling in BigQuery
2 participants