Merge pull request #16906: [BEAM-13974] Handle idle Storage Api streams
reuvenlax authored Mar 2, 2022
1 parent c0f9eff commit da36470
Showing 2 changed files with 87 additions and 17 deletions.
@@ -172,6 +172,9 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
// TODO: Storage API should provide a more-specific way of identifying this failure.
return RetryType.DONT_RETRY;
}
if (statusCode.equals(Code.NOT_FOUND)) {
return RetryType.DONT_RETRY;
}
}
return RetryType.RETRY_ALL_OPERATIONS;
},
@@ -207,6 +210,13 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
+ " failed with "
+ Iterables.getFirst(contexts, null).getError());
finalizeOperationsFailed.inc();
Throwable error = Iterables.getFirst(contexts, null).getError();
if (error instanceof ApiException) {
Code statusCode = ((ApiException) error).getStatusCode().getCode();
if (statusCode.equals(Code.NOT_FOUND)) {
return RetryType.DONT_RETRY;
}
}
return RetryType.RETRY_ALL_OPERATIONS;
},
r -> {
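The two hunks above extend the retry callbacks in the flush/finalize DoFn so that a NOT_FOUND status is treated as non-retryable: a stream that has already been deleted or finalized can never be flushed or finalized successfully, so retrying would never terminate. A minimal, self-contained sketch of that status-code check follows; the helper name and the local RetryType enum are illustrative stand-ins for the surrounding Beam code, not part of the commit.

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;

class RetryDecisionSketch {
  // Stand-in for the retry-decision enum used by the surrounding callbacks.
  enum RetryType {
    DONT_RETRY,
    RETRY_ALL_OPERATIONS
  }

  // Illustrative helper: map a failed flush/finalize error to a retry decision.
  static RetryType classifyFailure(Throwable error) {
    if (error instanceof ApiException) {
      Code statusCode = ((ApiException) error).getStatusCode().getCode();
      // The stream no longer exists, so retrying the same operation cannot succeed.
      if (statusCode.equals(Code.NOT_FOUND)) {
        return RetryType.DONT_RETRY;
      }
    }
    return RetryType.RETRY_ALL_OPERATIONS;
  }
}

The remaining hunks below are from StorageApiWritesShardedRecords.java, which gains the idle-stream timer.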
@@ -22,6 +22,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.Descriptors.Descriptor;
@@ -56,13 +57,18 @@
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
@@ -92,12 +98,14 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
extends PTransform<
PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>>, PCollection<Void>> {
private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
private static final Duration DEFAULT_STREAM_IDLE_TIME = Duration.standardHours(1);

private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final CreateDisposition createDisposition;
private final String kmsKey;
private final BigQueryServices bqServices;
private final Coder<DestinationT> destinationCoder;
private final Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
@@ -148,7 +156,7 @@ public PCollection<Void> expand(
PCollection<KV<String, Operation>> written =
input.apply(
"Write Records",
ParDo.of(new WriteRecordsDoFn(operationName))
ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime))
.withSideInputs(dynamicDestinations.getSideInputs()));

SchemaCoder<Operation> operationCoder;
@@ -185,6 +193,8 @@ class WriteRecordsDoFn
Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
private final Counter streamsCreated =
Metrics.counter(WriteRecordsDoFn.class, "streamsCreated");
private final Counter streamsIdle =
Metrics.counter(WriteRecordsDoFn.class, "idleStreamsFinalized");
private final Counter appendFailures =
Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
private final Counter appendOffsetFailures =
@@ -212,8 +222,14 @@ class WriteRecordsDoFn
@StateId("streamOffset")
private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value();

public WriteRecordsDoFn(String operationName) {
@TimerId("idleTimer")
private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

private final Duration streamIdleTime;

public WriteRecordsDoFn(String operationName, Duration streamIdleTime) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.streamIdleTime = streamIdleTime;
}

@StartBundle
@@ -229,6 +245,7 @@ String getOrCreateStream(
String tableId,
ValueState<String> streamName,
ValueState<Long> streamOffset,
Timer streamIdleTimer,
DatasetService datasetService)
throws IOException, InterruptedException {
String stream = streamName.read();
@@ -239,6 +256,12 @@ String getOrCreateStream(
streamOffset.write(0L);
streamsCreated.inc();
}
// Reset the idle timer.
streamIdleTimer
.offset(streamIdleTime)
.withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
.setRelative();

return stream;
}

@@ -270,6 +293,7 @@ public void process(
@Element KV<ShardedKey<DestinationT>, Iterable<byte[]>> element,
final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
@TimerId("idleTimer") Timer idleTimer,
final OutputReceiver<KV<String, Operation>> o)
throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
@@ -343,7 +367,8 @@ public String toString() {
// Clear the stream name, forcing a new one to be created.
streamName.write("");
}
String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService);
String stream =
getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
StreamAppendClient appendClient =
APPEND_CLIENTS.get(
stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
@@ -398,28 +423,39 @@ public String toString() {
// The first context is always the one that fails.
AppendRowsContext failedContext =
Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null));
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
// Invalidate the StreamWriter and force a new one to be created.
LOG.error(
"Got error " + failedContext.getError() + " closing " + failedContext.streamName);
clearClients.accept(contexts);
appendFailures.inc();
if (statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS)) {

boolean explicitStreamFinalized =
failedContext.getError() instanceof StreamFinalizedException;
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a
// bundle failure happened after an append but before the worker could
// checkpoint its state. The records that were appended in a failed bundle
// will be retried, meaning that the unflushed tail of the stream must be
// discarded to prevent duplicates.
boolean offsetMismatch =
statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
// This implies that the stream doesn't exist or has already been finalized. In this
// case we have no choice but to create a new stream.
boolean streamDoesNotExist =
explicitStreamFinalized
|| statusCode.equals(Code.INVALID_ARGUMENT)
|| statusCode.equals(Code.NOT_FOUND)
|| statusCode.equals(Code.FAILED_PRECONDITION);
if (offsetMismatch || streamDoesNotExist) {
appendOffsetFailures.inc();
LOG.warn(
"Append to "
+ failedContext
+ " failed with "
+ failedContext.getError()
+ " Will retry with a new stream");
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a bundle
// failure
// happened after an append but before the worker could checkpoint it's
// state. The records that were appended in a failed bundle will be retried,
// meaning that the unflushed tail of the stream must be discarded to prevent
// duplicates.

// Finalize the stream and clear streamName so a new stream will be created.
o.output(
KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
@@ -466,24 +502,48 @@ public String toString() {

java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
appendLatencyDistribution.update(timeElapsed.toMillis());
idleTimer
.offset(streamIdleTime)
.withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
.setRelative();
}

@OnWindowExpiration
public void onWindowExpiration(
// Called by the idleTimer and window-expiration handlers.
private void finalizeStream(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// Window is done - usually because the pipeline has been drained. Make sure to clean up
// streams so that they are not leaked.
String stream = MoreObjects.firstNonNull(streamName.read(), "");

if (!Strings.isNullOrEmpty(stream)) {
// Finalize the stream
long nextOffset = MoreObjects.firstNonNull(streamOffset.read(), 0L);
o.output(KV.of(stream, new Operation(nextOffset - 1, true)));
streamName.clear();
streamOffset.clear();
// Make sure that the stream object is closed.
APPEND_CLIENTS.invalidate(stream);
}
}

@OnTimer("idleTimer")
public void onTimer(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// Stream is idle - clear it.
finalizeStream(streamName, streamOffset, o);
streamsIdle.inc();
}

@OnWindowExpiration
public void onWindowExpiration(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// Window is done - usually because the pipeline has been drained. Make sure to clean up
// streams so that they are not leaked.
finalizeStream(streamName, streamOffset, o);
}
}
}
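
For context, the idle-stream handling above relies on Beam's per-key, processing-time timer support: the DoFn declares a TimerSpec, re-arms the timer relative to the current processing time whenever a stream is created or appended to, and finalizes the stream from the @OnTimer callback once the timer fires; @OnWindowExpiration reuses the same finalizeStream cleanup path. Below is a stripped-down sketch of that pattern, independent of the BigQuery specifics; the class, state, and method names are illustrative, not from the commit.

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Sketch of the idle-timer pattern: every element re-arms a processing-time
// timer; if a key sees no elements for IDLE_TIME, onIdle fires and the per-key
// resource (standing in for the write stream) is emitted for cleanup.
class IdleResourceCleanupFn extends DoFn<KV<String, String>, String> {
  private static final Duration IDLE_TIME = Duration.standardHours(1);

  @StateId("resource")
  private final StateSpec<ValueState<String>> resourceSpec = StateSpecs.value();

  @TimerId("idleTimer")
  private final TimerSpec idleTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("resource") ValueState<String> resource,
      @TimerId("idleTimer") Timer idleTimer) {
    resource.write(element.getValue());
    // Re-arm the timer relative to the current processing time; it only fires
    // if this key receives no further elements for IDLE_TIME.
    idleTimer.offset(IDLE_TIME).setRelative();
  }

  @OnTimer("idleTimer")
  public void onIdle(
      @StateId("resource") ValueState<String> resource, OutputReceiver<String> out) {
    String resourceName = resource.read();
    if (resourceName != null) {
      // Downstream would finalize/close the idle resource.
      out.output(resourceName);
      resource.clear();
    }
  }

  @OnWindowExpiration
  public void onWindowExpiration(
      @StateId("resource") ValueState<String> resource, OutputReceiver<String> out) {
    // Same cleanup when the window (e.g. the global window on drain) expires.
    onIdle(resource, out);
  }
}

Note that the commit additionally calls withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp()) when arming the timer, presumably so that the timer's output timestamp does not hold back the watermark while the stream sits idle in the global window.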
