Vortex performance improvement: Enable multiple stream clients per worker #17550

Merged
@@ -93,12 +93,22 @@ public interface BigQueryOptions
void setUseStorageWriteApiAtLeastOnce(Boolean value);

@Description(
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. ")
@Default.Integer(0)
prodriguezdefino marked this conversation as resolved.
Integer getNumStorageWriteApiStreams();

void setNumStorageWriteApiStreams(Integer value);

@Description(
"When using the \"_default\" table stream, this option sets the number of stream append clients that will be allocated "
Contributor:
reference at-least once writes. Users don't know about default streams, as that's an implementation detail

+ "at a per worker and destination basis. A large value can cause a large pipeline to go over the BigQuery "
+ "connection quota quickly on a job with large number of workers. On the case of low-mid volume pipelines "
+ "using the default configuration should be enough.")
@Default.Integer(1)
Contributor:
A bit confusing - need to clarify that this only applies for at-least once writes using the default stream

Integer getNumStorageWriteApiStreamAppendClients();

void setNumStorageWriteApiStreamAppendClients(Integer value);

@Description(
"If set, then BigQueryIO.Write will default to triggering the Storage Write API writes this often.")
Integer getStorageWriteApiTriggeringFrequencySec();
@@ -129,4 +139,10 @@ public interface BigQueryOptions
Integer getStorageApiAppendThresholdBytes();

void setStorageApiAppendThresholdBytes(Integer value);

@Description("Maximum (best effort) record count of a single append to the storage API.")
@Default.Integer(150000)
Integer getStorageApiAppendThresholdRecordCount();

void setStorageApiAppendThresholdRecordCount(Integer value);
}
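For context on how these options fit together, here is a minimal sketch of a pipeline opting into the new settings. The option names come from the interface above; the at-least-once flag and the example values are illustrative, not a recommendation.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MultipleAppendClientsExample {
  public static void main(String[] args) {
    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);

    // Use the at-least-once Storage Write API path, which writes to the "_default" stream.
    options.setUseStorageWriteApiAtLeastOnce(true);
    // Spread appends over several stream append clients per worker and destination.
    options.setNumStorageWriteApiStreamAppendClients(4);
    // Flush thresholds: record count (added in this change) and bytes (pre-existing).
    options.setStorageApiAppendThresholdRecordCount(100_000);
    options.setStorageApiAppendThresholdBytes(2 * 1024 * 1024);

    Pipeline pipeline = Pipeline.create(options);
    // ... apply a source and BigQueryIO.write() using the Storage Write API here ...
    pipeline.run();
  }
}
```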
@@ -40,7 +40,6 @@ public StorageApiWriteRecordsInconsistent(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices) {
this.dynamicDestinations = dynamicDestinations;
;
this.bqServices = bqServices;
}

@@ -57,7 +56,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
dynamicDestinations,
bqServices,
true,
bigQueryOptions.getStorageApiAppendThresholdBytes()))
bigQueryOptions.getStorageApiAppendThresholdBytes(),
bigQueryOptions.getStorageApiAppendThresholdRecordCount(),
bigQueryOptions.getNumStorageWriteApiStreamAppendClients()))
.withSideInputs(dynamicDestinations.getSideInputs()));
return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
}
@@ -29,9 +29,12 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -57,7 +60,6 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
@@ -79,13 +81,12 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
private final BigQueryServices bqServices;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

// The Guava cache object is threadsafe. However our protocol requires that client pin the
// StreamAppendClient
// after looking up the cache, and we must ensure that the cache is not accessed in between the
// lookup and the pin
// (any access of the cache could trigger element expiration). Therefore most used of
// APPEND_CLIENTS should
// synchronize.
/**
* The Guava cache object is thread-safe. However our protocol requires that clients pin the
* StreamAppendClient after looking up the cache, and we must ensure that the cache is not
* accessed in between the lookup and the pin (any access of the cache could trigger element
* expiration). Therefore most uses of APPEND_CLIENTS should synchronize.
*/
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
CacheBuilder.newBuilder()
.expireAfterAccess(15, TimeUnit.MINUTES)
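As a standalone illustration of the protocol described in that comment (plain Guava and a hypothetical pinnable client rather than the vendored imports and the real StreamAppendClient), the lookup and the pin are done under a single lock on the cache:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;

public class PinAfterLookupSketch {
  // Hypothetical stand-in for StreamAppendClient with pin/unpin reference counting.
  static class PinnableClient {
    private int pins = 0;
    synchronized void pin() { pins++; }
    synchronized void unpin() { pins--; }
  }

  private static final Cache<String, PinnableClient> CLIENTS =
      CacheBuilder.newBuilder().expireAfterAccess(15, TimeUnit.MINUTES).build();

  static PinnableClient lookupAndPin(String cacheKey) throws Exception {
    // Lookup and pin happen inside one synchronized block, so no other cache access
    // (which could trigger expiration and the removal listener) can run in between.
    synchronized (CLIENTS) {
      PinnableClient client = CLIENTS.get(cacheKey, PinnableClient::new);
      client.pin();
      return client;
    }
  }
}
```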
@@ -138,7 +139,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
dynamicDestinations,
bqServices,
false,
options.getStorageApiAppendThresholdBytes()))
options.getStorageApiAppendThresholdBytes(),
options.getStorageApiAppendThresholdRecordCount(),
options.getNumStorageWriteApiStreamAppendClients()))
.withSideInputs(dynamicDestinations.getSideInputs()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
// Calling Reshuffle makes the output stable - once this completes, the append operations
@@ -171,18 +174,21 @@ class DestinationState {
private final boolean useDefaultStream;
private DescriptorWrapper descriptorWrapper;
private Instant nextCacheTickle;
private final int clientNumber;

public DestinationState(
String tableUrn,
MessageConverter<ElementT> messageConverter,
DatasetService datasetService,
boolean useDefaultStream) {
boolean useDefaultStream,
int streamAppendClientCount) {
this.tableUrn = tableUrn;
this.messageConverter = messageConverter;
this.pendingMessages = Lists.newArrayList();
this.datasetService = datasetService;
this.useDefaultStream = useDefaultStream;
this.descriptorWrapper = messageConverter.getSchemaDescriptor();
this.clientNumber = new Random().nextInt(streamAppendClientCount);
}

void teardown() {
@@ -197,6 +203,13 @@ String getDefaultStreamName() {
return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
}

String getStreamAppendClientCacheEntryName() {
if (useDefaultStream) {
return getDefaultStreamName() + "-client" + clientNumber;
}
return this.streamName;
}
Contributor:
This is a bit weird, since this code doesn't always use the default stream. Now the cache is probably not needed in the non default stream case (since we'll create a new stream for every bundle), however if we change that we need to rename the cache and also make sure to close the client (since right now we rely on the cache removal listener to close the client)

Contributor (author):
I will make the changes to consider default streams and per bundle on-demand streams.


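To make the keying scheme under discussion concrete, here is a small standalone sketch (the stream name and client count are made up for illustration). Each DestinationState draws a fixed client number at construction, so a worker ends up with at most streamAppendClientCount cached clients per default stream, shared across bundles that drew the same number.

```java
import java.util.Random;

public class AppendClientKeySketch {
  // Mirrors getStreamAppendClientCacheEntryName above for the default-stream case.
  static String cacheEntryName(String defaultStreamName, int clientNumber) {
    return defaultStreamName + "-client" + clientNumber;
  }

  public static void main(String[] args) {
    int streamAppendClientCount = 3; // e.g. --numStorageWriteApiStreamAppendClients=3
    String defaultStream = "projects/p/datasets/d/tables/t/streams/_default"; // illustrative
    Random random = new Random();
    for (int bundle = 0; bundle < 5; bundle++) {
      // Each DestinationState picks its client number once; repeated keys mean a shared client.
      int clientNumber = random.nextInt(streamAppendClientCount);
      System.out.println("bundle " + bundle + " -> " + cacheEntryName(defaultStream, clientNumber));
    }
  }
}
```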
String createStreamIfNeeded() {
try {
if (!useDefaultStream) {
Expand All @@ -213,6 +226,14 @@ String createStreamIfNeeded() {
return this.streamName;
}

StreamAppendClient generateClient() {
Contributor:
Minor nit - remove try/catch since the calling function already catches the exception

Contributor (author):
done

try {
return datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

StreamAppendClient getStreamAppendClient(boolean lookupCache) {
try {
if (streamAppendClient == null) {
Expand All @@ -221,14 +242,11 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
if (lookupCache) {
this.streamAppendClient =
APPEND_CLIENTS.get(
streamName,
() ->
datasetService.getStreamAppendClient(
streamName, descriptorWrapper.descriptor));
getStreamAppendClientCacheEntryName(), () -> generateClient());
} else {
this.streamAppendClient =
datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
APPEND_CLIENTS.put(streamName, this.streamAppendClient);
this.streamAppendClient = generateClient();
// override the clients in the cache
APPEND_CLIENTS.put(getStreamAppendClientCacheEntryName(), streamAppendClient);
}
this.streamAppendClient.pin();
}
@@ -244,7 +262,7 @@ StreamAppendClient getStreamAppendClient(boolean lookupCache) {
void maybeTickleCache() {
if (streamAppendClient != null && Instant.now().isAfter(nextCacheTickle)) {
synchronized (APPEND_CLIENTS) {
APPEND_CLIENTS.getIfPresent(streamName);
APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryName());
}
nextCacheTickle = Instant.now().plus(java.time.Duration.ofMinutes(1));
}
@@ -263,11 +281,12 @@ void invalidateWriteStream() {
// thread has already invalidated
// and recreated the stream).
@Nullable
StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
StreamAppendClient cachedAppendClient =
APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryName());
Contributor:
nit - store cachedEntryName in a local variable instead of recomputing twice here

Contributor (author):
done

if (cachedAppendClient != null
&& System.identityHashCode(cachedAppendClient)
== System.identityHashCode(streamAppendClient)) {
Contributor:
This isn't quite right - we're now invalidating all of the StreamWriters when any one of them fails. I think instead you want to just null out the one that failed and allow it to be recreated the next get.

Contributor (author):
The invalidation here corresponds to a schema mismatch, shouldn't all the clients be invalidated for a particular stream?

Contributor (author):
made the changes to only invalidate the writer in use by the bundle, not all of them.

APPEND_CLIENTS.invalidate(streamName);
APPEND_CLIENTS.invalidate(getStreamAppendClientCacheEntryName());
}
}
streamAppendClient = null;
@@ -327,19 +346,20 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
inflightWaitSecondsDistribution.update(writeStream.getInflightWaitSeconds());
if (writeStream.getInflightWaitSeconds() > 5) {
LOG.warn(
"Storage Api write delay more than " + writeStream.getInflightWaitSeconds());
"Storage Api write delay more than {} seconds.",
writeStream.getInflightWaitSeconds());
}
return response;
} catch (Exception e) {
throw new RuntimeException(e);
}
},
contexts -> {
LOG.info(
"Append to stream "
+ streamName
+ " failed with error "
+ Iterables.getFirst(contexts, null).getError());
LOG.warn(
"Append to stream {} by client #{} failed with error, operations will be retried. Details: {}",
streamName,
clientNumber,
retrieveErrorDetails(contexts));
invalidateWriteStream();
appendFailures.inc();
return RetryType.RETRY_ALL_OPERATIONS;
Expand All @@ -350,35 +370,53 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
new Context<>());
maybeTickleCache();
}

String retrieveErrorDetails(Iterable<Context<AppendRowsResponse>> contexts) {
return StreamSupport.stream(contexts.spliterator(), false)
.map(ctx -> ctx.getError())
.map(
err ->
String.format(
"message: %s, stacktrace: %s",
err.toString(),
Lists.newArrayList(err.getStackTrace()).stream()
.map(se -> se.toString())
.collect(Collectors.joining("\n"))))
.collect(Collectors.joining(","));
}
}

private Map<DestinationT, DestinationState> destinations = Maps.newHashMap();
private final TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
private transient @Nullable DatasetService datasetService;
private int numPendingRecords = 0;
private int numPendingRecordBytes = 0;
private static final int FLUSH_THRESHOLD_RECORDS = 150000;
private final int flushThresholdBytes;
private final int flushThresholdCount;
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
private int streamAppendClientCount;

WriteRecordsDoFn(
String operationName,
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices,
boolean useDefaultStream,
int flushThresholdBytes) {
int flushThresholdBytes,
int flushThresholdCount,
int streamAppendClientCount) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.useDefaultStream = useDefaultStream;
this.flushThresholdBytes = flushThresholdBytes;
this.flushThresholdCount = flushThresholdCount;
this.streamAppendClientCount = streamAppendClientCount;
}

boolean shouldFlush() {
return numPendingRecords > FLUSH_THRESHOLD_RECORDS
|| numPendingRecordBytes > flushThresholdBytes;
return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
}

void flushIfNecessary() throws Exception {
@@ -409,9 +447,18 @@ private void initializeDatasetService(PipelineOptions pipelineOptions) {
}
}

@Setup
public void setup() {
Contributor:
I'm not sure what this is adding?

Contributor (author):
I've seen evidence that closing one stream append client for the _default stream causes a cascade close of the other ones, so moving the state for destinations to be reused between bundle executions at least decreased the occurrences of those cascading closes.

I can revert this if this is not the right idea to try.

if (useDefaultStream) {
destinations = Maps.newHashMap();
}
}

@StartBundle
public void startBundle() throws IOException {
destinations = Maps.newHashMap();
if (!useDefaultStream) {
Contributor:
Why add this if?

Contributor (author):
Only want to reuse the destination state for the default stream clients.

destinations = Maps.newHashMap();
}
numPendingRecords = 0;
numPendingRecordBytes = 0;
}
@@ -432,7 +479,11 @@ DestinationState createDestinationState(
throw new RuntimeException(e);
}
return new DestinationState(
tableDestination1.getTableUrn(), messageConverter, datasetService, useDefaultStream);
tableDestination1.getTableUrn(),
messageConverter,
datasetService,
useDefaultStream,
streamAppendClientCount);
}

@ProcessElement
@@ -455,21 +506,26 @@ public void process(
@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
flushAll();
for (DestinationState state : destinations.values()) {
if (!useDefaultStream) {
if (!useDefaultStream) {
Contributor:
Why this change? Won't this prevent us from unpinning the client?

Contributor (author):
it will, but those clients can be reused across bundle executions.

Contributor:
the clients can be reused regardless - unpinning won't close the client unless close is also called (i.e. if we hit the cache idle timeout)

for (DestinationState state : destinations.values()) {
context.output(
KV.of(state.tableUrn, state.streamName),
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
GlobalWindow.INSTANCE);
state.teardown();
}
state.teardown();
destinations.clear();
destinations = null;
}
destinations.clear();
destinations = null;
}
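The exchange above turns on the pin/unpin semantics: unpinning by itself does not tear down the connection. A rough, hypothetical illustration of that kind of reference counting (not Beam's actual StreamAppendClient implementation) is:

```java
public class RefCountedClientSketch {
  // Hypothetical client whose close is deferred until both the cache has released it
  // (close() called by the removal listener) and every bundle has unpinned it.
  static class RefCountedClient implements AutoCloseable {
    private int pins = 0;
    private boolean closeRequested = false;

    synchronized void pin() { pins++; }

    synchronized void unpin() {
      pins--;
      maybeReallyClose();
    }

    @Override
    public synchronized void close() {
      closeRequested = true; // e.g. invoked from the cache's removal listener
      maybeReallyClose();
    }

    private void maybeReallyClose() {
      if (closeRequested && pins == 0) {
        // Only now release the underlying connection.
        System.out.println("connection released");
      }
    }
  }

  public static void main(String[] args) {
    RefCountedClient client = new RefCountedClient();
    client.pin();   // bundle starts using the client
    client.close(); // cache eviction requests close, but the client stays usable
    client.unpin(); // last pin released -> "connection released"
  }
}
```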

@Teardown
public void teardown() {
if (destinations != null) {
for (DestinationState state : destinations.values()) {
state.teardown();
}
Contributor:
We shouldn't be doing this in teardown

}
destinations = null;
try {
if (datasetService != null) {