Skip to content

Commit

Permalink
Vortex performance improvement: Enable multiple stream clients per wo…
Browse files Browse the repository at this point in the history
…rker (#17550) (#17718)

Co-authored-by: pablo rodriguez defino <[email protected]>
  • Loading branch information
y1chi and prodriguezdefino authored May 19, 2022
1 parent 9b8ec45 commit 15a211e
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,22 @@ public interface BigQueryOptions
void setUseStorageWriteApiAtLeastOnce(Boolean value);

@Description(
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. ")
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();

void setNumStorageWriteApiStreams(Integer value);

@Description(
"When using the {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method#STORAGE_API_AT_LEAST_ONCE} write method, "
+ "this option sets the number of stream append clients that will be allocated at a per worker and destination basis. "
+ "A large value can cause a large pipeline to go over the BigQuery connection quota quickly on a job with "
+ "enough number of workers. On the case of low-mid volume pipelines using the default configuration should be sufficient.")
@Default.Integer(1)
Integer getNumStorageWriteApiStreamAppendClients();

void setNumStorageWriteApiStreamAppendClients(Integer value);

@Description(
"If set, then BigQueryIO.Write will default to triggering the Storage Write API writes this often.")
Integer getStorageWriteApiTriggeringFrequencySec();
Expand Down Expand Up @@ -129,4 +139,10 @@ public interface BigQueryOptions
Integer getStorageApiAppendThresholdBytes();

void setStorageApiAppendThresholdBytes(Integer value);

@Description("Maximum (best effort) record count of a single append to the storage API.")
@Default.Integer(150000)
Integer getStorageApiAppendThresholdRecordCount();

void setStorageApiAppendThresholdRecordCount(Integer value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public StorageApiWriteRecordsInconsistent(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices) {
this.dynamicDestinations = dynamicDestinations;
;
this.bqServices = bqServices;
}

Expand All @@ -57,7 +56,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
dynamicDestinations,
bqServices,
true,
bigQueryOptions.getStorageApiAppendThresholdBytes()))
bigQueryOptions.getStorageApiAppendThresholdBytes(),
bigQueryOptions.getStorageApiAppendThresholdRecordCount(),
bigQueryOptions.getNumStorageWriteApiStreamAppendClients()))
.withSideInputs(dynamicDestinations.getSideInputs()));
return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand All @@ -57,7 +60,6 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
Expand All @@ -79,13 +81,12 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final BigQueryServices bqServices;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

// The Guava cache object is threadsafe. However our protocol requires that client pin the
// StreamAppendClient
// after looking up the cache, and we must ensure that the cache is not accessed in between the
// lookup and the pin
// (any access of the cache could trigger element expiration). Therefore most used of
// APPEND_CLIENTS should
// synchronize.
/**
* The Guava cache object is thread-safe. However our protocol requires that client pin the
* StreamAppendClient after looking up the cache, and we must ensure that the cache is not
* accessed in between the lookup and the pin (any access of the cache could trigger element
* expiration). Therefore most used of APPEND_CLIENTS should synchronize.
*/
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
CacheBuilder.newBuilder()
.expireAfterAccess(15, TimeUnit.MINUTES)
Expand Down Expand Up @@ -138,7 +139,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
dynamicDestinations,
bqServices,
false,
options.getStorageApiAppendThresholdBytes()))
options.getStorageApiAppendThresholdBytes(),
options.getStorageApiAppendThresholdRecordCount(),
options.getNumStorageWriteApiStreamAppendClients()))
.withSideInputs(dynamicDestinations.getSideInputs()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
// Calling Reshuffle makes the output stable - once this completes, the append operations
Expand Down Expand Up @@ -171,18 +174,21 @@ class DestinationState {
private final boolean useDefaultStream;
private DescriptorWrapper descriptorWrapper;
private Instant nextCacheTickle;
private final int clientNumber;

public DestinationState(
String tableUrn,
MessageConverter<ElementT> messageConverter,
DatasetService datasetService,
boolean useDefaultStream) {
boolean useDefaultStream,
int streamAppendClientCount) {
this.tableUrn = tableUrn;
this.messageConverter = messageConverter;
this.pendingMessages = Lists.newArrayList();
this.datasetService = datasetService;
this.useDefaultStream = useDefaultStream;
this.descriptorWrapper = messageConverter.getSchemaDescriptor();
this.clientNumber = new Random().nextInt(streamAppendClientCount);
}

void teardown() {
Expand All @@ -197,6 +203,13 @@ String getDefaultStreamName() {
return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
}

String getStreamAppendClientCacheEntryKey() {
if (useDefaultStream) {
return getDefaultStreamName() + "-client" + clientNumber;
}
return this.streamName;
}

String createStreamIfNeeded() {
try {
if (!useDefaultStream) {
Expand All @@ -213,6 +226,10 @@ String createStreamIfNeeded() {
return this.streamName;
}

StreamAppendClient generateClient() throws Exception {
return datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
}

StreamAppendClient getStreamAppendClient(boolean lookupCache) {
try {
if (streamAppendClient == null) {
Expand All @@ -221,14 +238,11 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
if (lookupCache) {
this.streamAppendClient =
APPEND_CLIENTS.get(
streamName,
() ->
datasetService.getStreamAppendClient(
streamName, descriptorWrapper.descriptor));
getStreamAppendClientCacheEntryKey(), () -> generateClient());
} else {
this.streamAppendClient =
datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
APPEND_CLIENTS.put(streamName, this.streamAppendClient);
this.streamAppendClient = generateClient();
// override the clients in the cache
APPEND_CLIENTS.put(getStreamAppendClientCacheEntryKey(), streamAppendClient);
}
this.streamAppendClient.pin();
}
Expand All @@ -244,7 +258,7 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
void maybeTickleCache() {
if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
synchronized (APPEND_CLIENTS) {
APPEND_CLIENTS.getIfPresent(streamName);
APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryKey());
}
nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
}
Expand All @@ -262,12 +276,13 @@ void invalidateWriteStream() {
// cache still contains the object we created before invalidating (in case another
// thread has already invalidated
// and recreated the stream).
String cacheEntryKey = getStreamAppendClientCacheEntryKey();
@Nullable
StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
if (cachedAppendClient != null
&& System.identityHashCode(cachedAppendClient)
== System.identityHashCode(streamAppendClient)) {
APPEND_CLIENTS.invalidate(streamName);
APPEND_CLIENTS.invalidate(cacheEntryKey);
}
}
streamAppendClient = null;
Expand Down Expand Up @@ -327,19 +342,20 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
if (writeStream.getInflightWaitSeconds() > 5) {
LOG.warn(
"Storage Api write delay more than " + writeStream.getInflightWaitSeconds());
"Storage Api write delay more than {} seconds.",
writeStream.getInflightWaitSeconds());
}
return response;
} catch (Exception e) {
throw new RuntimeException(e);
}
},
contexts -> {
LOG.info(
"Append to stream "
+ streamName
+ " failed with error "
+ Iterables.getFirst(contexts, null).getError());
LOG.warn(
"Append to stream {} by client #{} failed with error, operations will be retried. Details: {}",
streamName,
clientNumber,
retrieveErrorDetails(contexts));
invalidateWriteStream();
appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
Expand All @@ -350,35 +366,53 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
new Context<>());
maybeTickleCache();
}

String retrieveErrorDetails(Iterable<Context<AppendRowsResponse>> contexts) {
return StreamSupport.stream(contexts.spliterator(), false)
.map(ctx -> ctx.getError())
.map(
err ->
String.format(
"message: %s, stacktrace: %s",
err.toString(),
Lists.newArrayList(err.getStackTrace()).stream()
.map(se -> se.toString())
.collect(Collectors.joining("\n"))))
.collect(Collectors.joining(","));
}
}

private Map<DestinationT, DestinationState> destinations = Maps.newHashMap();
private final TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
private transient @Nullable DatasetService datasetService;
private int numPendingRecords = 0;
private int numPendingRecordBytes = 0;
private static final int FLUSH_THRESHOLD_RECORDS = 150000;
private final int flushThresholdBytes;
private final int flushThresholdCount;
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
private int streamAppendClientCount;

WriteRecordsDoFn(
String operationName,
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices,
boolean useDefaultStream,
int flushThresholdBytes) {
int flushThresholdBytes,
int flushThresholdCount,
int streamAppendClientCount) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.useDefaultStream = useDefaultStream;
this.flushThresholdBytes = flushThresholdBytes;
this.flushThresholdCount = flushThresholdCount;
this.streamAppendClientCount = streamAppendClientCount;
}

boolean shouldFlush() {
return numPendingRecords > FLUSH_THRESHOLD_RECORDS
|| numPendingRecordBytes > flushThresholdBytes;
return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
}

void flushIfNecessary() throws Exception {
Expand Down Expand Up @@ -432,7 +466,11 @@ DestinationState createDestinationState(
throw new RuntimeException(e);
}
return new DestinationState(
tableDestination1.getTableUrn(), messageConverter, datasetService, useDefaultStream);
tableDestination1.getTableUrn(),
messageConverter,
datasetService,
useDefaultStream,
streamAppendClientCount);
}

@ProcessElement
Expand Down

0 comments on commit 15a211e

Please sign in to comment.