Vortex performance improvement: Enable multiple stream clients per worker #17550
Conversation
…es instead of just one.
Can one of the admins verify this patch?
1 similar comment
Can one of the admins verify this patch?
R: @reuvenlax
Run Java PreCommit
...google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.java
@@ -86,15 +89,18 @@
// (any access of the cache could trigger element expiration). Therefore most used of
// APPEND_CLIENTS should
// synchronize.
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
private static final Cache<String, List<StreamAppendClient>> APPEND_CLIENTS =
what did you find about the cost of synchronization?
@@ -129,6 +135,9 @@ public StorageApiWriteUnshardedRecords(
public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayload>> input) {
String operationName = input.getName() + "/" + getName();
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
// default value from options is 0, so we set at least one client
Integer numStreams =
    options.getNumStorageWriteApiStreams() == 0 ? 1 : options.getNumStorageWriteApiStreams();
create a new option that defaults to 1
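A minimal sketch of what such an option could look like in BigQueryOptions; the accessor name here is illustrative, since the PR excerpt only shows the description text and the @Default.Integer(1) annotation:

```java
// Sketch only: illustrative option name, modeled on the existing BigQueryOptions accessors.
@Description(
    "Number of stream append clients allocated per worker and destination when using the "
        + "BigQuery Storage Write API default stream.")
@Default.Integer(1)
Integer getStorageWriteApiStreamAppendClientCount();

void setStorageWriteApiStreamAppendClientCount(Integer value);
```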
() ->
    datasetService.getStreamAppendClient(
        streamName, descriptorWrapper.descriptor));
APPEND_CLIENTS.get(streamName, () -> generateClients()).get(clientNumber);
Instead of generating all clients eagerly, let's do it lazily. Initialize a List with count copies of Optional.empty(). Then do
this.streamAppendClient = APPEND_CLIENTS.get(streamName, this.generateClients).get(clientNumber).orElseGet(this.getStreamAppendClient).
FYI you could also do this with null if you don't care to use Optional here.
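A rough sketch of that lazy variant, assuming the cache value becomes a fixed-size list of Optional slots (the field and method names here are illustrative, not the PR's code):

```java
// Sketch: assumes APPEND_CLIENTS is a Cache<String, List<Optional<StreamAppendClient>>>
// and streamAppendClientCount / clientNumber are fields of DestinationState.
void pinClientLazily() throws Exception {
  List<Optional<StreamAppendClient>> slots =
      APPEND_CLIENTS.get(
          streamName,
          () ->
              new ArrayList<>(
                  Collections.nCopies(
                      streamAppendClientCount, Optional.<StreamAppendClient>empty())));
  // Per the class comment above, uses of APPEND_CLIENTS synchronize on the cache.
  synchronized (APPEND_CLIENTS) {
    if (!slots.get(clientNumber).isPresent()) {
      // Create the client for this slot only on first use.
      slots.set(
          clientNumber,
          Optional.of(
              datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor)));
    }
    this.streamAppendClient = slots.get(clientNumber).get();
  }
}
```

The null-based variant mentioned above would simply skip the Optional wrapper and check each slot for null instead.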
Instead of making the client creation lazy, I reverted the cache structure back to have a single client per entry. But now, the cache key is the stream name + the assigned client number.
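In effect the lookup then becomes something like the following sketch (the exact helper method appears in a later diff hunk):

```java
// Sketch: one cache entry per (stream, client slot); each entry holds a single client.
// Cache.get throws ExecutionException; error handling is omitted here.
String cacheEntryName = streamName + "-client" + clientNumber;
StreamAppendClient client =
    APPEND_CLIENTS.get(
        cacheEntryName,
        () -> datasetService.getStreamAppendClient(streamName, descriptorWrapper.descriptor));
```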
&& System.identityHashCode(cachedAppendClient)
List<StreamAppendClient> cachedAppendClients = APPEND_CLIENTS.getIfPresent(streamName);
if (cachedAppendClients != null
    && System.identityHashCode(cachedAppendClients.get(clientNumber))
        == System.identityHashCode(streamAppendClient)) {
This isn't quite right - we're now invalidating all of the StreamWriters when any one of them fails. I think instead you want to just null out the one that failed and allow it to be recreated on the next get.
The invalidation here corresponds to a schema mismatch; shouldn't all the clients be invalidated for a particular stream?
made the changes to only invalidate the writer in use by the bundle, not all of them.
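With the per-client cache keys, that narrower invalidation looks roughly like this sketch (assembled from the surrounding diff hunks, not the verbatim change):

```java
// Sketch: invalidate only the entry for the client this bundle was using; other
// clients for the same stream stay cached.
String cacheEntryName = getStreamAppendClientCacheEntryName();
@Nullable
StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(cacheEntryName);
if (cachedAppendClient != null
    && System.identityHashCode(cachedAppendClient)
        == System.identityHashCode(streamAppendClient)) {
  APPEND_CLIENTS.invalidate(cacheEntryName);
}
```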
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final BigQueryServices bqServices;
private final boolean useDefaultStream;
// default append client count to 1
private Integer streamAppendClientCount = 1;
why not private int?
Done
"The number of stream append clients indicated will be allocated at a per worker and destination " | ||
+ "basis. A large value can cause a large pipeline to go over the BigQuery connection quota quickly. " | ||
+ "With low-mid volume pipelines using the default configuration should be enough.") | ||
@Default.Integer(1) |
A bit confusing - need to clarify that this only applies for at-least once writes using the default stream
@@ -50,14 +49,16 @@ public PCollection<Void> expand(PCollection<KV<DestinationT, StorageApiWritePayl
BigQueryOptions bigQueryOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
// Append records to the Storage API streams.
input.apply(
    "Write Records",
Changing transform names can affect update compatibility - do you need this?
No I don't, but this does not follow the same convention other apply labels use (no spaces in names). Should I revert?
@@ -197,6 +204,10 @@ String getDefaultStreamName() {
return BigQueryHelpers.stripPartitionDecorator(tableUrn) + "/streams/_default";
}

String getStreamAppendClientCacheEntryName() {
  return getDefaultStreamName() + "-client" + clientNumber;
}
This is a bit weird, since this code doesn't always use the default stream. As it stands, the cache is probably not needed in the non-default-stream case (since we'll create a new stream for every bundle); however, if we change that we need to rename the cache and also make sure to close the client (right now we rely on the cache removal listener to close the client).
I will make the changes to consider both default streams and per-bundle on-demand streams.
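For context, the closing-via-cache behavior mentioned above looks roughly like this sketch of a Guava cache with a removal listener; the timeout and error handling are illustrative:

```java
// Sketch: evicted/invalidated entries are closed by the removal listener, which is
// why the code relies on the cache (rather than explicit close calls) to clean up.
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
    CacheBuilder.newBuilder()
        .expireAfterAccess(15, TimeUnit.MINUTES) // illustrative idle timeout
        .removalListener(
            (RemovalNotification<String, StreamAppendClient> removal) -> {
              StreamAppendClient client = removal.getValue();
              if (client != null) {
                try {
                  client.close();
                } catch (Exception e) {
                  // Best-effort close; real code would log the failure.
                }
              }
            })
        .build();
```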
() ->
    datasetService.getStreamAppendClient(
        streamName, descriptorWrapper.descriptor));
getStreamAppendClientCacheEntryName(), () -> createStreamAppendClient());
getStreamAppendClientCacheEntryName doesn't necessarily return the stream name we used
It will also be broken if you have two sinks in the pipeline, one using the default stream and one not.
I can see the problem of mixing the default stream with dynamically generated streams; I will change that.
…eanup/init to teardown/setup when using default stream
@@ -409,9 +447,18 @@ private void initializeDatasetService(PipelineOptions pipelineOptions) {
}
}

@Setup
public void setup() {
I'm not sure what this is adding?
I've seen evidence that closing one stream append client for the _default stream causes a cascading close of the other ones, so moving the destinations state so it can be reused across bundle executions at least decreased the occurrences of those cascading closes.
I can revert this if it is not the right idea to try.
@StartBundle
public void startBundle() throws IOException {
  destinations = Maps.newHashMap();
  if (!useDefaultStream) {
Why add this if?
Only want to reuse the destination state for the default stream clients.
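A minimal sketch of the setup/start-bundle split being discussed, capturing the stated intent rather than the exact diff:

```java
// Sketch: destinations is created once per DoFn instance and reused across bundles
// when writing to the default stream; otherwise it is reset at every bundle start.
@Setup
public void setup() {
  destinations = Maps.newHashMap();
}

@StartBundle
public void startBundle() throws IOException {
  if (!useDefaultStream) {
    // Non-default streams are created per bundle, so per-destination state is not reusable.
    destinations = Maps.newHashMap();
  }
}
```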
@@ -455,21 +506,26 @@ public void process(
@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
flushAll();
for (DestinationState state : destinations.values()) {
if (!useDefaultStream) {
if (!useDefaultStream) {
Why this change? Won't this prevent us from unpinning the client?
It will, but those clients can be reused across bundle executions.
The clients can be reused regardless - unpinning won't close the client unless close is also called (i.e. if we hit the cache idle timeout).
if (destinations != null) {
  for (DestinationState state : destinations.values()) {
    state.teardown();
  }
We shouldn't be doing this in teardown
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();

void setNumStorageWriteApiStreams(Integer value);

@Description(
    "When using the \"_default\" table stream, this option sets the number of stream append clients that will be allocated "
Reference at-least-once writes. Users don't know about default streams, as that's an implementation detail.
Run Java PreCommit
A couple of very minor nits, but otherwise LGTM
@@ -213,6 +226,14 @@ String createStreamIfNeeded() {
return this.streamName;
}

StreamAppendClient generateClient() {
Minor nit - remove try/catch since the calling function already catches the exception
done
@@ -263,11 +281,12 @@ void invalidateWriteStream() {
// thread has already invalidated
// and recreated the stream).
@Nullable
StreamAppendClient cachedAppendClient = APPEND_CLIENTS.getIfPresent(streamName);
StreamAppendClient cachedAppendClient =
    APPEND_CLIENTS.getIfPresent(getStreamAppendClientCacheEntryName());
nit - store cachedEntryName in a local variable instead of recomputing twice here
done
Run Java PreCommit
2 similar comments
Run Java PreCommit
Run Java PreCommit
…rker (#17550) (#17718) Co-authored-by: pablo rodriguez defino <[email protected]>
Changing the StorageWrite stream append client cache to use a list of entries instead of just one entry per stream.
In the majority of cases the default configuration will be sufficient to handle the ingestion volume, but there are cases where the ingestion needs a higher level of parallelism. Setting
--numStorageWriteApiStreams=X
to a value > 1 will help increase the ingestion parallelism level. Users should be mindful of setting this number too high, since the number of open connections scales rapidly with the number of workers and can reach BigQuery quotas/limits (see: https://cloud.google.com/bigquery/quotas#write-api-limits).
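For example, the value can be raised on the command line or programmatically; the value 4 below is just illustrative:

```java
// Command line: --numStorageWriteApiStreams=4, or programmatically:
BigQueryOptions options = PipelineOptionsFactory.fromArgs(args).as(BigQueryOptions.class);
// The option defaults to 0, which the sink treats as a single append client.
options.setNumStorageWriteApiStreams(4);
```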