-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add shutdown and start mechanics to windmill streams #32774
Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
9591a9a
to
ce57880
Compare
assign set of reviewers |
Assigning reviewers. If you would like to opt out of this review, comment R: @damccorm added as fallback since no labels match configuration Available commands:
The PR bot will only process comments in the main thread (not review comments). |
@@ -216,12 +207,18 @@ static FanOutStreamingEngineWorkerHarness forTesting( | |||
return fanOutStreamingEngineWorkProvider; | |||
} | |||
|
|||
@SuppressWarnings("ReturnValueIgnored") | |||
@SuppressWarnings("FutureReturnValueIgnored") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assign the future to a variable named unusedFuture
and remove the suppression?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...pache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Show resolved
Hide resolved
...er/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStream.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Outdated
Show resolved
Hide resolved
throw new RuntimeException(e); | ||
} finally { | ||
pending.remove(request.id()); | ||
} | ||
} | ||
|
||
// If we have exited the loop here, the stream has been shutdown. Cancel the response stream. | ||
request.getResponseStream().cancel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this? only the parseFn.parse
above was reading the responseStream and nothing should be blocked on the response stream at this point?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not needed, just wanted to cancel the stream incase there was anything dangling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is not needed, lets remove it. To catch anything dangling, we can add a check and throw
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
...apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
Outdated
Show resolved
Hide resolved
f05b41e
to
b66c086
Compare
b66c086
to
881399e
Compare
881399e
to
8618174
Compare
back to you thanks @scwhittle |
1b819bc
to
77969d0
Compare
d49d905
to
5a68e0f
Compare
private @Nullable StreamObserver<T> delegateStreamObserver; | ||
|
||
/** | ||
* Indicates that the request observer should no longer be used. Attempts to perform operations on |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move comment next to poison method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...n/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java
Outdated
Show resolved
Hide resolved
/** Records stream metrics for debugging. */ | ||
@ThreadSafe | ||
final class StreamDebugMetrics { | ||
private final AtomicInteger restartCount = new AtomicInteger(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can these just be synchronized ints as well instead of atomic? WE're not doing anything expensive under lock so it doesn't seem like it needs separate consideration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
lastRestartTime = DateTime.now(); | ||
} | ||
|
||
synchronized long startTimeMs() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getStartTimeMs?
ditto for lastSendTimeMs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
Show resolved
Hide resolved
/** | ||
* Used to guard {@link #start()} and {@link #shutdown()} behavior. | ||
* | ||
* @implNote Do not hold when performing IO. If also locking on {@code this} in the same context, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you acquire this before (this) lock, then this mutex could be blocked by I/O because we perform IO beneath this lock.
If this is supposed to be lightweight it seems like it should be acquired after (this) to avoid that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@GuardedBy("this") | ||
private boolean streamClosed; | ||
|
||
private volatile boolean isShutdown; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be guarded by shutdown lock? the accessor method can synchronize
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private boolean streamClosed; | ||
|
||
private volatile boolean isShutdown; | ||
private volatile boolean started; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be guarded by one of hte locks instead of volatile?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
685d386
to
3fb76ac
Compare
back to you @scwhittle thank you! |
throw new IllegalStateException("Send called on a client closed stream."); | ||
} | ||
|
||
requestObserver().onNext(request); | ||
try { | ||
verify(!Thread.holdsLock(shutdownLock), "shutdownLock should not be held during send."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove this since it seems likely expensive.
Instead you could verify with a test:
- setup requestObserver that blocks until notified
- one thread calls send and starts blocking
- main test thread calls shutdown() and verifies the method returns
- main test thread unblocks the requestObserver
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...n/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableStreamObserver.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...va/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
@@ -156,29 +163,44 @@ public void sendHealthCheck() { | |||
protected void onResponse(StreamingCommitResponse response) { | |||
commitWorkThrottleTimer.stop(); | |||
|
|||
RuntimeException finalException = null; | |||
CommitCompletionException failures = new CommitCompletionException(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: how about a builder for this since we won't want to add exceptions in other places.
then the final line of method can be
builder.throwIfNonEmpty();
and interrnally it builds exception and throws if needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking the builder would not be an exception itself, and then the exception would just be a simple class without mutating methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
Outdated
Show resolved
Hide resolved
queuedBytes = 0; | ||
queue.clear(); | ||
try { | ||
if (!hasReceivedShutdownSignal()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we remove? it is handled in stream methods already and doing it here leaves a gap between shutdown check and flush anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
send(StreamingGetDataRequest.newBuilder().setHeader(jobHeader).build()); | ||
if (clientClosed.get()) { | ||
if (clientClosed && !hasReceivedShutdownSignal()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove? above send will fail if shutdown
though I think we can also guarantee at higher level that onNewStream isn't called if shutdown
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Outdated
Show resolved
Hide resolved
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java
Outdated
Show resolved
Hide resolved
return true; | ||
} | ||
private boolean maybeTeardownStream() { | ||
synchronized (AbstractWindmillStream.this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just make a synchronized method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed by moving it out of the inner class
GuardedBy check was failing since reference to this
was ambiguous
private boolean maybeTeardownStream() { | ||
synchronized (AbstractWindmillStream.this) { | ||
if (isShutdown || (clientClosed && !hasPendingRequests())) { | ||
streamRegistry.remove(AbstractWindmillStream.this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why doesn't just "this" work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed by moving it out of the inner class
GuardedBy check was failing since reference to this
was ambiguous
@@ -59,6 +59,9 @@ final class StreamDebugMetrics { | |||
@GuardedBy("this") | |||
private DateTime shutdownTime = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mark nullable and above
I think you still need to fix the over exemption of windmill classes from checker.
#30183
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// If the stream was stopped due to a resource exhausted error then we are throttled. | ||
if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { | ||
if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit. I would maybe keep this in the onError case, as the other stuff is more just logs/debug page and this is more functional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
while (true) { | ||
private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<ResponseT> parseFn) | ||
throws WindmillStreamShutdownException { | ||
while (!isShutdownLocked()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm? below needs to handle it anyway
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...ain/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java
Show resolved
Hide resolved
} | ||
|
||
private void queueRequestAndWait(QueuedRequest request) throws InterruptedException { | ||
private synchronized void handleShutdown(QueuedRequest request, Throwable cause) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: name throwIfShutdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
request.addToStreamingGetDataRequest(builder); | ||
} | ||
return builder.build(); | ||
private synchronized void verify(boolean condition, String message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can this be removed? it seems you have shutdown check first in cases I see (and if not I think it would be clearer to have it as part of the check where it is than hidden in method that doesn't sound like it examines shutdown)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Verify.verify(condition || isShutdown, message); | ||
} | ||
|
||
private synchronized boolean isShutdownLocked() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm if you remove above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dpne
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
some of previous comments were not resolved, please look through them too. Thanks!
requestObserver.poison(); | ||
isShutdown = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should remove this (and the suppress)
shouldn't the poison prevent the blocking beneath the mutex? and then the below lock will be acquired soon?
Setting it to true outside the mutex will break invariants that are easier to think about if it is strictly guarded by. (and it breaks logic below we'll never run shutdownInternal)
if we do need it for something it seems like we could have a separate volatile shutdownRequested boolean. But I'd prefer to figure out what gets stuck with the current code and fix it instead because it is confusing to have two.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cleaneed up was supposed to stay within the sync block
...apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
Outdated
Show resolved
Hide resolved
@@ -472,7 +473,8 @@ private void flushResponse() { | |||
responseObserver.onNext(responseBuilder.build()); | |||
} catch (Exception e) { | |||
// Stream is already closed. | |||
System.out.println("trieu: " + e); | |||
LOG.warn("trieu: ", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm debug logs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this waas for debugging
.../java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java
Show resolved
Hide resolved
...apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
Show resolved
Hide resolved
synchronized (lock) { | ||
isClosed = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ie if (!isClosed) check to all of these
still need to add some more tests to thanks! |
@@ -121,7 +121,7 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce | |||
|
|||
try { | |||
delegate.onError(e); | |||
} catch (RuntimeException ignored) { | |||
} catch (IllegalStateException ignored) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add the ignored to the exception too? just in case we catch something unexpected and it helps debugging?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -188,7 +188,9 @@ protected void onResponse(StreamingGetDataResponse chunk) { | |||
|
|||
for (int i = 0; i < chunk.getRequestIdCount(); ++i) { | |||
AppendableInputStream responseStream = pending.get(chunk.getRequestId(i)); | |||
verify(responseStream != null, "No pending response stream"); | |||
synchronized (this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid synchronizing if not necessary and you need to handle the null case or you will just get an exception in following line.
if (responseStream == null) {
sychronized (this) { verify(isShutdown); }
continue;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
outboundObserver.onCompleted(); | ||
} | ||
} | ||
|
||
private void markClosedOrThrow() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just make method synchronized (though with suggestion below, probably easier to just duplicate in onCompleted/onError and use the same if block.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
outboundObserver.onCompleted(); | ||
} | ||
} | ||
|
||
private void markClosedOrThrow() { | ||
synchronized (lock) { | ||
Preconditions.checkState(!isClosed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is going to throw if we have sequence
T1: onNext()
T2: terminate()
T1: onError()
It seems like that could happen if we're terminating from other threads than the one driving the observer generally. We could have a separate bool tracking if userClosed or not, and change this exception to be based upon that as that is using the class wrong. having a terminate before/during a onCompleted/onError doesn't necessarily seem like misuse and I think we should avoid throwing an exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -104,6 +106,10 @@ public void setMessageCompression(boolean b) {} | |||
() -> | |||
assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1))); | |||
testStream.shutdown(); | |||
|
|||
// Sleep a bit to give sendExecutor time to execute the send(). | |||
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sleep for less, 100ms maybe? Tests take long enough already
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -142,7 +144,8 @@ public void testQueuedBatch_notifyFailed_throwsWindmillStreamShutdownExceptionOn | |||
assertThrows( | |||
WindmillStreamShutdownException.class, | |||
queuedBatch::waitForSendOrFailNotification)); | |||
|
|||
// Wait a few seconds for the above future to get scheduled and run. | |||
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto how about 100ms
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
}); | ||
|
||
// Sleep a bit to allow future to run. | ||
Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto less
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
Show resolved
Hide resolved
...apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
Show resolved
Hide resolved
...apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java
Show resolved
Hide resolved
64e61d9
to
42836f4
Compare
42836f4
to
f75df0f
Compare
back to you @scwhittle thanks! |
@@ -132,6 +132,10 @@ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address( | |||
directEndpointAddress.getHostAddress(), (int) endpointProto.getPort())); | |||
} | |||
|
|||
public final boolean isEmpty() { | |||
return equals(none()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about changing none() to return some singleton instead of building every time if we might be calling empty a lot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed but made none a singleton
@@ -104,6 +104,10 @@ synchronized void poison() { | |||
} | |||
} | |||
|
|||
synchronized boolean hasReceivedPoisonPill() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: how about isPoisoned?
Poison pill was a special element we added to queues etc, it's a little confusing to use the term here to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
@@ -125,6 +129,8 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce | |||
// If the delegate above was already terminated via onError or onComplete from another | |||
// thread. | |||
logger.warn("StreamObserver was previously cancelled.", e); | |||
} catch (RuntimeException ignored) { | |||
logger.warn("StreamObserver was unexpectedly cancelled.", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking above you should log both e and the currently ignored exception
ditto here. Also this log message is a little confusing how about
"encountered error {} when cancelling due to error"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -89,6 +90,9 @@ public void onNext(T value) throws StreamObserverCancelledException { | |||
throw new StreamObserverCancelledException("StreamObserver was terminated."); | |||
} | |||
|
|||
// We close under "lock", so this should never happen. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs note about terminating phaser first, update below too.
// We close under "lock" after terminating, so if we didn't observer termination above
// we shouldn't be closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -162,24 +170,23 @@ public void onNext(T value) throws StreamObserverCancelledException { | |||
public void onError(Throwable t) { | |||
isReadyNotifier.forceTermination(); | |||
synchronized (lock) { | |||
markClosedOrThrow(); | |||
outboundObserver.onError(t); | |||
if (!isClosed) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should have
isUserClosed = onError or onCompleted called on this object
isClosed = onError or onCompleted called on outboundObserver
Could name isClosed and isOutboundClosed or something if clearer. Woudl be good to comment above too.
I think that onError/onCompleted should be like:
check(!isUserClosed);
isUserClosed = true;
if (!isClosed) {
outboundObserver.onError(t);
isClosed = true;
}
enforcing that only one of them is called. And terminate should be like
if (!isClosed) {
outboundObserver.onError(t);
isClosed = true;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
also caught the illegal state exception in AbstractWindmillStream#halfClose
() -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager)) | ||
.collect(Collectors.toList()); | ||
|
||
return ImmutableList.<CompletableFuture<Void>>builder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to return the list of futures or woudl it better to return a single future by using CompletableFuture.allOf(..)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
@Override | ||
public String backendWorkerToken() { | ||
return backendWorkerToken; | ||
} | ||
|
||
@SuppressWarnings("GuardedBy") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove suppression
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
/** Returns true if the stream was torn down and should not be restarted internally. */ | ||
private synchronized boolean maybeTearDownStream() { | ||
if (requestObserver.hasReceivedPoisonPill() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this receivedPoisonPill check guarding against?
It is racy because we poison outside of the synchronized block so we could have
T1: notices unrelated stream failure, passes this check and isn't poisoned, starts calling onNewStream
T2: calls shutdown, poisons request observer
T1: calls requestObserver.reset() gets exception due to poison.
Instead of checking the poison here, it seems like we should just handle the exception due to reset failing as that covers both cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, realized we weren't calling break
on the WindmillStreamShutdownException
in startStream()
} | ||
commitWorkStream.shutdown(); | ||
|
||
Set<Windmill.CommitStatus> commitStatuses = new HashSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ping on unresolved comment here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Start and closing Windmill streams are currently via
halfClose()
and on stream creation. Implementations were previously created and returned in a "started" state usually after the stream has already sent the initial headers to open the connection to the backend servers.Starting in the current state prevents us from being able to start the stream "lazily". And closing allows other blocking stream operations to prevent streams from being able to be closed (stalling at times up to 10-20 minutes).
This is especially important in direct path mode where the user worker manages the fan out to the backend.
in terms of implementation, similar to WindmillStream.shutdown(), WindmillStream.start()'s behavior will only execute once during the lifetime of the WindmillStream object. Subsequent calls to start() and shutdown() will do nothing.
R: @arunpandianp @scwhittle
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.