Skip to content

Commit

Permalink
Merge pull request #24853: Modify windmill DirectStreamObserver to ca…
Browse files Browse the repository at this point in the history
…ll isReady only every 10 messages by default
  • Loading branch information
reuvenlax authored Jan 7, 2023
2 parents f9d5de3 + de0cf60 commit c162f4c
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,15 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt

void setWindmillServiceStreamingRpcHealthCheckPeriodMs(int value);

/**
 * Number of messages sent on a Windmill streaming RPC between successive {@code isReady}
 * checks on the outbound stream. Defaults to 10.
 *
 * <p>Per the option description, larger values lower the per-message overhead of the check but
 * allow more data to accumulate in gRPC buffers.
 */
@Description(
    "If positive, the number of messages to send on streaming rpc before checking isReady. "
        + "Higher values reduce cost of output overhead at the cost of more memory used in grpc "
        + "buffers.")
@Default.Integer(10)
int getWindmillMessagesBetweenIsReadyChecks();

/**
 * Sets the number of messages sent on a Windmill streaming RPC between {@code isReady} checks.
 *
 * @param value the number of messages to send before checking {@code isReady} again
 */
void setWindmillMessagesBetweenIsReadyChecks(int value);

/**
* Factory for creating local Windmill address. Reads from system property 'windmill.hostport' for
* backwards compatibility.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.slf4j.LoggerFactory;

/**
* A {@link StreamObserver} which uses synchronization on the underlying {@link CallStreamObserver}
* A {@link StreamObserver} which synchronizes access to the underlying {@link CallStreamObserver}
* to provide thread safety.
*
* <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
Expand All @@ -41,45 +41,66 @@ public final class DirectStreamObserver<T> implements StreamObserver<T> {
private static final Logger LOG = LoggerFactory.getLogger(DirectStreamObserver.class);
private final Phaser phaser;

@GuardedBy("outboundObserver")
private final Object lock = new Object();

@GuardedBy("lock")
private final CallStreamObserver<T> outboundObserver;

private final long deadlineSeconds;
private final int messagesBetweenIsReadyChecks;

@GuardedBy("outboundObserver")
private boolean firstMessage = true;
@GuardedBy("lock")
private int messagesSinceReady = 0;

public DirectStreamObserver(
Phaser phaser, CallStreamObserver<T> outboundObserver, long deadlineSeconds) {
Phaser phaser,
CallStreamObserver<T> outboundObserver,
long deadlineSeconds,
int messagesBetweenIsReadyChecks) {
this.phaser = phaser;
this.outboundObserver = outboundObserver;
this.deadlineSeconds = deadlineSeconds;
// We always let the first message pass through without blocking because it is performed under
// the StreamPool synchronized block and a single header message isn't going to cause memory
// issues due to excessive buffering within grpc.
this.messagesBetweenIsReadyChecks = Math.max(1, messagesBetweenIsReadyChecks);
}

@Override
public void onNext(T value) {
final int phase = phaser.getPhase();
int awaitPhase = -1;
long totalSecondsWaited = 0;
long waitSeconds = 1;
while (true) {
try {
synchronized (outboundObserver) {
// We let the first message passthrough without blocking because it is performed under the
// StreamPool synchronized block and single message isn't going to cause memory issues due
// to excessive buffering within grpc.
if (firstMessage || outboundObserver.isReady()) {
firstMessage = false;
synchronized (lock) {
// We only check isReady periodically to effectively allow for increasing the outbound
// buffer periodically. This reduces the overhead of blocking while still restricting
// memory because there is a limited number of streams, and we have a max message size of 2MB.
if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
outboundObserver.onNext(value);
return;
}
// If we awaited previously and timed out, wait for the same phase. Otherwise we're
// careful to observe the phase before observing isReady.
if (awaitPhase < 0) {
awaitPhase = phaser.getPhase();
}
if (outboundObserver.isReady()) {
messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
}
// A callback has been registered to advance the phaser whenever the observer transitions to
// is ready. Since we are waiting for a phase observed before the outboundObserver.isReady()
// returned false, we expect it to advance after the channel has become ready. This doesn't
// always seem to be the case (despite documentation stating otherwise) so we poll
// periodically and enforce an overall timeout related to the stream deadline.
phaser.awaitAdvanceInterruptibly(phase, waitSeconds, TimeUnit.SECONDS);
synchronized (outboundObserver) {
// A callback has been registered to advance the phaser whenever the observer
// transitions to is ready. Since we are waiting for a phase observed before the
// outboundObserver.isReady() returned false, we expect it to advance after the
// channel has become ready. This doesn't always seem to be the case (despite
// documentation stating otherwise) so we poll periodically and enforce an overall
// timeout related to the stream deadline.
phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
synchronized (lock) {
messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
Expand All @@ -88,33 +109,33 @@ public void onNext(T value) {
if (totalSecondsWaited > deadlineSeconds) {
LOG.error(
"Exceeded timeout waiting for the outboundObserver to become ready meaning "
+ "that the streamdeadline was not respected.");
+ "that the stream deadline was not respected.");
throw new RuntimeException(e);
}
if (totalSecondsWaited > 30) {
LOG.info(
"Output channel stalled for {}s, outbound thread {}.",
totalSecondsWaited,
Thread.currentThread().getName());
}
waitSeconds = waitSeconds * 2;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
if (totalSecondsWaited > 30) {
LOG.info(
"Output channel stalled for {}s, outbound thread {}.",
totalSecondsWaited,
Thread.currentThread().getName());
}
}
}

@Override
public void onError(Throwable t) {
synchronized (outboundObserver) {
synchronized (lock) {
outboundObserver.onError(t);
}
}

@Override
public void onCompleted() {
synchronized (outboundObserver) {
synchronized (lock) {
outboundObserver.onCompleted();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,8 @@ private static long uniqueId() {
*/
private abstract class AbstractWindmillStream<RequestT, ResponseT> implements WindmillStream {
private final StreamObserverFactory streamObserverFactory =
StreamObserverFactory.direct(streamDeadlineSeconds * 2);
StreamObserverFactory.direct(
streamDeadlineSeconds * 2, options.getWindmillMessagesBetweenIsReadyChecks());
private final Function<StreamObserver<ResponseT>, StreamObserver<RequestT>> clientFactory;
private final Executor executor =
Executors.newSingleThreadExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@
* to use.
*/
public abstract class StreamObserverFactory {
public static StreamObserverFactory direct(long deadlineSeconds) {
return new Direct(deadlineSeconds);
public static StreamObserverFactory direct(
long deadlineSeconds, int messagesBetweenIsReadyChecks) {
return new Direct(deadlineSeconds, messagesBetweenIsReadyChecks);
}

public abstract <ReqT, RespT> StreamObserver<RespT> from(
Expand All @@ -38,9 +39,11 @@ public abstract <ReqT, RespT> StreamObserver<RespT> from(

private static class Direct extends StreamObserverFactory {
private final long deadlineSeconds;
private final int messagesBetweenIsReadyChecks;

Direct(long deadlineSeconds) {
Direct(long deadlineSeconds, int messagesBetweenIsReadyChecks) {
this.deadlineSeconds = deadlineSeconds;
this.messagesBetweenIsReadyChecks = messagesBetweenIsReadyChecks;
}

@Override
Expand All @@ -53,7 +56,8 @@ public <ReqT, RespT> StreamObserver<RespT> from(
clientFactory.apply(
new ForwardingClientResponseObserver<ReqT, RespT>(
inboundObserver, phaser::arrive, phaser::forceTermination));
return new DirectStreamObserver<>(phaser, outboundObserver, deadlineSeconds);
return new DirectStreamObserver<>(
phaser, outboundObserver, deadlineSeconds, messagesBetweenIsReadyChecks);
}
}
}

0 comments on commit c162f4c

Please sign in to comment.