diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
index 8df42ea42ff5..908221973fae 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/options/StreamingDataflowWorkerOptions.java
@@ -121,6 +121,15 @@ public interface StreamingDataflowWorkerOptions extends DataflowWorkerHarnessOpt
void setWindmillServiceStreamingRpcHealthCheckPeriodMs(int value);
+ @Description(
+ "If positive, the number of messages to send on streaming rpc before checking isReady. "
+ + "Higher values reduce cost of output overhead at the cost of more memory used in grpc "
+ + "buffers.")
+ @Default.Integer(10)
+ int getWindmillMessagesBetweenIsReadyChecks();
+
+ void setWindmillMessagesBetweenIsReadyChecks(int value);
+
/**
* Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for
* backwards compatibility.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
index 0646aba9c116..b2e9ec925153 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java
@@ -28,7 +28,7 @@
import org.slf4j.LoggerFactory;
/**
- * A {@link StreamObserver} which uses synchronization on the underlying {@link CallStreamObserver}
+ * A {@link StreamObserver} which synchronizes access to the underlying {@link CallStreamObserver}
* to provide thread safety.
*
* <p>Flow control with the underlying {@link CallStreamObserver} is handled with a {@link Phaser}
@@ -41,45 +41,66 @@ public final class DirectStreamObserver implements StreamObserver {
private static final Logger LOG = LoggerFactory.getLogger(DirectStreamObserver.class);
private final Phaser phaser;
- @GuardedBy("outboundObserver")
+ private final Object lock = new Object();
+
+ @GuardedBy("lock")
private final CallStreamObserver outboundObserver;
private final long deadlineSeconds;
+ private final int messagesBetweenIsReadyChecks;
- @GuardedBy("outboundObserver")
- private boolean firstMessage = true;
+ @GuardedBy("lock")
+ private int messagesSinceReady = 0;
public DirectStreamObserver(
- Phaser phaser, CallStreamObserver outboundObserver, long deadlineSeconds) {
+ Phaser phaser,
+ CallStreamObserver outboundObserver,
+ long deadlineSeconds,
+ int messagesBetweenIsReadyChecks) {
this.phaser = phaser;
this.outboundObserver = outboundObserver;
this.deadlineSeconds = deadlineSeconds;
+ // We always let the first message pass through without blocking because it is performed under
+ // the StreamPool synchronized block and a single header message isn't going to cause memory
+ // issues due to excessive buffering within grpc.
+ this.messagesBetweenIsReadyChecks = Math.max(1, messagesBetweenIsReadyChecks);
}
@Override
public void onNext(T value) {
- final int phase = phaser.getPhase();
+ int awaitPhase = -1;
long totalSecondsWaited = 0;
long waitSeconds = 1;
while (true) {
try {
- synchronized (outboundObserver) {
- // We let the first message passthrough without blocking because it is performed under the
- // StreamPool synchronized block and single message isn't going to cause memory issues due
- // to excessive buffering within grpc.
- if (firstMessage || outboundObserver.isReady()) {
- firstMessage = false;
+ synchronized (lock) {
+ // We only check isReady periodically to effectively allow for increasing the outbound
+ // buffer periodically. This reduces the overhead of blocking while still restricting
+ // memory because there is a limited # of streams, and we have a max message size of 2MB.
+ if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
+ outboundObserver.onNext(value);
+ return;
+ }
+ // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+ // careful to observe the phase before observing isReady.
+ if (awaitPhase < 0) {
+ awaitPhase = phaser.getPhase();
+ }
+ if (outboundObserver.isReady()) {
+ messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
}
- // A callback has been registered to advance the phaser whenever the observer transitions to
- // is ready. Since we are waiting for a phase observed before the outboundObserver.isReady()
- // returned false, we expect it to advance after the channel has become ready. This doesn't
- // always seem to be the case (despite documentation stating otherwise) so we poll
- // periodically and enforce an overall timeout related to the stream deadline.
- phaser.awaitAdvanceInterruptibly(phase, waitSeconds, TimeUnit.SECONDS);
- synchronized (outboundObserver) {
+ // A callback has been registered to advance the phaser whenever the observer
+ // transitions to is ready. Since we are waiting for a phase observed before the
+ // outboundObserver.isReady() returned false, we expect it to advance after the
+ // channel has become ready. This doesn't always seem to be the case (despite
+ // documentation stating otherwise) so we poll periodically and enforce an overall
+ // timeout related to the stream deadline.
+ phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+ synchronized (lock) {
+ messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
@@ -88,33 +109,33 @@ public void onNext(T value) {
if (totalSecondsWaited > deadlineSeconds) {
LOG.error(
"Exceeded timeout waiting for the outboundObserver to become ready meaning "
- + "that the streamdeadline was not respected.");
+ + "that the stream deadline was not respected.");
throw new RuntimeException(e);
}
+ if (totalSecondsWaited > 30) {
+ LOG.info(
+ "Output channel stalled for {}s, outbound thread {}.",
+ totalSecondsWaited,
+ Thread.currentThread().getName());
+ }
waitSeconds = waitSeconds * 2;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
- if (totalSecondsWaited > 30) {
- LOG.info(
- "Output channel stalled for {}s, outbound thread {}.",
- totalSecondsWaited,
- Thread.currentThread().getName());
- }
}
}
@Override
public void onError(Throwable t) {
- synchronized (outboundObserver) {
+ synchronized (lock) {
outboundObserver.onError(t);
}
}
@Override
public void onCompleted() {
- synchronized (outboundObserver) {
+ synchronized (lock) {
outboundObserver.onCompleted();
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 6a5e608f5b8f..d7ae8b2b7347 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -625,7 +625,8 @@ private static long uniqueId() {
*/
private abstract class AbstractWindmillStream implements WindmillStream {
private final StreamObserverFactory streamObserverFactory =
- StreamObserverFactory.direct(streamDeadlineSeconds * 2);
+ StreamObserverFactory.direct(
+ streamDeadlineSeconds * 2, options.getWindmillMessagesBetweenIsReadyChecks());
private final Function, StreamObserver> clientFactory;
private final Executor executor =
Executors.newSingleThreadExecutor(
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
index e3f344c1fb52..fe8878f8f52f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamObserverFactory.java
@@ -28,8 +28,9 @@
* to use.
*/
public abstract class StreamObserverFactory {
- public static StreamObserverFactory direct(long deadlineSeconds) {
- return new Direct(deadlineSeconds);
+ public static StreamObserverFactory direct(
+ long deadlineSeconds, int messagesBetweenIsReadyChecks) {
+ return new Direct(deadlineSeconds, messagesBetweenIsReadyChecks);
}
public abstract StreamObserver from(
@@ -38,9 +39,11 @@ public abstract StreamObserver from(
private static class Direct extends StreamObserverFactory {
private final long deadlineSeconds;
+ private final int messagesBetweenIsReadyChecks;
- Direct(long deadlineSeconds) {
+ Direct(long deadlineSeconds, int messagesBetweenIsReadyChecks) {
this.deadlineSeconds = deadlineSeconds;
+ this.messagesBetweenIsReadyChecks = messagesBetweenIsReadyChecks;
}
@Override
@@ -53,7 +56,8 @@ public StreamObserver from(
clientFactory.apply(
new ForwardingClientResponseObserver(
inboundObserver, phaser::arrive, phaser::forceTermination));
- return new DirectStreamObserver<>(phaser, outboundObserver, deadlineSeconds);
+ return new DirectStreamObserver<>(
+ phaser, outboundObserver, deadlineSeconds, messagesBetweenIsReadyChecks);
}
}
}