Troubleshooting Guide #176

Merged

Changes from all commits (25 commits)
- `300194b` Populate User Agent Header and Trace ID to track Read and Write API u… (prashastia, Sep 23, 2024)
- `c11c8ee` Populate User Agent Header and Trace ID to track Read and Write API u… (prashastia, Sep 23, 2024)
- `4e75e1e` Populate User Agent Header and Trace ID to track Read and Write API u… (prashastia, Sep 23, 2024)
- `f30c973` Remove the commented out `.setTraceId()` for (prashastia, Sep 24, 2024)
- `8549563` Remove User Agent Header from the transport channel. (prashastia, Sep 24, 2024)
- `7f08506` throw BigQueryConnector Exception in case of schema mismatch. (prashastia, Sep 30, 2024)
- `e4c3f49` throw BigQueryConnector Exception in case of schema mismatch. (prashastia, Sep 30, 2024)
- `72d0db2` Fix No edge case time type. (prashastia, Sep 30, 2024)
- `a652677` Merge remote-tracking branch 'dataproc/main' into improve-debuggability (prashastia, Sep 30, 2024)
- `df4741f` Remove log messages. (prashastia, Sep 30, 2024)
- `64c38c8` Fix day, month and year partition reads along with tests. (prashastia, Oct 24, 2024)
- `cefd02f` Merge remote-tracking branch 'dataproc/main' into troubleshooting-guide (prashastia, Oct 27, 2024)
- `80e8470` Add TROUBLESHOOT.md (prashastia, Oct 27, 2024)
- `99903c6` Merge remote-tracking branch 'dataproc/main' into troubleshooting-guide (prashastia, Oct 29, 2024)
- `e7fd9ea` Address review changes for code. (prashastia, Oct 29, 2024)
- `5fddee2` Address review changes for code. (prashastia, Oct 29, 2024)
- `39c2ea0` Merge remote-tracking branch 'dataproc/main' into troubleshooting-guide (prashastia, Oct 29, 2024)
- `250240c` Address review changes for code. (prashastia, Nov 6, 2024)
- `2d179c4` Remove extra space. (prashastia, Nov 6, 2024)
- `6acf4d1` Address Review Comments - elaborate BigQueryConnector Exception. (prashastia, Nov 6, 2024)
- `5aa3b10` Reset Since Checkpoint variables in SnapshotState rather than first w… (prashastia, Nov 12, 2024)
- `89050de` Merge remote-tracking branch 'dataproc/main' into troubleshooting-guide (prashastia, Nov 12, 2024)
- `c126f5a` Address review comments. (prashastia, Nov 19, 2024)
- `4e52d6a` Address review comments. (prashastia, Nov 19, 2024)
- `dc2babf` Address review comments. (prashastia, Dec 8, 2024)
231 changes: 231 additions & 0 deletions TROUBLESHOOT.md
@@ -0,0 +1,231 @@
# Debug Manual: Flink - BigQuery Connector

## Overview
When problems occur, debug tools enable users to quickly pinpoint the root cause, whether it lies
within the connector itself, the BigQuery API, or the network infrastructure.
Detailed logs, traces, and debugging capabilities help you troubleshoot issues effectively,
reducing the time and effort required to resolve them.
This document is a debugging manual that highlights issues users may face, explains how to
troubleshoot them, and proposes steps for identifying and mitigating the underlying causes.

<i>Note: This is a running document containing the issues reported to the developers of the
[Flink - BigQuery Connector](https://github.com/GoogleCloudDataproc/flink-bigquery-connector).
Please feel free to contribute as new issues arise.</i>

## Flink Metrics
### Writing to BigQuery
In an effort to increase observability, the Flink-BigQuery Connector team supports
collecting and reporting Flink metrics for a Flink application.
The metrics supported so far are detailed in the
[README](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/README.md#flink-metrics).

An overview of the records seen by the writer and the number of records successfully written to
BigQuery enables users to track the flow of records through their application.
Comparing these counts helps determine whether records are reaching the sink (writer)
at all, whether they are being serialized and sent to the Write API, and whether BigQuery is able
to write them.

See Flink’s documentation on
[Metric Reporters](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/)
to deploy the reporter that best suits your needs.
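
A reporter can also be wired up programmatically when building the execution environment. The sketch below is an illustration only (not part of the connector) and assumes the `flink-metrics-slf4j` dependency is on the classpath; the reporter name `slf4j` and the 60-second interval are arbitrary choices:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration conf = new Configuration();
// Register Flink's SLF4J metric reporter so counters such as
// numberOfRecordsSeenByWriter appear in the TaskManager logs.
conf.setString("metrics.reporter.slf4j.factory.class",
        "org.apache.flink.metrics.slf4j.Slf4jReporterFactory");
conf.setString("metrics.reporter.slf4j.interval", "60 SECONDS");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
```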

## General Debugging
### Records are not being written to BigQuery
With the help of the [metrics available as part of the 0.4.0 release](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/main/README.md#flink-metrics) of the connector,
users can track the number of records that enter the sink (writer)
(`numberOfRecordsSeenByWriter`) and the
number of records successfully written to BigQuery (`numberOfRecordsWrittenToBigQuery`).
If records are not being written to BigQuery, then they are stuck in one of two phases:
#### The records are not arriving at the sink
- This is most likely not an issue in the sink itself, since upstream subtasks are not forwarding records to the sink.

#### The records are arriving at the sink but not being successfully written to BigQuery.
Check the logs or error message for the following errors:

#### `BigQuerySerializationException`
- This message illustrates that the record could not be serialized by the connector.
- Note: This error is <b>logged not thrown</b>, explaining why the record could not be serialized.
- In the future, this will be supplemented with dead letter queues.

#### `BigQueryConnectorException`
<b><i>
- Users are requested to:
  - Refer to the [Write API documentation](https://cloud.google.com/bigquery/docs/write-api) for proper usage of the BigQuery Storage Write API
and the errors that can arise from violating it.
  - Ensure checkpointing is enabled and properly configured (see the sketch below). Setting an
overly large checkpointing interval can prevent records from being written for long periods of time.
</i></b>
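
For instance, checkpointing can be enabled on the execution environment as follows; the 60-second interval is only an illustrative value:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Trigger a checkpoint every 60 seconds; the connector flushes pending
// writes to BigQuery on each checkpoint.
env.enableCheckpointing(60_000L, CheckpointingMode.AT_LEAST_ONCE);
```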


There are multiple situations in which the connector throws a `BigQueryConnectorException`.
The exception contains an error message that indicates the underlying problem.
<br>
A few common causes are documented below, along with steps to mitigate them.
##### Source
- `Failed to forward the deserialized record to the next operator.` - Flink pipelines may
encounter incompatible record schemas and values.
This error is thrown when Flink is unable to read a record field value according to
its schema.
This is detailed in the [section below](#misconfigured-table-schema-goes-uncaught).

- `Problems creating the BigQuery Storage Read session` - The SplitDiscoverer throws this
exception when it is unable to create the Storage Read client while attempting to create
the Storage Read session.
Ensure proper auth permissions,
correct credentials, and network connectivity to prevent this error.

- `Error in converting Avro Generic Record to Row Data` - When reading records from BigQuery
via the Table API, records read via the Storage Read API in Avro format need to be converted to
the RowData format. This error is thrown when the application cannot convert
(deserialize) Avro GenericRecord values to RowData records. Users should provide records in
supported GenericRecord and RowData formats, and record values should match the record schema.

- `The provided Flink expression is not supported` - Thrown by the application when it is unable to
resolve the provided SQL expressions (e.g. `>`, `<`, etc.) while using the SQL mode to
read records. Users are requested to provide SQL queries containing only the
[supported expressions](https://github.com/GoogleCloudDataproc/flink-bigquery-connector/blob/ae7950613190b2d878bac736331ebb0032fd4d1f/flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/restrictions/BigQueryRestriction.java#L54).

- `Can't read query results without setting a SQL query.` or
`Can't read query results without setting a GCP project.` - Thrown by
[BigQuerySource](flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/source/BigQuerySource.java)
when reading records from a query (via the Datastream API) if the SQL query or GCP project is not provided.
Provide both the SQL query (the query to be executed) and the GCP project
(the project that runs the query) when reading records from a query (see the sketch below).
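
A minimal sketch of reading query results, assuming the `readAvrosFromQuery` factory method described in the connector README; exact method names and signatures may differ between connector versions:

```java
import com.google.cloud.flink.bigquery.source.BigQuerySource;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Both the SQL query and the GCP project that runs the query must be provided,
// otherwise BigQuerySource throws the errors quoted above.
BigQuerySource<GenericRecord> source =
        BigQuerySource.readAvrosFromQuery(
                "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`",
                "your-gcp-project-id");
env.fromSource(source, WatermarkStrategy.noWatermarks(), "BigQueryQuerySource").print();
```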

##### Sink
- `Error while writing to BigQuery` - Thrown by the sink writer when it is unable to
write records to BigQuery.
This happens when the `append()` request to the stream responsible for writing records
(the default stream in at-least-once mode, a buffered stream in exactly-once mode) fails,
or, in exactly-once mode, when the response contains an error or an offset mismatch.
Users are requested to consult the
[Java documentation](https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/3.5.0/com.google.cloud.bigquery.storage.v1.Exceptions)
for the various exceptions thrown by the Storage Write API.

- `Unable to connect to BigQuery` - Thrown by the sink writer
when it is unable to create the write client while attempting to write records to BigQuery,
i.e. when StreamWriter creation fails.
Ensure proper auth permissions, correct credentials, network connectivity,
and a suitable table schema to prevent this error (see the sketch at the end of this section).

- `Commit operation failed` - Thrown by the committer (part of Flink's two-phase commit protocol)
when it is unable to create the write client
while attempting to commit rows to BigQuery via the Storage Write API.
Ensure proper auth permissions, correct credentials,
and network connectivity to prevent this error.

- `Could not obtain Descriptor from Descriptor Proto` -
Thrown by the [BigQuerySchemaProvider](flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/sink/serializer/BigQuerySchemaProviderImpl.java)
when it is unable to convert the DescriptorProto to a Descriptor.
This can happen when building descriptors fails because the source DescriptorProto does not
describe a valid Descriptor.
Since the connector handles the construction of the DescriptorProto (for writing Avro records as
proto), users should not normally face this error; it may appear when a custom serializer is used.
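
If connection or permission errors persist, verify that the environment running the job can resolve valid Google credentials (for example, Application Default Credentials) and that the connect options point at the intended table. A minimal sketch, assuming the `BigQueryConnectOptions` builder methods shown in the connector README (`setProjectId`, `setDataset`, `setTable`); check your connector version for the exact API:

```java
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;

// Placeholder values; replace with the project, dataset and table the job targets.
BigQueryConnectOptions connectOptions =
        BigQueryConnectOptions.builder()
                .setProjectId("your-gcp-project-id")
                .setDataset("your_dataset")
                .setTable("your_table")
                .build();
```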

## Known Issues/Limitations
> **Review comment (Contributor):** let's also reference the 100 max parallelism here
>
> **Reply (Collaborator, Author):** We have mentioned this in the Readme, should we mention it here as well?

<b><i>Note: Users should first go through the connector's README documentation to ensure proper
usage of the connector. The connector also has certain limitations, which are documented in the
README.</i></b>
### Flink Application with Windows
- Windows are tricky to deal with in Flink; Flink relies on watermarks to close windows.
- If no records are written to BigQuery, observe the flow of records through the application
to verify that the sink (writer) is receiving records.
- If the sink is not receiving any records, the windows are most likely not being closed.
- The incoming records are windowed together by the Flink application, which keeps waiting for
closing events (as per the windowing condition) to arrive; explicitly assigning a watermark
strategy, as sketched after this list, often helps.
- [Flink's documentation on debugging windows](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/debugging/debugging_event_time/)
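
A minimal sketch of assigning watermarks so that event-time windows can close; `Event` and `getEventTimeMillis()` are hypothetical placeholders for the user's own type:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Event> withWatermarks =
        events.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                        .withTimestampAssigner((event, ts) -> event.getEventTimeMillis())
                        // Advance the watermark even when a source partition goes idle,
                        // so windows are not held open indefinitely.
                        .withIdleness(Duration.ofMinutes(1)));
```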

### BigQuery sink does not appear to be writing consistently
- In at-least-once mode, the connector writes to BigQuery on two occasions: when the append request
reaches its size limit and when a checkpoint is triggered.
- Writes to the BigQuery API therefore do not occur at a steady rate; they spike around each
checkpoint, when buffered data is flushed to BigQuery.
- The same behavior applies to exactly-once mode.

### Misconfigured Table Schema goes uncaught
- Ensure that the Avro records passed through the connector have a schema that matches, or is
compatible with, the schema of the BigQuery table.
- The values stored in an Avro GenericRecord are also expected to follow its schema.
- In case of a mismatch between the passed Avro record value type and the expected type, a
`BigQueryConnectorException` is thrown.
- Flink does not check the values of a GenericRecord, so an Avro record's schema could declare an
`INTEGER` type while the field actually stores a `STRING` value (see the sketch below).
- Hence, a few cases arise from mismatches between the Avro record value, its schema, and the
expected BigQuery schema; they are enumerated for the Datastream API and the Table API below.
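
The following sketch shows how such a mismatch can slip through, since Avro's `GenericData.Record.put` does not validate values against the schema; the record and field names are illustrative only:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

Schema schema = SchemaBuilder.record("Example").fields()
        .requiredLong("id") // schema declares a long (maps to BigQuery INTEGER)
        .endRecord();

GenericRecord record = new GenericData.Record(schema);
// Avro accepts this at put() time; the mismatch only surfaces later,
// when the connector serializes the record for BigQuery.
record.put("id", "not-a-number");
```
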
#### For Datastream API
##### Case 1: The Record's Schema is incompatible with the BigQuery Table Schema
- <b>Record value matches the expected BQ schema type</b> (but does not follow the record's own schema)
  - <i>Example: Read from a table having a field of type `STRING` and write to a BQ table having a
field of type `INTEGER`, where the record value is modified from the string that was read to a
long value.</i>
  - This case works without any problem, since the serializer matches the passed record value
against the expected BQ schema type.
- <b>Record value matches the record schema type (but is incompatible with the BQ table schema),
OR record value does not match either schema type</b>
  - This is a mismatch between the expected BQ schema type and the passed value, so a
`BigQueryConnectorException` is thrown indicating that the pipeline is unable to forward the
record to the next operator (sink).
##### Case 2: The Record's Schema is compatible with BigQuery Table Schema
- <b>Record value matches the record schema type and the expected BQ schema type</b>
  - The desired case; no issues.
- <b>Record value does not match either schema type</b>
  - This is a mismatch between the expected BQ schema type and the passed value, so a
`BigQueryConnectorException` is thrown indicating that the pipeline is unable to forward the
record to the next operator (sink).

#### For Table API
##### Case 1: The record schema is incompatible with the BigQuery Table Schema
- The schema of the source (transformed source table) should match the schema of the sink table
(BQ table).
- In case of a mismatch, an `org.apache.flink.table.api.ValidationException` is thrown.
##### Case 2: The record's schema is compatible with the BigQuery table schema
- <b>Record value matches the record schema type and the expected BQ schema type</b>
  - The desired case; no issues.
- <b>Record value does not match either schema type</b>
  - An `org.apache.flink.table.api.ValidationException` is thrown when the RowData values do not
match the declared schema.

### Records are not available in the BigQuery Table when checkpointing is disabled.
- The connector relies on checkpoints for triggering writes to the BigQuery table.
- It is important to enable checkpointing and configure the interval suitably.

### Problems related to unbounded reads
- Unbounded reads currently have limitations and may not work as expected, so users should avoid
them for now.
- The feature will be temporarily unavailable while we develop a significantly improved version.
- Expect an enhanced feature in 2025!

[//]: # (READ PROBLEMS)
### Flink BigQuery Connector Fails to read record fields of type `RANGE`
- The connector does not support reading or writing BigQuery's `RANGE` data type.

### Flink BigQuery Connector Fails to read record fields of type `RECORD` and MODE `NULLABLE`
- Reading a `NULLABLE` field of type `RECORD` is not supported and throws an exception.
- The read works fine when the record field's value is null.
- The failure occurs because the built-in `org.apache.flink.formats.avro.typeutils.AvroSerializer`
is unable to serialize nullable record types (see the sketch below).
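
For reference, a `NULLABLE` `RECORD` column corresponds to an Avro union of `null` and a record type. A minimal sketch of what such a schema looks like; the record and field names are illustrative only:

```java
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

Schema addressRecord = SchemaBuilder.record("Address").fields()
        .requiredString("city")
        .endRecord();
// A NULLABLE RECORD field maps to a union of NULL and the nested record type;
// this nullable record shape is the one the connector currently cannot read.
Schema nullableRecord = Schema.createUnion(
        Arrays.asList(Schema.create(Schema.Type.NULL), addressRecord));
```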

### BigQuery `INTEGER` type field is read as a long value and not int.
- BigQuery `INTEGER` represents numbers of up to 64 bits.
- However, a Java `int` can only hold 32-bit values.
- Hence, to accommodate all the values a BQ `INTEGER` field can hold, the field is read as a
Java `long` instead of a Java `int` (see the sketch below).
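
For example, when consuming a `GenericRecord` produced by the source, read such a field as a `Long`; the field name below is illustrative:

```java
import org.apache.avro.generic.GenericRecord;

// BigQuery INTEGER columns arrive as 64-bit values, so cast to Long, not Integer.
Long wordCount = (Long) record.get("word_count");
```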

## Additional Debugging facilities offered by Apache Flink
Flink offers these additional features that can help users improve the observability of their
applications beyond what is described above.
- [End to end latency tracking](https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#end-to-end-latency-tracking)
- [Structuring Flink Logs](https://logback.qos.ch/manual/mdc.html) with MDC (see the sketch after this list)
  - Users can then parse these logs to build dashboards.
- [Flamegraphs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/flame_graphs/)
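
A minimal sketch of tagging log lines with MDC context so they can be filtered per job downstream; the key and value are arbitrary examples:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

Logger log = LoggerFactory.getLogger("bq-connector-debug");

// Attach a contextual key that the logging backend (e.g. Logback) can include
// in its output pattern, making it easy to group log lines per pipeline.
MDC.put("pipeline", "bigquery-write-job");
try {
    log.info("Flushing records to BigQuery on checkpoint");
} finally {
    MDC.remove("pipeline");
}
```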

@@ -553,7 +553,7 @@ static String convertTime(Object value, boolean micros) {
         LocalTime time =
                 LocalTime.MIDNIGHT.plusNanos(
                         TimeUnit.MICROSECONDS.toNanos(microSecondsSinceMidnight));
-        return time.toString();
+        return time.format(DateTimeFormatter.ISO_TIME);
     }

     /**
@@ -78,7 +78,7 @@ public DescriptorProto getDescriptorProto() {
     public Descriptor getDescriptor() {
         try {
             return getDescriptorFromDescriptorProto(descriptorProto);
-        } catch (DescriptorValidationException e) {
+        } catch (DescriptorValidationException | IllegalArgumentException e) {
             throw new BigQueryConnectorException(
                     String.format(
                             "Could not obtain Descriptor from Descriptor Proto.%nError: %s",
@@ -22,6 +22,7 @@

 import com.google.api.core.ApiFuture;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.Exceptions;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
 import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
@@ -132,6 +133,17 @@ void validateAppendResponse(AppendInfo appendInfo) {
             numberOfRecordsWrittenToBigQuery.inc(recordsAppended);
             numberOfRecordsWrittenToBigQuerySinceCheckpoint.inc(recordsAppended);
         } catch (ExecutionException | InterruptedException e) {
+            if (e.getCause() instanceof Exceptions.AppendSerializationError) {
+                Exceptions.AppendSerializationError appendSerializationError =
+                        (Exceptions.AppendSerializationError) e.getCause();
+                logger.error(
+                        String.format(
+                                "AppendSerializationError%nCause: %s%nMessage: %s%nRowIndexToErrorMessage: %s%nStreamName: %s",
+                                appendSerializationError.getCause(),
+                                appendSerializationError.getMessage(),
+                                appendSerializationError.getRowIndexToErrorMessage(),
+                                appendSerializationError.getStreamName()));
+            }
             logAndThrowFatalException(e);
         }
     }
@@ -23,8 +23,6 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;

-import java.io.IOException;
-
 /**
  * A simple Identity de-serialization for pipelines that just want {@link GenericRecord} as response
  * from BigQuery.
@@ -40,7 +38,7 @@ public AvroDeserializationSchema(String avroSchemaString) {
     }

     @Override
-    public GenericRecord deserialize(GenericRecord record) throws IOException {
+    public GenericRecord deserialize(GenericRecord record) {
         return record;
     }
@@ -22,25 +22,36 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;

+import com.google.cloud.flink.bigquery.common.exceptions.BigQueryConnectorException;
 import org.apache.avro.generic.GenericRecord;

-import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 /** Simple implementation for the Deserialization schema (from Avro GenericRecord to RowData). */
 @Internal
 public class AvroToRowDataDeserializationSchema
         implements BigQueryDeserializationSchema<GenericRecord, RowData> {
     private final AvroToRowDataConverters.AvroToRowDataConverter converter;
     private final TypeInformation<RowData> typeInfo;
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AvroToRowDataDeserializationSchema.class);

     public AvroToRowDataDeserializationSchema(RowType rowType, TypeInformation<RowData> typeInfo) {
         this.converter = AvroToRowDataConverters.createRowConverter(rowType);
         this.typeInfo = typeInfo;
     }

     @Override
-    public RowData deserialize(GenericRecord record) throws IOException {
-        return (GenericRowData) converter.convert(record);
+    public RowData deserialize(GenericRecord record) throws BigQueryConnectorException {
+        try {
+            return (GenericRowData) converter.convert(record);
+        } catch (Exception e) {
+            LOG.error(
+                    String.format(
+                            "Error in converting Avro Generic Record %s to Row Data.%nError: %s.%nCause:%s ",
+                            record.toString(), e.getMessage(), e.getCause()));
+            throw new BigQueryConnectorException("Error in converting to Row Data", e);
+        }
     }

     @Override