Vortex performance improvement: Enable multiple stream clients per worker #17550

Merged
@@ -93,7 +93,10 @@ public interface BigQueryOptions
void setUseStorageWriteApiAtLeastOnce(Boolean value);

@Description(
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams. "
+ "The number of streams indicated will be allocated at a per worker and destination basis, "
+ "a high number can cause a large pipeline to go over the BigQuery connection quota quickly. "
+ "With low-mid volume pipelines using the default configuration should be enough.")
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();

@@ -129,4 +132,10 @@ public interface BigQueryOptions
Integer getStorageApiAppendThresholdBytes();

void setStorageApiAppendThresholdBytes(Integer value);

@Description("Maximum (best effort) record count of a single append to the storage API.")
@Default.Integer(150000)
Integer getStorageApiAppendThresholdRecordCount();

void setStorageApiAppendThresholdRecordCount(Integer value);
}
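For orientation, a minimal usage sketch (not part of this PR): the option names come from the diff above, while the class name, flag values, and the setUseStorageWriteApi call are illustrative assumptions. Because streams are allocated per worker and per destination, the total connection count grows roughly as numStorageWriteApiStreams x workers x destinations; for example 3 streams x 100 workers x 2 destinations is about 600 concurrent connections.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Illustrative sketch only; the values below are assumptions, not recommendations.
public class StorageWriteApiOptionsExample {
  public static void main(String[] args) {
    BigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
    options.setUseStorageWriteApi(true);
    // Allocated per worker and per destination, so keep this low unless throughput demands more.
    options.setNumStorageWriteApiStreams(3);
    // Best-effort cap on records per append request (the diff above defaults this to 150000).
    options.setStorageApiAppendThresholdRecordCount(100000);
    Pipeline pipeline = Pipeline.create(options);
    // ... attach BigQueryIO.write() transforms to `pipeline`, then pipeline.run() ...
  }
}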
@@ -40,14 +40,18 @@ public StorageApiWriteRecordsInconsistent(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices) {
this.dynamicDestinations = dynamicDestinations;
;
this.bqServices = bqServices;
}

@Override
public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
String operationName = input.getName() + "/" + getName();
BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
// default value from options is 0, so we set at least one client
Integer numStreams =
bigQueryOptions.getNumStorageWriteApiStreams() == 0
? 1
: bigQueryOptions.getNumStorageWriteApiStreams();
// Append records to the Storage API streams.
input.apply(
"Write Records",
Contributor

Changing transform names can affect update compatibility - do you need this?

Contributor Author

No, I don't, but this does not follow the same convention other apply labels use (no spaces in names). Should I revert?

@@ -57,7 +61,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
dynamicDestinations,
bqServices,
true,
bigQueryOptions.getStorageApiAppendThresholdBytes()))
bigQueryOptions.getStorageApiAppendThresholdBytes(),
bigQueryOptions.getStorageApiAppendThresholdRecordCount(),
numStreams))
.withSideInputs(dynamicDestinations.getSideInputs()));
return input.getPipeline().apply("voids", Create.empty(VoidCoder.of()));
}
@@ -29,9 +29,12 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -86,15 +89,18 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT>
// (any access of the cache could trigger element expiration). Therefore most uses of
// APPEND_CLIENTS should
// synchronize.
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
private static final Cache<String, List<StreamAppendClient>> APPEND_CLIENTS =
Contributor

what did you find about the cost of synchronization?

CacheBuilder.newBuilder()
.expireAfterAccess(15, TimeUnit.MINUTES)
.removalListener(
(RemovalNotification<String, StreamAppendClient> removal) -> {
(RemovalNotification<String, List<StreamAppendClient>> removal) -> {
LOG.info("Expiring append client for " + removal.getKey());
@Nullable final StreamAppendClient streamAppendClient = removal.getValue();
// Close the writer in a different thread so as not to block the main one.
runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
@Nullable final List<StreamAppendClient> streamAppendClients = removal.getValue();
streamAppendClients.forEach(
(StreamAppendClient client) -> {
// Close the writer in a different thread so as not to block the main one.
runAsyncIgnoreFailure(closeWriterExecutor, client::close);
});
})
.build();

@@ -129,6 +135,9 @@ public StorageApiWriteUnshardedRecords(
public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
String operationName = input.getName() + "/" + getName();
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
// default value from options is 0, so we set at least one client
Integer numStreams =
options.getNumStorageWriteApiStreams() == 0 ? 1 : options.getNumStorageWriteApiStreams();
Contributor

create a new option that defaults to 1
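A minimal sketch of what that suggestion could look like in BigQueryOptions; the option name below is hypothetical, not something this PR adds:

// Hypothetical option (illustrative only): defaulting to 1 would remove the need for the
// `getNumStorageWriteApiStreams() == 0 ? 1 : ...` translation at the call sites in this diff.
@Description("Number of Storage Write API append clients to keep per worker and destination.")
@Default.Integer(1)
Integer getNumStorageWriteApiStreamAppendClients();

void setNumStorageWriteApiStreamAppendClients(Integer value);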

return input
.apply(
"Write Records",
@@ -138,7 +147,9 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
dynamicDestinations,
bqServices,
false,
options.getStorageApiAppendThresholdBytes()))
options.getStorageApiAppendThresholdBytes(),
options.getStorageApiAppendThresholdRecordCount(),
numStreams))
.withSideInputs(dynamicDestinations.getSideInputs()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
// Calling Reshuffle makes the output stable - once this completes, the append operations
@@ -171,18 +182,23 @@ class DestinationState {
private final boolean useDefaultStream;
private DescriptorWrapper descriptorWrapper;
private Instant nextCacheTickle;
private final int streamAppendClientCount;
private final int clientNumber;

public DestinationState(
String tableUrn,
MessageConverter<ElementT> messageConverter,
DatasetService datasetService,
boolean useDefaultStream) {
boolean useDefaultStream,
int streamAppendClientCount) {
this.tableUrn = tableUrn;
this.messageConverter = messageConverter;
this.pendingMessages = Lists.newArrayList();
this.datasetService = datasetService;
this.useDefaultStream = useDefaultStream;
this.descriptorWrapper = messageConverter.getSchemaDescriptor();
this.streamAppendClientCount = streamAppendClientCount;
this.clientNumber = new Random().nextInt(streamAppendClientCount);
}

void teardown() {
@@ -213,22 +229,38 @@ String createStreamIfNeeded() {
return this.streamName;
}

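// Builds the pool of stream append clients for this destination (streamAppendClientCount entries).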
List<StreamAppendClient> generateClients() {
return IntStream.range(0, streamAppendClientCount)
.mapToObj(
i -> {
try {
StreamAppendClient client =
datasetService.getStreamAppendClient(
streamName, descriptorWrapper.descriptor);
return client;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
})
.collect(Collectors.toList());
}

StreamAppendClient getStreamAppendClient(boolean lookupCache) {
try {
if (streamAppendClient == null) {
createStreamIfNeeded();
synchronized (APPEND_CLIENTS) {
if (lookupCache) {
this.streamAppendClient =
APPEND_CLIENTS.get(
streamName,
() ->
datasetService.getStreamAppendClient(
streamName, descriptorWrapper.descriptor));
APPEND_CLIENTS.get(streamName, () -> generateClients()).get(clientNumber);
Contributor

Instead of generating all clients eagerly, let's do it lazily. Initialize a List with count copies of Optional.empty(). Then do
this.streamAppendClient = APPEND_CLIENTS.get(streamName, this.generateClients).get(clientNumber).get().orElseGet(this.getStreamAppendClient).

FYI you could also do this with null if you don't care to use Optional here.

Contributor Author

Instead of making the client creation lazy, I reverted the cache structure back to have a single client per entry. But now, the cache key is the stream name + the assigned client number.
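A rough sketch of that reverted layout, assuming the cache value type goes back to a single StreamAppendClient and the key combines the stream name with the assigned client number (key format and names are illustrative, not the final code):

// Illustrative only: one client per cache entry, keyed by stream name plus assigned client number.
String cacheEntryKey = streamName + "-" + clientNumber;
this.streamAppendClient =
    APPEND_CLIENTS.get(
        cacheEntryKey,
        () -> datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor));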

} else {
// TODO (rpablo): this dance may make connections
// go over quota for a short period of time, need to check

// override the clients in the cache
APPEND_CLIENTS.put(streamName, generateClients());
this.streamAppendClient =
datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor);
APPEND_CLIENTS.put(streamName, this.streamAppendClient);
APPEND_CLIENTS.get(streamName, () -> generateClients()).get(clientNumber);
}
this.streamAppendClient.pin();
}
@@ -263,9 +295,9 @@ void invalidateWriteStream() {
// thread has already invalidated
// and recreated the stream).
@Nullable
StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
if (cachedAppendClient != null
&& System.identityHashCode(cachedAppendClient)
List<StreamAppendClient> cachedAppendClients = APPEND_CLIENTS.getIfPresent(streamName);
if (cachedAppendClients != null
&& System.identityHashCode(cachedAppendClients.get(clientNumber))
== System.identityHashCode(streamAppendClient)) {
Contributor

This isn't quite right - we're now invalidating all of the StreamWriters when any one of them fails. I think instead you want to just null out the one that failed and allow it to be recreated the next get.

Contributor Author

The invalidation here corresponds to a schema mismatch, shouldn't all the clients be invalidated for a particular stream?

Contributor Author

made the changes to only invalidate the writer in use by the bundle, not all of them.
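A rough sketch of that per-client invalidation under the same keyed-cache assumption as above (illustrative only):

// Invalidate only the client this bundle was using; other clients for the stream stay cached.
String cacheEntryKey = streamName + "-" + clientNumber;
StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryKey);
if (cachedAppendClient != null
    && System.identityHashCode(cachedAppendClient) == System.identityHashCode(streamAppendClient)) {
  APPEND_CLIENTS.invalidate(cacheEntryKey);
}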

APPEND_CLIENTS.invalidate(streamName);
}
@@ -357,28 +389,33 @@ void flush(RetryManager<AppendRowsResponse, Context<AppendRowsResponse>> retryMa
private transient @Nullable DatasetService datasetService;
private int numPendingRecords = 0;
private int numPendingRecordBytes = 0;
private static final int FLUSH_THRESHOLD_RECORDS = 150000;
private final int flushThresholdBytes;
private final int flushThresholdCount;
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
// default append client count to 1
private Integer streamAppendClientCount = 1;
Contributor

why not private int?

Contributor Author

Done


WriteRecordsDoFn(
String operationName,
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
BigQueryServices bqServices,
boolean useDefaultStream,
int flushThresholdBytes) {
int flushThresholdBytes,
int flushThresholdCount,
int streamAppendClientCount) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.useDefaultStream = useDefaultStream;
this.flushThresholdBytes = flushThresholdBytes;
this.flushThresholdCount = flushThresholdCount;
this.streamAppendClientCount = streamAppendClientCount;
}

boolean shouldFlush() {
return numPendingRecords > FLUSH_THRESHOLD_RECORDS
|| numPendingRecordBytes > flushThresholdBytes;
return numPendingRecords > flushThresholdCount || numPendingRecordBytes > flushThresholdBytes;
}

void flushIfNecessary() throws Exception {
@@ -432,7 +469,11 @@ DestinationState createDestinationState(
throw new RuntimeException(e);
}
return new DestinationState(
tableDestination1.getTableUrn(), messageConverter, datasetService, useDefaultStream);
tableDestination1.getTableUrn(),
messageConverter,
datasetService,
useDefaultStream,
streamAppendClientCount);
}

@ProcessElement