diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java index 05421758ea59..2dc4c0f66b43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java @@ -53,7 +53,7 @@ public final class DirectStreamObserver implements StreamObserver { private final int maxMessagesBeforeCheck; private final Object lock = new Object(); - private int numMessages = -1; + private int numMessages; public DirectStreamObserver(Phaser phaser, CallStreamObserver outboundObserver) { this(phaser, outboundObserver, DEFAULT_MAX_MESSAGES_BEFORE_CHECK); @@ -61,9 +61,12 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver outboundObserve DirectStreamObserver( Phaser phaser, CallStreamObserver outboundObserver, int maxMessagesBeforeCheck) { - this.phaser = phaser; - this.outboundObserver = outboundObserver; - this.maxMessagesBeforeCheck = maxMessagesBeforeCheck; + synchronized (this.lock) { + this.phaser = phaser; + this.outboundObserver = outboundObserver; + this.maxMessagesBeforeCheck = maxMessagesBeforeCheck; + this.numMessages = -1; + } } @Override