-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update windmill proto definition #30046
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
assign set of reviewers |
run java precommit |
Assigning reviewers. If you would like to opt out of this review, comment R: @lostluck added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
Outdated
Show resolved
Hide resolved
|
||
return directEndpointIpV6Address.isPresent() | ||
? directEndpointIpV6Address | ||
: tryParseEndpointIntoHostAndPort(directEndpoint).map(WindmillServiceAddress::create); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, the WindmillServiceAddress::create is common,
so maybe cleaner to do
return
tryParseDirectEndpointIntoIpV6Address(directEndpoint).orElseGet(tryParseEndpointIntoHostAndPort(directEndpoint)).map(WindmillServiceAddress::create);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am wondering if we should jsut drop the ipv6
since HostAndPort supports ipv6
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its possible that the tryParseEndpointIntoHostAndPort fails so it returns Optional and orElseGet requires a signature that returns HostAndPort
public static Endpoint.Builder builder() { | ||
return new AutoValue_WindmillEndpoints_Endpoint.Builder(); | ||
} | ||
|
||
public static Endpoint from(Windmill.WorkerMetadataResponse.Endpoint endpointProto) { | ||
public static Endpoint from( | ||
Windmill.WorkerMetadataResponse.Endpoint endpointProto, String authenticatingService) { | ||
Endpoint.Builder endpointBuilder = Endpoint.builder(); | ||
if (endpointProto.hasDirectEndpoint() && !endpointProto.getDirectEndpoint().isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove the has check, can rely on default value being empty
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java
Outdated
Show resolved
Hide resolved
@@ -76,6 +76,10 @@ public abstract GetWorkStream getWorkStream( | |||
/** Returns the amount of time the server has been throttled and resets the time to 0. */ | |||
public abstract long getAndResetThrottleTime(); | |||
|
|||
public long clientId() { | |||
return 0L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
client id is supposed to change on process restart. It is otherwise not a value that has to be coordinated in advance with the service so it can just be a random value as it was before. I think that it can stay in StreamingDataflowWorker because it also doesn't need to be different for each endpoint.
This helps the backend speed up retries, ensure caching is correct etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah ok out of curiosity wouldn't the other components restart also?
looks like we create a new worker in StreamingDataflowWorker.main
And inject the WindmillServerStub (GrpcWindmillServer) via the options.
@@ -45,92 +48,156 @@ class GrpcDispatcherClient { | |||
private final WindmillStubFactory windmillStubFactory; | |||
|
|||
@GuardedBy("this") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like this wouldn't need guarding if it is atomic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
Outdated
Show resolved
Hide resolved
R: @scwhittle (Just to get the bot off the hook. The "R" needs to be R and not r.) |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control |
ack thanks! |
68409ad
to
4de69da
Compare
} | ||
|
||
private synchronized <T> T randomlySelectNextStub(List<T> stubs) { | ||
return stubs.get(rand.nextInt(dispatcherStubs.get().size())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mixing stubs and dispatcherStubs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to use stubs.size()
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
Show resolved
Hide resolved
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
Outdated
Show resolved
Hide resolved
!options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null | ||
? GrpcWindmillServer.LOCALHOST_BACKOFF | ||
: GrpcWindmillServer.MAX_BACKOFF; | ||
GrpcWindmillStreamFactory windmillStreamFactory = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems odd to create this here if the server isn't grpc server.
Can we just pass in somethign generic like JobHeader to windmillServer.start?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java
Show resolved
Hide resolved
this.windmillServer = options.getWindmillServerStub(); | ||
this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses); | ||
windmillServer.start( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we avoid the start method (which is more complex than a factory method taking necessary requirements and initalizing) by moving the grpc server initialization into this file?
It's kind of odd that it is within getWindmillServerStub default factory. It appears that is done so that the test can inject a fake server. One idea would be to remove the default factory from options and create a server here if the option is null but use injected server otherwise.
Then you could just create the GrpcWindmillServer directly with either a factory or constructor taking the JobHeader without modifying the WindmillServerStub interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
opted to allow StreamingDataflowWorker to inject a stream factory
} | ||
|
||
synchronized boolean isReady() { | ||
return !dispatcherStubs.isEmpty(); | ||
boolean isReady() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems like it could lead to racy usage
if (client.isReady()) {
client.xxx
}
but since the dispatcherStubs can change in-between, there may not be a stub by the time the next call is made.
Can this be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use this to make sure that the endpoints are set in StreamingDataflowWorker before we fetch the pipeline config as well as schedule periodic config fetches
Line 1615 in f08058c
if (!windmillServer.isReady()) { |
if we remove this we would need another mechanism for that.
we could use some kind of publish/subscribe mechanism to signal when the endpoint is ready (kind of like a count down latch) and then send another signal during an update (i.e was ready with endpoints, but is no longer ready).
How often does this value change? I would think that the service endpoint itself would not change during pipeline execution. @scwhittle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i guess we can do it async
how often does this value change? @scwhittle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We never really expect it to change. It was added in case we needed to migrate dispatcher endpoint since there are possibly stale long-running client processes.
How about ignoring updates that are empty? Then we only go from not read -> ready and not backwards? That prevents the possible race, shouldn't cause any problems and is simpler than another type of notification.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the current behavior which won't allow empty updates
synchronized void consumeWindmillDispatcherEndpoints(
ImmutableSet<HostAndPort> dispatcherEndpoints) {
ImmutableSet<HostAndPort> currentDispatcherEndpoints =
dispatcherStubs.get().dispatcherEndpoints();
Preconditions.checkArgument(
dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
"Cannot set dispatcher endpoints to nothing.");
if (this.dispatcherEndpoints.equals(dispatcherEndpoints)) {
if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
// The endpoints are equal don't recreate the stubs.
return;
}....
We currently will throw an exception if we try to set the endpoints to nothing, should we just log a warning and ignore the update here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that seems fine, and we can just comment here that once ready it remains ready
could rename isReady to endpointsInitialized() or something that clarifies what it means more and is clearer it won't go back to false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack done
...in/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
Show resolved
Hide resolved
@@ -101,36 +103,21 @@ public final class GrpcWindmillServer extends WindmillServerStub { | |||
// newer ComputationHeartbeatRequests. | |||
private final boolean sendKeyedGetDataRequests; | |||
private Consumer<List<ComputationHeartbeatResponse>> processHeartbeatResponses; | |||
private @Nullable GrpcWindmillStreamFactory windmillStreamFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we can change constructor to require the streamfactory this nullable and preconditons below can be removed, see other comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final String testName; | ||
private final Function<String, ManagedChannel> channelFactory; | ||
|
||
InProcessWindmillStubFactory(String testName, Function<String, ManagedChannel> channelFactory) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: coudl just take a Supplier
If tests have a Function<String, ManagedChannel> they can always bind the test name themselves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel; | ||
|
||
/** | ||
* Creates in process stubs to talk to Streaming Engine. Only recommended to be used for testing. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be moved to test folder and renamed FakeWindmillStubFactory? it ignores the serviceAddress
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; | ||
|
||
@Internal | ||
public final class WindmillStubFactories { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we get rid of this one? If the fake is moved to test folder this just has single method. I think callers could use RemoteWindmillStubFactory constructor directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
d04cdf9
to
23d6047
Compare
theres some tests failing for windows that are unrelated (passing in other configurations) |
|
||
public static StreamingDataflowWorker fromOptions(StreamingDataflowWorkerOptions options) { | ||
StreamingDataflowWorkerOptions streamingOptions = | ||
options.as(StreamingDataflowWorkerOptions.class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no longer needed, already right option type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
ShardedKey shardedKey = entry.getKey(); | ||
Optional.ofNullable(failedWork.get(shardedKey.shardingKey())) | ||
.ifPresent( | ||
failedWorkTokens -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I find this harder to read than just having a variable storing result of failedWork.get and using an if statement, as it's harder to eyeball and takes a little more effort to realize that this is processing on the result of failedWork.get
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done!
private final Function<String, Optional<ComputationState>> computationStateFetcher; | ||
|
||
public WorkHeartbeatProcessor( | ||
Function<String, Optional<ComputationState>> computationStateFetcher) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment on what string is
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
synchronized boolean isReady() { | ||
return !dispatcherStubs.isEmpty(); | ||
boolean isReady() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping
private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServerTest.class); | ||
private static final int STREAM_CHUNK_SIZE = 2 << 20; | ||
private final long clientId = new Random().nextLong(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: just hard-code something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
b9e9949
to
a60ec02
Compare
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merge with #29082 and remove this class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
synchronized boolean isReady() { | ||
return !dispatcherStubs.isEmpty(); | ||
boolean isReady() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We never really expect it to change. It was added in case we needed to migrate dispatcher endpoint since there are possibly stale long-running client processes.
How about ignoring updates that are empty? Then we only go from not read -> ready and not backwards? That prevents the possible race, shouldn't cause any problems and is simpler than another type of notification.
|
||
private ManagedChannel createChannel(WindmillServiceAddress serviceAddress) { | ||
ManagedChannel channel = remoteChannel(serviceAddress, rpcChannelTimeoutSec); | ||
return useIsolatedChannels ? IsolationChannel.create(() -> channel) : channel; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is using the same ManagedChannel for all the separate channels the IsolationChannel is trying to create.
Instead you can share the code this way. Maybe add a comment since it wasn't clear before
Producer channelFactory = () -> remoteChannel(service_address, rpcChannelTimeoutSec);
return useIsolatedChannels ? IsolationChannel.create(channelFactory) : channelFactory();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
computationHeartbeatResponse.getHeartbeatResponsesList()) { | ||
if (heartbeatResponse.getFailed()) { | ||
failedWork | ||
.computeIfAbsent(heartbeatResponse.getShardingKey(), key -> new ArrayList<>()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update based upon changes from other PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
a60ec02
to
2b59492
Compare
failures are not related to changes ready for another look! @scwhittle looks like we already do not allow empty updates to the dispatcher endpoints |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just a couple minor comments, otherwise looks good
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap; | ||
|
||
@Internal | ||
public final class WorkHeartbeatProcessor implements Consumer<List<ComputationHeartbeatResponse>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WorkHeartbeatResponseProcessor?
might help prevent confusion that it is processing for outgoing heartbeats
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done and added comment
} | ||
|
||
synchronized boolean isReady() { | ||
return !dispatcherStubs.isEmpty(); | ||
boolean isReady() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that seems fine, and we can just comment here that once ready it remains ready
could rename isReady to endpointsInitialized() or something that clarifies what it means more and is clearer it won't go back to false.
Internal and external protos were not matching leading to gRPC errors.
make both protos match
add client id to the job headers for communication with Windmill Service
let the client id be owned by GrpcWindmillServer (WindmillServerStub) for now since its injected.
Will need to find a way to inject the client id (maybe from StreamingDataflowWorkerOptions) as well as other components. I wonder if that will be cleaner
r: @scwhittle
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.