From 42a9d3b930248d47c80ef77b759bf4e60eaad22b Mon Sep 17 00:00:00 2001 From: t2h6 <60020185+t2h6@users.noreply.github.com> Date: Tue, 2 Jul 2024 16:18:13 -0700 Subject: [PATCH] Address a data race on numMessages in DirectStreamObserver.java --- .../beam/sdk/fn/stream/DirectStreamObserver.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java index 05421758ea59..2dc4c0f66b43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java @@ -53,7 +53,7 @@ public final class DirectStreamObserver implements StreamObserver { private final int maxMessagesBeforeCheck; private final Object lock = new Object(); - private int numMessages = -1; + private int numMessages; public DirectStreamObserver(Phaser phaser, CallStreamObserver outboundObserver) { this(phaser, outboundObserver, DEFAULT_MAX_MESSAGES_BEFORE_CHECK); @@ -61,9 +61,12 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver outboundObserve DirectStreamObserver( Phaser phaser, CallStreamObserver outboundObserver, int maxMessagesBeforeCheck) { - this.phaser = phaser; - this.outboundObserver = outboundObserver; - this.maxMessagesBeforeCheck = maxMessagesBeforeCheck; + synchronized (this.lock) { + this.phaser = phaser; + this.outboundObserver = outboundObserver; + this.maxMessagesBeforeCheck = maxMessagesBeforeCheck; + this.numMessages = -1; + } } @Override