
Commit bd3e0a1 (parent 7291d88)
Author: sravankorumilli
Date: Feb 18, 2021

Fix handling errors (#77)

* Fail on retry failure
* Capture error records

Showing 3 changed files with 42 additions and 8 deletions.
@@ -42,10 +42,11 @@ public List<Record> convert(final Iterable<ConsumerRecord<byte[], byte[]>> messages)
     for (ConsumerRecord<byte[], byte[]> message : messages) {
         if (message.value() == null) {
             // don't handle empty message
-            statsClient.increment("kafka.batch.records.null," + statsClient.getBqTags());
             if (appConfig.getFailOnNullMessage()) {
+                statsClient.gauge("record.processing.failure,type=null," + statsClient.getBqTags(), 1);
                 throw new NullInputMessageException(message.offset());
             }
+            statsClient.increment("kafka.error.records.count,type=null," + statsClient.getBqTags());
             continue;
         }
         OffsetInfo offsetInfo = new OffsetInfo(message.topic(), message.partition(), message.offset(), message.timestamp());
@@ -68,8 +69,8 @@ private Map<String, Object> mapToColumns(ConsumerRecord<byte[], byte[]> message)
         } catch (InvalidProtocolBufferException e) {
             log.info("failed to deserialize message: {} at offset: {}, partition: {}", UnknownProtoFields.toString(message.value()),
                     message.offset(), message.partition());
-            statsClient.increment("kafka.batch.records.deserialize_error," + statsClient.getBqTags());
             if (appConfig.getFailOnDeserializeError()) {
+                statsClient.gauge("record.processing.failure,type=deserialize," + statsClient.getBqTags(), 1);
                 throw new InvalidProtocolBufferException(e);
             }
         }
@@ -99,6 +100,7 @@ private void sinkToErrorWriter(List<Record> errorRecordList) {
             log.error("Exception::Batch with records size: {} contains DLQ sinkable records but failed to sink", errorRecordList.size());
             throw new ErrorWriterFailedException(dlqStatus.getException().orElse(null));
         }
+        statsClient.count("kafka.error.records.count,type=deserialize," + statsClient.getBqTags(), errorRecordList.size());
     }
 }
 }
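Taken together, the converter hunks above establish one pattern for bad records: when the relevant flag (appConfig.getFailOnNullMessage() or appConfig.getFailOnDeserializeError()) is on, the converter emits a record.processing.failure gauge and throws, failing the whole batch; when it is off, the record is counted under kafka.error.records.count and skipped, with deserialize failures additionally handed to the DLQ error writer. Below is a minimal, self-contained sketch of that fail-fast-or-count pattern; the Stats interface and the exception class are stand-ins for the project's real StatsClient and NullInputMessageException, and every name in it is illustrative rather than Beast's actual API.

import java.util.ArrayList;
import java.util.List;

class NullMessageHandlingSketch {
    // stand-in for the project's StatsD client
    interface Stats {
        void gauge(String metric, long value);
        void increment(String metric);
    }

    // stand-in for the project's NullInputMessageException
    static class NullInputMessageException extends RuntimeException {
        NullInputMessageException(long offset) {
            super("null message at offset " + offset);
        }
    }

    static List<String> convert(List<byte[]> values, boolean failOnNullMessage, Stats stats) {
        List<String> converted = new ArrayList<>();
        long offset = -1;
        for (byte[] value : values) {
            offset++;
            if (value == null) {
                if (failOnNullMessage) {
                    // fail fast: record the failure, then abort the whole batch
                    stats.gauge("record.processing.failure,type=null", 1);
                    throw new NullInputMessageException(offset);
                }
                // lenient mode: count the bad record and keep going
                stats.increment("kafka.error.records.count,type=null");
                continue;
            }
            converted.add(new String(value));
        }
        return converted;
    }
}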
13 changes: 7 additions & 6 deletions src/main/java/com/gojek/beast/sink/bq/BqSink.java
@@ -47,8 +47,8 @@ public Status push(Records records) {
         // if batch contains records that we can't really handle, fail whole batch
         List<Record> unhandledRecords = filteredResponse.getUnhandledRecords();
         if (!unhandledRecords.isEmpty()) {
-            log.info("Batch with records size: {} contains invalid records, marking this batch to fail", unhandledRecords.size());
-            statsClient.gauge("data.error.records,type=invalid", unhandledRecords.size());
+            log.error("Batch with records size: {} contains invalid records, marking this batch to fail", unhandledRecords.size());
+            statsClient.gauge("record.processing.failure,type=invalid," + statsClient.getBqTags(), unhandledRecords.size());
             return new InsertStatus(false, response.getInsertErrors());
         }

@@ -58,18 +58,19 @@ public Status push(Records records) {
             // try inserting valid records into bq
             InsertAllResponse retriedResponse = insertIntoBQ(retryableRecords);
             if (retriedResponse.hasErrors()) {
-                return new InsertStatus(true, retriedResponse.getInsertErrors());
+                statsClient.gauge("record.processing.failure,type=retry," + statsClient.getBqTags(), retryableRecords.size());
+                return new InsertStatus(false, retriedResponse.getInsertErrors());
             }
         }

         // DLQ sinkable records
         List<Record> oobRecords = filteredResponse.getOobRecords();
         if (!oobRecords.isEmpty()) {
-            log.info("Error handler parsed OOB records size {}, handoff to the writer {}", oobRecords.size(), errorWriter.getClass().getSimpleName());
-            statsClient.gauge("data.error.records,type=oob", oobRecords.size());
+            log.warn("Error handler parsed OOB records size {}, handoff to the writer {}", oobRecords.size(), errorWriter.getClass().getSimpleName());
+            statsClient.count("kafka.error.records.count,type=oob," + statsClient.getBqTags(), oobRecords.size());
             final Status dlqStatus = errorWriter.writeRecords(ImmutableMap.of(RecordsErrorType.OOB, oobRecords));
             if (!dlqStatus.isSuccess()) {
-                log.info("Batch with records size: {} contains DLQ sinkable records but failed to sink", oobRecords.size());
+                log.error("Batch with records size: {} contains DLQ sinkable records but failed to sink", oobRecords.size());
                 return dlqStatus;
             }
         }
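The heart of the "Fail on retry failure" fix is the flipped status flag above: a retry that still has errors used to return new InsertStatus(true, ...), reporting the batch as pushed even though rows were dropped, and it now returns false and records a record.processing.failure gauge. A rough sketch of why that one boolean matters to a caller follows; the offset-commit logic is an assumed illustration of a typical consumer loop, not Beast's actual committer.

class RetryStatusSketch {
    static final class InsertStatus {
        private final boolean success;
        InsertStatus(boolean success) { this.success = success; }
        boolean isSuccess() { return success; }
    }

    // after this commit: a retry that still has errors fails the whole batch
    static InsertStatus pushAfterRetry(boolean retryStillHasErrors) {
        return new InsertStatus(!retryStillHasErrors);
    }

    public static void main(String[] args) {
        InsertStatus status = pushAfterRetry(true);
        if (status.isSuccess()) {
            // with the old success=true behavior this branch ran even when the
            // retry failed, acknowledging offsets for rows that never landed
            System.out.println("commit consumed offsets");
        } else {
            System.out.println("do not commit; surface the failure");
        }
    }
}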
31 changes: 31 additions & 0 deletions src/test/java/com/gojek/beast/sink/BqSinkTest.java
@@ -200,4 +200,35 @@ public void testHandlerSucceedsForOOBAndValidErrors() {

         assertTrue(status.isSuccess());
     }
+
+    @Test
+    public void testFailsOnRetrialOfValidRecordsFails() {
+        Record user1 = new Record(offsetInfo, createUser("alice"), null, null);
+        Record user2 = new Record(offsetInfo, createUser("mary"), null, null);
+        Record user3 = new Record(offsetInfo, createUser("jazz"), null, null);
+
+        Records records = new Records(Arrays.asList(user1, user2, user3));
+        InsertAllRequest request = builder
+                .addRow(user1.getId(), user1.getColumns())
+                .addRow(user2.getId(), user2.getColumns())
+                .addRow(user3.getId(), user3.getColumns())
+                .build();
+
+        InsertAllRequest.Builder retryRequestBuilder = InsertAllRequest.newBuilder(tableId);
+        InsertAllRequest retryRequest = retryRequestBuilder
+                .addRow(user2.getId(), user2.getColumns())
+                .build();
+
+        BQFilteredResponse filteredResponse = new BQFilteredResponse(Arrays.asList(user2), Arrays.asList(),
+                Arrays.asList(user1));
+        when(responseParser.parseResponse(any(), any())).thenReturn(filteredResponse);
+        when(bigquery.insertAll(request)).thenReturn(failureResponse);
+        when(bigquery.insertAll(retryRequest)).thenReturn(failureResponse);
+
+        Sink localSink = new BqSink(bigquery, tableId, responseParser, bqRow, errorWriter);
+        Status status = localSink.push(records);
+        verify(bigquery).insertAll(request);
+
+        assertFalse(status.isSuccess());
+    }
 }
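The new test pins down the failure path: the retry of the valid records fails again, so push() must report failure. A natural companion, sketched below, would pin down the happy path where the retry succeeds. It is hypothetical and not part of this commit; it reuses the fixtures visible in the test above (offsetInfo, builder, tableId, bigquery, responseParser, bqRow, errorWriter, failureResponse) and assumes that BQFilteredResponse's first constructor argument is the retryable list and that push() reports success once the retry drains all errors.

    @Test
    public void testSucceedsWhenRetryOfValidRecordsSucceeds() {
        // hypothetical companion test, not part of commit bd3e0a1
        Record user1 = new Record(offsetInfo, createUser("alice"), null, null);
        Records records = new Records(Arrays.asList(user1));
        InsertAllRequest request = builder.addRow(user1.getId(), user1.getColumns()).build();

        InsertAllResponse successResponse = mock(InsertAllResponse.class);
        when(successResponse.hasErrors()).thenReturn(false);

        // retryable = [user1], nothing unhandled, nothing OOB (argument order assumed)
        BQFilteredResponse filteredResponse = new BQFilteredResponse(Arrays.asList(user1), Arrays.asList(), Arrays.asList());
        when(responseParser.parseResponse(any(), any())).thenReturn(filteredResponse);
        // first insert fails, the retry of the same single-row request succeeds
        when(bigquery.insertAll(request)).thenReturn(failureResponse).thenReturn(successResponse);

        Sink localSink = new BqSink(bigquery, tableId, responseParser, bqRow, errorWriter);
        Status status = localSink.push(records);

        assertTrue(status.isSuccess());
    }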
