Skip to content

Commit

Permalink
Merge pull request #26503: fix dataloss bug in batch Storage API sink. (
Browse files Browse the repository at this point in the history
#26512)

Co-authored-by: reuvenlax <[email protected]>
  • Loading branch information
bvolpato and reuvenlax authored May 4, 2023
1 parent a486f51 commit e80e46a
Showing 1 changed file with 55 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Status;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -224,11 +226,14 @@ static class AppendRowsContext extends RetryManager.Operation.Context<AppendRows
ProtoRows protoRows;
List<org.joda.time.Instant> timestamps;

int failureCount;

public AppendRowsContext(
long offset, ProtoRows protoRows, List<org.joda.time.Instant> timestamps) {
this.offset = offset;
this.protoRows = protoRows;
this.timestamps = timestamps;
this.failureCount = 0;
}
}

Expand Down Expand Up @@ -301,17 +306,20 @@ String getStreamAppendClientCacheEntryKey() {
}

String getOrCreateStreamName() {
try {
if (!useDefaultStream) {
this.streamName =
Preconditions.checkStateNotNull(maybeDatasetService)
.createWriteStream(tableUrn, Type.PENDING)
.getName();
} else {
this.streamName = getDefaultStreamName();
if (Strings.isNullOrEmpty(this.streamName)) {
try {
if (!useDefaultStream) {
this.streamName =
Preconditions.checkStateNotNull(maybeDatasetService)
.createWriteStream(tableUrn, Type.PENDING)
.getName();
this.currentOffset = 0;
} else {
this.streamName = getDefaultStreamName();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return this.streamName;
}
Expand Down Expand Up @@ -376,7 +384,6 @@ AppendClientInfo getAppendClientInfo(
// This pin is "owned" by the current DoFn.
Preconditions.checkStateNotNull(newAppendClientInfo.getStreamAppendClient()).pin();
}
this.currentOffset = 0;
nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
this.appendClientInfo = newAppendClientInfo;
}
Expand Down Expand Up @@ -507,6 +514,7 @@ long flush(

long offset = -1;
if (!this.useDefaultStream) {
getOrCreateStreamName(); // Force creation of the stream before we get offsets.
offset = this.currentOffset;
this.currentOffset += inserts.getSerializedRowsCount();
}
Expand Down Expand Up @@ -598,7 +606,42 @@ long flush(
streamName,
clientNumber,
retrieveErrorDetails(contexts));
failedContext.failureCount += 1;

// Maximum number of times we retry before we fail the work item.
if (failedContext.failureCount > 5) {
throw new RuntimeException("More than 5 attempts to call AppendRows failed.");
}

// The following errors are known to be persistent, so always fail the work item in
// this case.
Throwable error = Preconditions.checkStateNotNull(failedContext.getError());
Status.Code statusCode = Status.fromThrowable(error).getCode();
if (statusCode.equals(Status.Code.OUT_OF_RANGE)
|| statusCode.equals(Status.Code.ALREADY_EXISTS)) {
throw new RuntimeException(
"Append to stream "
+ this.streamName
+ " failed with invalid "
+ "offset of "
+ failedContext.offset);
}

boolean streamDoesNotExist =
failedContext.getError() instanceof Exceptions.StreamFinalizedException
|| statusCode.equals(Status.Code.INVALID_ARGUMENT)
|| statusCode.equals(Status.Code.NOT_FOUND)
|| statusCode.equals(Status.Code.FAILED_PRECONDITION);
if (streamDoesNotExist) {
throw new RuntimeException(
"Append to stream "
+ this.streamName
+ " failed with stream "
+ "doesn't exist");
}

invalidateWriteStream();

appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
},
Expand Down Expand Up @@ -629,7 +672,7 @@ long flush(
String retrieveErrorDetails(Iterable<AppendRowsContext> failedContext) {
return StreamSupport.stream(failedContext.spliterator(), false)
.<@Nullable Throwable>map(AppendRowsContext::getError)
.filter(err -> err != null)
.filter(Objects::nonNull)
.map(
thrw ->
Preconditions.checkStateNotNull(thrw).toString()
Expand Down

0 comments on commit e80e46a

Please sign in to comment.