Skip to content

Commit

Permalink
Add all failed writers to event
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-moseley committed May 3, 2022
1 parent 26f0743 commit 9bf7629
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ void writeWithMetadataWriters(
} catch (Exception e) {
meetException = true;
writer.reset(dbName, tableName);
addOrThrowException(e, tableString, dbName, tableName, writer.getClass().getName());
addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
}
}
}
Expand All @@ -360,7 +360,7 @@ static List<MetadataWriter> getAllowedMetadataWriters(GobblinMetadataChangeEvent
.collect(Collectors.toList());
}

private void addOrThrowException(Exception e, String tableString, String dbName, String tableName, String failedWriter) throws IOException {
private void addOrThrowException(Exception e, String tableString, String dbName, String tableName, List<String> failedWriters) throws IOException {
TableStatus tableStatus = tableOperationTypeMap.get(tableString);
Map<String, List<GobblinMetadataException>> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetPath, new HashMap<>());
GobblinMetadataException lastException = null;
Expand All @@ -374,7 +374,7 @@ private void addOrThrowException(Exception e, String tableString, String dbName,
lastException.highWatermark = tableStatus.gmceHighWatermark;
} else {
lastException = new GobblinMetadataException(tableStatus.datasetPath, dbName, tableName, tableStatus.gmceTopicPartition,
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, failedWriter, tableStatus.operationType, partitionKeysMap.get(tableString), e);
tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, failedWriters, tableStatus.operationType, partitionKeysMap.get(tableString), e);
tableErrorMap.get(tableString).add(lastException);
}
if (e instanceof HiveMetadataWriterWithPartitionInfoException) {
Expand Down Expand Up @@ -407,7 +407,7 @@ private void flush(String dbName, String tableName) throws IOException {
} catch (IOException e) {
meetException = true;
writer.reset(dbName, tableName);
addOrThrowException(e, tableString, dbName, tableName, writer.getClass().getName());
addOrThrowException(e, tableString, dbName, tableName, getFailedWriterList(writer));
}
}
}
Expand Down Expand Up @@ -519,7 +519,7 @@ private void submitFailureEvents(List<GobblinMetadataException> exceptionList) {
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_TOPIC_PARTITION, exception.GMCETopicPartition.split("-")[1]);
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_HIGH_WATERMARK, Long.toString(exception.highWatermark));
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.GMCE_LOW_WATERMARK, Long.toString(exception.lowWatermark));
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILED_WRITER_KEY, exception.failedWriter);
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.FAILED_WRITERS_KEY, Joiner.on(',').join(exception.failedWriters));
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.OPERATION_TYPE_KEY, exception.operationType.toString());
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.ADDED_PARTITION_VALUES_KEY, Joiner.on(',').join(exception.addedPartitionValues));
gobblinTrackingEvent.addMetadata(IcebergMCEMetadataKeys.DROPPED_PARTITION_VALUES_KEY, Joiner.on(',').join(exception.droppedPartitionValues));
Expand All @@ -532,4 +532,9 @@ private void submitFailureEvents(List<GobblinMetadataException> exceptionList) {
eventSubmitter.submit(gobblinTrackingEvent);
}
}

private List<String> getFailedWriterList(MetadataWriter failedWriter) {
List<MetadataWriter> failedWriters = metadataWriters.subList(metadataWriters.indexOf(failedWriter), metadataWriters.size());
return failedWriters.stream().map(writer -> writer.getClass().getName()).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ public class GobblinMetadataException extends IOException {
public String GMCETopicPartition;
public long highWatermark;
public long lowWatermark;
public String failedWriter;
public List<String> failedWriters;
public OperationType operationType;
public Set<String> addedPartitionValues;
public Set<String> droppedPartitionValues;
public List<HiveRegistrationUnit.Column> partitionKeys;

GobblinMetadataException(String datasetPath, String dbName, String tableName, String GMCETopicPartition, long lowWatermark, long highWatermark,
String failedWriter, OperationType operationType, List<HiveRegistrationUnit.Column> partitionKeys, Exception exception) {
List<String> failedWriters, OperationType operationType, List<HiveRegistrationUnit.Column> partitionKeys, Exception exception) {
super(String.format("failed to flush table %s, %s", dbName, tableName), exception);
this.datasetPath = datasetPath;
this.dbName = dbName;
this.tableName = tableName;
this.GMCETopicPartition = GMCETopicPartition;
this.highWatermark = highWatermark;
this.lowWatermark = lowWatermark;
this.failedWriter = failedWriter;
this.failedWriters = failedWriters;
this.operationType = operationType;
this.addedPartitionValues = new HashSet<>();
this.droppedPartitionValues = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class IcebergMCEMetadataKeys {
public static final String FAILURE_EVENT_TABLE_NAME = "tableName";
public static final String CLUSTER_IDENTIFIER_KEY_NAME = "clusterIdentifier";
public static final String EXCEPTION_MESSAGE_KEY_NAME = "exceptionMessage";
public static final String FAILED_WRITER_KEY = "failedWriter";
public static final String FAILED_WRITERS_KEY = "failedWriters";
public static final String OPERATION_TYPE_KEY = "operationType";
public static final String ADDED_PARTITION_VALUES_KEY = "failedToAddPartitionValues";
public static final String DROPPED_PARTITION_VALUES_KEY = "failedToDropPartitionValues";
Expand Down

0 comments on commit 9bf7629

Please sign in to comment.