diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
index 1c3686a08096..719e1ddb97b7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
@@ -172,6 +172,9 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operation> element) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index 15982a3ea46f..9e966310b5ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -22,6 +22,7 @@
 import com.google.api.core.ApiFuture;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
 import com.google.cloud.bigquery.storage.v1.ProtoRows;
 import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
 import com.google.protobuf.Descriptors.Descriptor;
@@ -56,6 +57,10 @@
 import org.apache.beam.sdk.schemas.SchemaRegistry;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -63,6 +68,7 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.ShardedKey;
@@ -92,12 +98,14 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
     extends PTransform<
         PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>>, PCollection<Void>> {
   private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
+  private static final Duration DEFAULT_STREAM_IDLE_TIME = Duration.standardHours(1);
 
   private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
   private final CreateDisposition createDisposition;
   private final String kmsKey;
   private final BigQueryServices bqServices;
   private final Coder<DestinationT> destinationCoder;
+  private final Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME;
 
   private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
   private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
@@ -148,7 +156,7 @@ public PCollection<Void> expand(
     PCollection<KV<String, Operation>> written =
         input.apply(
             "Write Records",
-            ParDo.of(new WriteRecordsDoFn(operationName))
+            ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime))
                 .withSideInputs(dynamicDestinations.getSideInputs()));
 
     SchemaCoder<Operation> operationCoder;
@@ -185,6 +193,8 @@ class WriteRecordsDoFn
         Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
     private final Counter streamsCreated =
         Metrics.counter(WriteRecordsDoFn.class, "streamsCreated");
+    private final Counter streamsIdle =
+        Metrics.counter(WriteRecordsDoFn.class, "idleStreamsFinalized");
    private final Counter appendFailures =
        Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
    private final Counter appendOffsetFailures =
@@ -212,8 +222,14 @@ class WriteRecordsDoFn
     @StateId("streamOffset")
     private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value();
 
-    public WriteRecordsDoFn(String operationName) {
+    @TimerId("idleTimer")
+    private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+    private final Duration streamIdleTime;
+
+    public WriteRecordsDoFn(String operationName, Duration streamIdleTime) {
       this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
+      this.streamIdleTime = streamIdleTime;
     }
 
     @StartBundle
@@ -229,6 +245,7 @@ String getOrCreateStream(
         String tableId,
         ValueState<String> streamName,
         ValueState<Long> streamOffset,
+        Timer streamIdleTimer,
         DatasetService datasetService)
         throws IOException, InterruptedException {
       String stream = streamName.read();
@@ -239,6 +256,12 @@ String getOrCreateStream(
         streamOffset.write(0L);
         streamsCreated.inc();
       }
+      // Reset the idle timer.
+      streamIdleTimer
+          .offset(streamIdleTime)
+          .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
+          .setRelative();
+
       return stream;
     }
@@ -270,6 +293,7 @@ public void process(
         @Element KV<ShardedKey<DestinationT>, Iterable<byte[]>> element,
         final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
         final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
+        @TimerId("idleTimer") Timer idleTimer,
         final OutputReceiver<KV<String, Operation>> o)
         throws Exception {
       dynamicDestinations.setSideInputAccessorFromProcessContext(c);
@@ -343,7 +367,8 @@ public String toString() {
            // Clear the stream name, forcing a new one to be created.
            streamName.write("");
          }
-          String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService);
+          String stream =
+              getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
          StreamAppendClient appendClient =
              APPEND_CLIENTS.get(
                  stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
@@ -398,13 +423,32 @@ public String toString() {
                // The first context is always the one that fails.
                AppendRowsContext failedContext =
                    Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null));
-                Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
                // Invalidate the StreamWriter and force a new one to be created.
                LOG.error(
                    "Got error " + failedContext.getError() + " closing " + failedContext.streamName);
                clearClients.accept(contexts);
                appendFailures.inc();
-                if (statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS)) {
+
+                boolean explicitStreamFinalized =
+                    failedContext.getError() instanceof StreamFinalizedException;
+                Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
+                // This means that the offset we have stored does not match the current end of
+                // the stream in the Storage API. Usually this happens because a crash or a bundle
+                // failure
+                // happened after an append but before the worker could checkpoint its
+                // state. The records that were appended in a failed bundle will be retried,
+                // meaning that the unflushed tail of the stream must be discarded to prevent
+                // duplicates.
+                boolean offsetMismatch =
+                    statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
+                // This implies that the stream doesn't exist or has already been finalized. In this
+                // case we have no choice but to create a new stream.
+                boolean streamDoesNotExist =
+                    explicitStreamFinalized
+                        || statusCode.equals(Code.INVALID_ARGUMENT)
+                        || statusCode.equals(Code.NOT_FOUND)
+                        || statusCode.equals(Code.FAILED_PRECONDITION);
+                if (offsetMismatch || streamDoesNotExist) {
                  appendOffsetFailures.inc();
                  LOG.warn(
                      "Append to "
@@ -412,14 +456,6 @@ public String toString() {
                          + " failed with "
                          + failedContext.getError()
                          + " Will retry with a new stream");
-                  // This means that the offset we have stored does not match the current end of
-                  // the stream in the Storage API. Usually this happens because a crash or a bundle
-                  // failure
-                  // happened after an append but before the worker could checkpoint it's
-                  // state. The records that were appended in a failed bundle will be retried,
-                  // meaning that the unflushed tail of the stream must be discarded to prevent
-                  // duplicates.
-
                  // Finalize the stream and clear streamName so a new stream will be created.
                  o.output(
                      KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
@@ -466,24 +502,48 @@ public String toString() {
 
       java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
       appendLatencyDistribution.update(timeElapsed.toMillis());
+      idleTimer
+          .offset(streamIdleTime)
+          .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
+          .setRelative();
     }
 
-    @OnWindowExpiration
-    public void onWindowExpiration(
+    // Called by both the idle-timer and window-expiration handlers.
+    private void finalizeStream(
         @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
         @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
         OutputReceiver<KV<String, Operation>> o) {
-      // Window is done - usually because the pipeline has been drained. Make sure to clean up
-      // streams so that they are not leaked.
       String stream = MoreObjects.firstNonNull(streamName.read(), null);
 
       if (!Strings.isNullOrEmpty(stream)) {
         // Finalize the stream
         long nextOffset = MoreObjects.firstNonNull(streamOffset.read(), 0L);
         o.output(KV.of(stream, new Operation(nextOffset - 1, true)));
+        streamName.clear();
+        streamOffset.clear();
         // Make sure that the stream object is closed.
         APPEND_CLIENTS.invalidate(stream);
       }
     }
+
+    @OnTimer("idleTimer")
+    public void onTimer(
+        @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
+        @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
+        OutputReceiver<KV<String, Operation>> o) {
+      // Stream is idle - clear it.
+      finalizeStream(streamName, streamOffset, o);
+      streamsIdle.inc();
+    }
+
+    @OnWindowExpiration
+    public void onWindowExpiration(
+        @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
+        @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
+        OutputReceiver<KV<String, Operation>> o) {
+      // Window is done - usually because the pipeline has been drained. Make sure to clean up
+      // streams so that they are not leaked.
+      finalizeStream(streamName, streamOffset, o);
+    }
   }
 }
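
Note on the idle-timer pattern above: getOrCreateStream() and process() re-arm a processing-time timer on every append, so @OnTimer("idleTimer") fires only after a key has gone streamIdleTime with no writes, at which point finalizeStream() releases the stream instead of leaking it. Below is a minimal, self-contained sketch of that re-armed idle-timeout idiom; the DoFn and its names (IdleSessionDoFn, sessionId) are hypothetical illustrations, not part of this change.

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

/** Hypothetical example: per-key idle timeout via a re-armed processing-time timer. */
class IdleSessionDoFn extends DoFn<KV<String, String>, String> {
  private static final Duration IDLE_TIME = Duration.standardMinutes(10);

  @StateId("sessionId")
  private final StateSpec<ValueState<String>> sessionIdSpec = StateSpecs.value();

  @TimerId("idleTimer")
  private final TimerSpec idleTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("sessionId") ValueState<String> sessionId,
      @TimerId("idleTimer") Timer idleTimer,
      OutputReceiver<String> out) {
    sessionId.write(element.getValue());
    // Push the deadline out again: the timer only fires once no element has
    // arrived for this key for IDLE_TIME. Pinning the output timestamp to the
    // end of the global window keeps the timer from holding back the watermark.
    idleTimer
        .offset(IDLE_TIME)
        .withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
        .setRelative();
    out.output(element.getValue());
  }

  @OnTimer("idleTimer")
  public void onIdle(
      @StateId("sessionId") ValueState<String> sessionId, OutputReceiver<String> out) {
    // Key has been idle: emit a tombstone and release per-key state, mirroring
    // the stream finalization performed by onTimer() in the diff.
    String last = sessionId.read();
    if (last != null) {
      out.output("idle:" + last);
      sessionId.clear();
    }
  }
}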
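The reworked failure branch distinguishes an offset mismatch (OUT_OF_RANGE or ALREADY_EXISTS: the stored offset no longer matches the stream's end after a crash between an append and the checkpoint) from a stream that is gone or finalized (StreamFinalizedException, INVALID_ARGUMENT, NOT_FOUND, FAILED_PRECONDITION); both cases resolve by finalizing the old stream and creating a new one, while any other error retries on the same stream. The same classification, restated as a standalone predicate for reference (the helper name shouldRecreateStream is illustrative, assuming the grpc and google-cloud-bigquerystorage artifacts on the classpath):

import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import io.grpc.Status;
import io.grpc.Status.Code;

final class StreamErrors {
  /** Returns true when an append error means the current write stream must be replaced. */
  static boolean shouldRecreateStream(Throwable error) {
    Status.Code statusCode = Status.fromThrowable(error).getCode();
    // Stored offset disagrees with the stream's end (e.g. crash between append
    // and checkpoint): the unflushed tail must be discarded to avoid duplicates.
    boolean offsetMismatch =
        statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
    // The stream no longer exists or has already been finalized.
    boolean streamDoesNotExist =
        error instanceof StreamFinalizedException
            || statusCode.equals(Code.INVALID_ARGUMENT)
            || statusCode.equals(Code.NOT_FOUND)
            || statusCode.equals(Code.FAILED_PRECONDITION);
    return offsetMismatch || streamDoesNotExist;
  }

  private StreamErrors() {}
}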