Vortex performance improvement: Enable multiple stream clients per worker #17550

Merged
@@ -93,12 +93,22 @@ public interface BigQueryOptions
void setUseStorageWriteApiAtLeastOnce(Boolean value);

@Description(
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. ")
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();

void setNumStorageWriteApiStreams(Integer value);

@Description(
"When using the {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method#STORAGE_API_AT_LEAST_ONCE} write method, "
+ "this option sets the number of stream append clients that will be allocated at a per worker and destination basis. "
+ "A large value can cause a large pipeline to go over the BigQuery connection quota quickly on a job with "
+ "enough number of workers. On the case of low-mid volume pipelines using the default configuration should be sufficient.")
@Default.Integer(1)
Contributor:
A bit confusing - need to clarify that this only applies for at-least once writes using the default stream

Integer getNumStorageWriteApiStreamAppendClients();

void setNumStorageWriteApiStreamAppendClients(Integer value);

@Description(
"If set, then BigQueryIO.Write will default to triggering the Storage Write API writes this often.")
Integer getStorageWriteApiTriggeringFrequencySec();
@@ -129,4 +139,10 @@ public interface BigQueryOptions
Integer getStorageApiAppendThresholdBytes();

void setStorageApiAppendThresholdBytes(Integer value);

@Description("Maximum (best effort) record count of a single append to the storage API.")
@Default.Integer(150000)
Integer getStorageApiAppendThresholdRecordCount();

void setStorageApiAppendThresholdRecordCount(Integer value);
}
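
For context on how these new knobs would be used, here is a minimal sketch of a pipeline opting into them. The flag names follow Beam's PipelineOptions convention of dropping the getter prefix, the values are illustrative rather than recommendations, and concurrent Storage Write API connections grow with the number of workers, destinations, and clients per destination, which is why the description above warns about quota.

// Illustrative sketch only, not part of this PR: wiring the new options into a pipeline.
// The values are examples; concurrent connections scale with workers, destinations, and
// clients, so keep numStorageWriteApiStreamAppendClients small.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StorageWriteOptionsExample {
  public static void main(String[] args) {
    // Equivalent command-line flags:
    //   --useStorageWriteApiAtLeastOnce=true
    //   --numStorageWriteApiStreamAppendClients=2
    //   --storageApiAppendThresholdRecordCount=100000
    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
    options.setUseStorageWriteApiAtLeastOnce(true);
    options.setNumStorageWriteApiStreamAppendClients(2);
    options.setStorageApiAppendThresholdRecordCount(100_000);
    Pipeline pipeline = Pipeline.create(options);
    // ... attach a BigQueryIO.Write transform configured for STORAGE_API_AT_LEAST_ONCE ...
    pipeline.run().waitUntilFinish();
  }
}
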
@@ -40,7 +40,6 @@ public StorageApiWriteRecordsInconsistent(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices) {
this.dynamicDestinations = dynamicDestinations;
;
this.bqServices = bqServices;
}

@@ -57,7 +56,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
dynamicDestinations,
bqServices,
true,
bigQueryOptions.getStorageApiAppendThresholdBytes()))
bigQueryOptions.getStorageApiAppendThresholdBytes(),
bigQueryOptions.getStorageApiAppendThresholdRecordCount(),
bigQueryOptions.getNumStorageWriteApiStreamAppendClients()))
.withSideInputs(dynamicDestinations.getSideInputs()));
return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
}
@@ -29,9 +29,12 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -57,7 +60,6 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
@@ -79,13 +81,12 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final BigQueryServices bqServices;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

// The Guava cache object is threadsafe. However our protocol requires that client pin the
// StreamAppendClient
// after looking up the cache, and we must ensure that the cache is not accessed in between the
// lookup and the pin
// (any access of the cache could trigger element expiration). Therefore most used of
// APPEND_CLIENTS should
// synchronize.
/**
* The Guava cache object is thread-safe. However, our protocol requires that clients pin the
* StreamAppendClient after looking up the cache, and we must ensure that the cache is not
* accessed in between the lookup and the pin (any access of the cache could trigger element
* expiration). Therefore most uses of APPEND_CLIENTS should synchronize.
*/
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
CacheBuilder.newBuilder()
.expireAfterAccess(15, TimeUnit.MINUTES)
@@ -138,7 +139,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
dynamicDestinations,
bqServices,
false,
options.getStorageApiAppendThresholdBytes()))
options.getStorageApiAppendThresholdBytes(),
options.getStorageApiAppendThresholdRecordCount(),
options.getNumStorageWriteApiStreamAppendClients()))
.withSideInputs(dynamicDestinations.getSideInputs()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
// Calling Reshuffle makes the output stable - once this completes, the append operations
@@ -171,18 +174,21 @@ class DestinationState {
private final boolean useDefaultStream;
private DescriptorWrapper descriptorWrapper;
private Instant nextCacheTickle;
private final int clientNumber;

public DestinationState(
String tableUrn,
MessageConverter<ElementT> messageConverter,
DatasetService datasetService,
boolean useDefaultStream) {
boolean useDefaultStream,
int streamAppendClientCount) {
this.tableUrn = tableUrn;
this.messageConverter = messageConverter;
this.pendingMessages = Lists.newArrayList();
this.datasetService = datasetService;
this.useDefaultStream = useDefaultStream;
this.descriptorWrapper = messageConverter.getSchemaDescriptor();
this.clientNumber = new Random().nextInt(streamAppendClientCount);
}

void teardown() {
@@ -197,6 +203,13 @@ String getDefaultStreamName() {
return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
}

String getStreamAppendClientCacheEntryName() {
if (useDefaultStream) {
return getDefaultStreamName() + "-client" + clientNumber;
}
return this.streamName;
}
Contributor:
This is a bit weird, since this code doesn't always use the default stream. Now the cache is probably not needed in the non default stream case (since we'll create a new stream for every bundle), however if we change that we need to rename the cache and also make sure to close the client (since right now we rely on the cache removal listener to close the client)

Contributor (Author):
I will make the changes to consider default streams and per bundle on-demand streams.
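
To make the naming concrete: for the default-stream case, each DestinationState picks one random client slot in [0, numStorageWriteApiStreamAppendClients) and bakes it into the cache key, so a worker holds at most that many append clients per destination. Below is a standalone sketch of the key scheme; the helper names are hypothetical, the format mirrors the diff, and partition-decorator stripping is omitted.

// Standalone sketch of the cache-key scheme, not the PR's code. Helper names are made up;
// the key format follows getDefaultStreamName() + "-client" + clientNumber from the diff.
import java.util.Random;

public class CacheKeySketch {
  static String defaultStreamName(String tableUrn) {
    return tableUrn + "/streams/_default";
  }

  static String cacheEntryName(String tableUrn, int clientNumber) {
    return defaultStreamName(tableUrn) + "-client" + clientNumber;
  }

  public static void main(String[] args) {
    int streamAppendClientCount = 3; // e.g. --numStorageWriteApiStreamAppendClients=3
    String tableUrn = "projects/p/datasets/d/tables/t";
    // Each DestinationState keeps one randomly chosen slot for its lifetime, so bundles on
    // the same worker spread across at most streamAppendClientCount cached clients.
    int clientNumber = new Random().nextInt(streamAppendClientCount);
    System.out.println(cacheEntryName(tableUrn, clientNumber));
    // Possible output: projects/p/datasets/d/tables/t/streams/_default-client1
  }
}
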


String createStreamIfNeeded() {
try {
if (!useDefaultStream) {
@@ -213,6 +226,14 @@ String createStreamIfNeeded() {
return this.streamName;
}

StreamAppendClient generateClient() {
Contributor:
Minor nit - remove try/catch since the calling function already catches the exception

Contributor (Author):
done

try {
return datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

StreamAppendClient getStreamAppendClient(boolean lookupCache) {
try {
if (streamAppendClient == null) {
@@ -221,14 +242,11 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
if (lookupCache) {
this.streamAppendClient =
APPEND_CLIENTS.get(
streamName,
() ->
datasetService.getStreamAppendClient(
streamName, descriptorWrapper.descriptor));
getStreamAppendClientCacheEntryName(), () -> generateClient());
} else {
this.streamAppendClient =
datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
APPEND_CLIENTS.put(streamName, this.streamAppendClient);
this.streamAppendClient = generateClient();
// override the clients in the cache
APPEND_CLIENTS.put(getStreamAppendClientCacheEntryName(), streamAppendClient);
}
this.streamAppendClient.pin();
}
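
As a condensed reminder of the protocol described in the APPEND_CLIENTS Javadoc above: the cache lookup and the pin happen under one synchronization on the cache, so expiration cannot close a client between the two. The sketch below is an illustration only, assuming the surrounding class's APPEND_CLIENTS, generateClient(), and StreamAppendClient from this diff.

// Sketch of the lookup-then-pin protocol (illustration only). Assumes the surrounding
// class members APPEND_CLIENTS and generateClient() shown in this diff.
StreamAppendClient pinnedClientFor(String cacheEntryName) throws Exception {
  synchronized (APPEND_CLIENTS) {
    // No other cache access can run between the lookup and the pin, so an eviction
    // cannot close the client before this bundle has pinned it.
    StreamAppendClient client = APPEND_CLIENTS.get(cacheEntryName, () -> generateClient());
    client.pin();
    return client;
  }
}
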
@@ -244,7 +262,7 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
void maybeTickleCache() {
if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
synchronized (APPEND_CLIENTS) {
APPEND_CLIENTS.getIfPresent(streamName);
APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryName());
}
nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
}
@@ -263,11 +281,12 @@ void invalidateWriteStream() {
// thread has already invalidated
// and recreated the stream).
@Nullable
StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
StreamAppendClient cachedAppendClient =
APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryName());
Contributor:
nit - store cachedEntryName in a local variable instead of recomputing twice here

Contributor (Author):
done

if (cachedAppendClient != null
&& System.identityHashCode(cachedAppendClient)
== System.identityHashCode(streamAppendClient)) {
Contributor:
This isn't quite right - we're now invalidating all of the StreamWriters when any one of them fails. I think instead you want to just null out the one that failed and allow it to be recreated the next get.

Contributor (Author):
The invalidation here corresponds to a schema mismatch, shouldn't all the clients be invalidated for a particular stream?

Contributor (Author):
made the changes to only invalidate the writer in use by the bundle, not all of them.

APPEND_CLIENTS.invalidate(streamName);
APPEND_CLIENTS.invalidate(getStreamAppendClientCacheEntryName());
}
}
streamAppendClient = null;
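
Following the thread above, the intent is to drop only the append client this bundle holds and let the next lookup recreate it, rather than invalidating every client for the stream. The sketch below shows that behavior under those assumptions; it is not necessarily the final merged code.

// Sketch only: per-bundle invalidation as discussed in the review thread above.
void invalidateThisBundlesClient() {
  if (streamAppendClient == null) {
    return;
  }
  synchronized (APPEND_CLIENTS) {
    // Compute the key once (the reviewer's nit) and only drop the entry if it still
    // points at this bundle's client; another thread may already have replaced it.
    String cacheEntryName = getStreamAppendClientCacheEntryName();
    StreamAppendClient cached = APPEND_CLIENTS.getIfPresent(cacheEntryName);
    if (cached == streamAppendClient) {
      APPEND_CLIENTS.invalidate(cacheEntryName);
    }
  }
  // Null the local reference so the next getStreamAppendClient() re-fetches or recreates it.
  streamAppendClient = null;
}
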
@@ -327,19 +346,20 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
if (writeStream.getInflightWaitSeconds() > 5) {
LOG.warn(
"Storage Api write delay more than " + writeStream.getInflightWaitSeconds());
"Storage Api write delay more than {} seconds.",
writeStream.getInflightWaitSeconds());
}
return response;
} catch (Exception e) {
throw new RuntimeException(e);
}
},
contexts -> {
LOG.info(
"Append to stream "
+ streamName
+ " failed with error "
+ Iterables.getFirst(contexts, null).getError());
LOG.warn(
"Append to stream {} by client #{} failed with error, operations will be retried. Details: {}",
streamName,
clientNumber,
retrieveErrorDetails(contexts));
invalidateWriteStream();
appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
@@ -350,35 +370,53 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
new Context<>());
maybeTickleCache();
}

String retrieveErrorDetails(Iterable<Context<AppendRowsResponse>> contexts) {
return StreamSupport.stream(contexts.spliterator(), false)
.map(ctx -> ctx.getError())
.map(
err ->
String.format(
"message: %s, stacktrace: %s",
err.toString(),
Lists.newArrayList(err.getStackTrace()).stream()
.map(se -> se.toString())
.collect(Collectors.joining("\n"))))
.collect(Collectors.joining(","));
}
}

private Map<DestinationT, DestinationState> destinations = Maps.newHashMap();
private final TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
private transient @Nullable DatasetService datasetService;
private int numPendingRecords = 0;
private int numPendingRecordBytes = 0;
private static final int FLUSH_THRESHOLD_RECORDS = 150000;
private final int flushThresholdBytes;
private final int flushThresholdCount;
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
private int streamAppendClientCount;

WriteRecordsDoFn(
String operationName,
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices,
boolean useDefaultStream,
int flushThresholdBytes) {
int flushThresholdBytes,
int flushThresholdCount,
int streamAppendClientCount) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.useDefaultStream = useDefaultStream;
this.flushThresholdBytes = flushThresholdBytes;
this.flushThresholdCount = flushThresholdCount;
this.streamAppendClientCount = streamAppendClientCount;
}

boolean shouldFlush() {
return numPendingRecords > FLUSH_THRESHOLD_RECORDS
|| numPendingRecordBytes > flushThresholdBytes;
return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
}

void flushIfNecessary() throws Exception {
@@ -432,7 +470,11 @@ DestinationState createDestinationState(
throw new RuntimeException(e);
}
return new DestinationState(
tableDestination1.getTableUrn(), messageConverter, datasetService, useDefaultStream);
tableDestination1.getTableUrn(),
messageConverter,
datasetService,
useDefaultStream,
streamAppendClientCount);
}

@ProcessElement