[BEAM-13974] Handle idle Storage Api streams #16906

Merged 4 commits on Mar 2, 2022.
Changes from 2 commits
@@ -56,13 +56,18 @@
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
@@ -92,12 +97,14 @@ public class StorageApiWritesShardedRecords<DestinationT, ElementT>
extends PTransform<
PCollection<KV<ShardedKey<DestinationT>, Iterable<byte[]>>>, PCollection<Void>> {
private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
private static final Duration DEFAULT_STREAM_IDLE_TIME = Duration.standardHours(1);

private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final CreateDisposition createDisposition;
private final String kmsKey;
private final BigQueryServices bqServices;
private final Coder<DestinationT> destinationCoder;
private final Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
@@ -148,7 +155,7 @@ public PCollection<Void> expand(
PCollection<KV<String, Operation>> written =
input.apply(
"Write Records",
ParDo.of(new WriteRecordsDoFn(operationName))
ParDo.of(new WriteRecordsDoFn(operationName, streamIdleTime))
.withSideInputs(dynamicDestinations.getSideInputs()));

SchemaCoder<Operation> operationCoder;
@@ -185,6 +192,8 @@ class WriteRecordsDoFn
Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
private final Counter streamsCreated =
Metrics.counter(WriteRecordsDoFn.class, "streamsCreated");
private final Counter streamsIdle =
Metrics.counter(WriteRecordsDoFn.class, "idleStreamsFinalized");
private final Counter appendFailures =
Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
private final Counter appendOffsetFailures =
@@ -212,8 +221,14 @@ class WriteRecordsDoFn
@StateId("streamOffset")
private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value();

public WriteRecordsDoFn(String operationName) {
@TimerId("idleTimer")
private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

private final Duration streamIdleTime;

public WriteRecordsDoFn(String operationName, Duration streamIdleTime) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.streamIdleTime = streamIdleTime;
}

@StartBundle
@@ -229,6 +244,7 @@ String getOrCreateStream(
String tableId,
ValueState<String> streamName,
ValueState<Long> streamOffset,
Timer streamIdleTimer,
DatasetService datasetService)
throws IOException, InterruptedException {
String stream = streamName.read();
@@ -239,6 +255,12 @@
streamOffset.write(0L);
streamsCreated.inc();
}
// Reset the idle timer.
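// The output timestamp is pinned to the end of the global window so the pending
// timer does not hold back the output watermark while the stream stays active.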
streamIdleTimer
.offset(streamIdleTime)
.withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
.setRelative();

return stream;
}

@@ -270,6 +292,7 @@ public void process(
@Element KV<ShardedKey<DestinationT>, Iterable<byte[]>> element,
final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
@TimerId("idleTimer") Timer idleTimer,
final OutputReceiver<KV<String, Operation>> o)
throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
@@ -343,7 +366,8 @@ public String toString() {
// Clear the stream name, forcing a new one to be created.
streamName.write("");
}
String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService);
String stream =
getOrCreateStream(tableId, streamName, streamOffset, idleTimer, datasetService);
StreamAppendClient appendClient =
APPEND_CLIENTS.get(
stream, () -> datasetService.getStreamAppendClient(stream, descriptor));
@@ -404,22 +428,27 @@ public String toString() {
"Got error " + failedContext.getError() + " closing " + failedContext.streamName);
clearClients.accept(contexts);
appendFailures.inc();
if (statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS)) {
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a bundle
// failure happened after an append but before the worker could checkpoint its
// state. The records that were appended in a failed bundle will be retried,
// meaning that the unflushed tail of the stream must be discarded to prevent
// duplicates.
boolean offsetMismatch =
statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS);
// This implies that the stream doesn't exist or has already been finalized. In this
// case we have no choice but to create a new stream.
boolean streamDoesntExist = statusCode.equals(Code.INVALID_ARGUMENT);
Contributor:

I guess FAILED_PRECONDITION can be returned when a WriteStream is finalized by idleness.

Contributor Author:

Does it? This seems like a bug if that's the case: FAILED_PRECONDITION is generally used for errors that are fixable by the user (e.g. creating a file in a directory that does not exist), while this error will persist regardless of what the user does.

Contributor Author:

@yirutang is it true that FAILED_PRECONDITION can also be returned?

Contributor:

@reuvenlax Sorry, please ignore my comment. I read 217851955#comment12 about changing the error code to FAILED_PRECONDITION but missed comment#14.

Contributor:

Yes, I think FAILED_PRECONDITION is possible for STREAM_ALREADY_FINALIZED, and NOT_FOUND can sometimes be returned for STREAM_NOT_FOUND. Hopefully we will have more specific exceptions soon. Currently a StreamFinalized exception should already be thrown.

https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java#L98
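A minimal sketch (not part of this PR) of how these error categories could be distinguished, assuming io.grpc status codes and the semantics discussed in this thread; the class and method names are illustrative only:

import io.grpc.Status;
import io.grpc.Status.Code;

final class AppendErrorClassifier {
  // True when the stored offset no longer matches the end of the stream.
  static boolean isOffsetMismatch(Throwable t) {
    Code code = Status.fromThrowable(t).getCode();
    return code == Code.OUT_OF_RANGE || code == Code.ALREADY_EXISTS;
  }

  // True when the stream is gone or finalized, so a new stream is required.
  // Per the discussion above, FAILED_PRECONDITION (stream already finalized)
  // and NOT_FOUND (stream not found) may also surface in practice.
  static boolean needsNewStream(Throwable t) {
    Code code = Status.fromThrowable(t).getCode();
    return code == Code.INVALID_ARGUMENT
        || code == Code.FAILED_PRECONDITION
        || code == Code.NOT_FOUND;
  }
}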

if (offsetMismatch || streamDoesntExist) {
appendOffsetFailures.inc();
LOG.warn(
"Append to "
+ failedContext
+ " failed with "
+ failedContext.getError()
+ " Will retry with a new stream");
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a bundle
// failure happened after an append but before the worker could checkpoint its
// state. The records that were appended in a failed bundle will be retried,
// meaning that the unflushed tail of the stream must be discarded to prevent
// duplicates.

// Finalize the stream and clear streamName so a new stream will be created.
o.output(
KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
@@ -466,24 +495,48 @@ public String toString() {

java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
appendLatencyDistribution.update(timeElapsed.toMillis());
idleTimer
.offset(streamIdleTime)
.withOutputTimestamp(GlobalWindow.INSTANCE.maxTimestamp())
.setRelative();
}

@OnWindowExpiration
public void onWindowExpiration(
// Called by both the idleTimer and window-expiration handlers.
private void finalizeStream(
Contributor:

Will there be a risk of exceptions being thrown here?

Contributor:

Sorry, didn't send it out earlier.

Contributor Author:

We shouldn't have exceptions here.

@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// Window is done - usually because the pipeline has been drained. Make sure to clean up
// streams so that they are not leaked.
String stream = MoreObjects.firstNonNull(streamName.read(), "");

if (!Strings.isNullOrEmpty(stream)) {
// Finalize the stream
long nextOffset = MoreObjects.firstNonNull(streamOffset.read(), 0L);
o.output(KV.of(stream, new Operation(nextOffset - 1, true)));
streamName.clear();
streamOffset.clear();
// Make sure that the stream object is closed.
APPEND_CLIENTS.invalidate(stream);
}
}

@OnTimer("idleTimer")
public void onTimer(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// Stream is idle - clear it.
finalizeStream(streamName, streamOffset, o);
streamsIdle.inc();
}

@OnWindowExpiration
public void onWindowExpiration(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// Window is done - usually because the pipeline has been drained. Make sure to clean up
// streams so that they are not leaked.
finalizeStream(streamName, streamOffset, o);
}
}
}
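
Taken on its own, the idle-timer pattern this change relies on reduces to a small self-contained DoFn: each element resets a processing-time timer, and the timer firing means the key has been idle for the full period. A sketch under those assumptions, using the standard Beam state/timer API; all names here are hypothetical and not from this PR:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

class IdleResourceDoFn extends DoFn<KV<String, String>, Void> {
  private static final Duration IDLE_TIME = Duration.standardHours(1);

  @StateId("resource")
  private final StateSpec<ValueState<String>> resourceSpec = StateSpecs.value();

  @TimerId("idle")
  private final TimerSpec idleSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("resource") ValueState<String> resource,
      @TimerId("idle") Timer idle) {
    // Lazily "open" a per-key resource on first use.
    if (resource.read() == null) {
      resource.write("resource-for-" + element.getKey());
    }
    // Every element pushes the idle deadline out again.
    idle.offset(IDLE_TIME).setRelative();
  }

  @OnTimer("idle")
  public void onIdle(@StateId("resource") ValueState<String> resource) {
    // No elements arrived for IDLE_TIME: release the resource and clear state.
    resource.clear();
  }
}

As in the PR, the process path only resets the timer; all cleanup happens in the @OnTimer callback, so an active key never pays the finalization cost.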