diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java index 76b69b3b93bd..ac85ba98267c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherCache.java @@ -27,9 +27,13 @@ import java.util.HashMap; import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** A map of working publishers by PublisherOptions. */ class PublisherCache implements AutoCloseable { + private final Logger logger = LoggerFactory.getLogger(PublisherCache.class); + @GuardedBy("this") private final HashMap> livePublishers = new HashMap<>(); @@ -49,6 +53,7 @@ synchronized Publisher get(PublisherOptions options) throws Api new Listener() { @Override public void failed(State s, Throwable t) { + logger.warn("Publisher failed.", t); evict(options); } }, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java deleted file mode 100644 index 7eb1c66d7632..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PublisherOrError.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite.internal; - -import com.google.auto.value.AutoOneOf; -import com.google.cloud.pubsublite.MessageMetadata; -import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.Publisher; - -/** A helper representing either a Publisher or an error. */ -@AutoOneOf(PublisherOrError.Kind.class) -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -abstract class PublisherOrError { - enum Kind { - PUBLISHER, - ERROR - } - - abstract Kind getKind(); - - abstract Publisher publisher(); - - abstract CheckedApiException error(); - - static PublisherOrError ofPublisher(Publisher p) { - return AutoOneOf_PublisherOrError.publisher(p); - } - - static PublisherOrError ofError(CheckedApiException e) { - return AutoOneOf_PublisherOrError.error(e); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java index 4b666d25a29a..f370919f4964 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSink.java @@ -17,25 +17,20 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite.internal; +import static java.util.concurrent.TimeUnit.MINUTES; + import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.api.core.ApiService.Listener; -import com.google.api.core.ApiService.State; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.Publisher; -import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.cloud.pubsublite.proto.PubSubMessage; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.ArrayDeque; import java.util.Deque; -import java.util.function.Consumer; import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions; -import org.apache.beam.sdk.io.gcp.pubsublite.internal.PublisherOrError.Kind; import org.apache.beam.sdk.transforms.DoFn; /** A sink which publishes messages to Pub/Sub Lite. */ @@ -46,98 +41,44 @@ public class PubsubLiteSink extends DoFn { private final PublisherOptions options; @GuardedBy("this") - private transient PublisherOrError publisherOrError; - - // Whenever outstanding is decremented, notify() must be called. - @GuardedBy("this") - private transient int outstanding; - - @GuardedBy("this") - private transient Deque errorsSinceLastFinish; + private transient RunState runState; public PubsubLiteSink(PublisherOptions options) { this.options = options; } - @Setup - public void setup() throws ApiException { - Publisher publisher; - publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options); - synchronized (this) { - outstanding = 0; - errorsSinceLastFinish = new ArrayDeque<>(); - publisherOrError = PublisherOrError.ofPublisher(publisher); + private static class RunState { + private final Deque> futures = new ArrayDeque<>(); + + private final Publisher publisher; + + RunState(PublisherOptions options) { + publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options); + } + + void publish(PubSubMessage message) { + futures.add(publisher.publish(Message.fromProto(message))); + } + + void waitForDone() throws Exception { + ApiFutures.allAsList(futures).get(1, MINUTES); } - // cannot declare in inner class since 'this' means something different. - Consumer onFailure = - t -> { - synchronized (this) { - publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical(t)); - } - }; - publisher.addListener( - new Listener() { - @Override - public void failed(State s, Throwable t) { - onFailure.accept(t); - } - }, - SystemExecutors.getFuturesExecutor()); } - private synchronized void decrementOutstanding() { - --outstanding; - notify(); + @StartBundle + public synchronized void startBundle() throws ApiException { + runState = new RunState(options); } @ProcessElement public synchronized void processElement(@Element PubSubMessage message) throws CheckedApiException { - ++outstanding; - if (publisherOrError.getKind() == Kind.ERROR) { - throw publisherOrError.error(); - } - ApiFuture future = - publisherOrError.publisher().publish(Message.fromProto(message)); - // cannot declare in inner class since 'this' means something different. - Consumer onFailure = - t -> { - synchronized (this) { - decrementOutstanding(); - errorsSinceLastFinish.push(ExtractStatus.toCanonical(t)); - } - }; - ApiFutures.addCallback( - future, - new ApiFutureCallback() { - @Override - public void onSuccess(MessageMetadata messageMetadata) { - decrementOutstanding(); - } - - @Override - public void onFailure(Throwable t) { - onFailure.accept(t); - } - }, - SystemExecutors.getFuturesExecutor()); + runState.publish(message); } // Intentionally don't flush on bundle finish to allow multi-sink client reuse. @FinishBundle - public synchronized void finishBundle() throws CheckedApiException, InterruptedException { - while (outstanding > 0) { - wait(); - } - if (!errorsSinceLastFinish.isEmpty()) { - CheckedApiException canonical = errorsSinceLastFinish.pop(); - while (!errorsSinceLastFinish.isEmpty()) { - canonical.addSuppressed(errorsSinceLastFinish.pop()); - } - throw canonical; - } - if (publisherOrError.getKind() == Kind.ERROR) { - throw publisherOrError.error(); - } + public synchronized void finishBundle() throws Exception { + runState.waitForDone(); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java index df638ff50d67..5c4e731a2128 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/internal/PubsubLiteSinkTest.java @@ -23,14 +23,11 @@ import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; -import com.google.api.core.ApiService; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.CloudRegion; @@ -52,7 +49,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.stream.Collectors; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions; @@ -68,7 +64,6 @@ import org.mockito.Captor; import org.mockito.MockitoAnnotations; import org.mockito.Spy; -import org.mockito.stubbing.Answer; @RunWith(JUnit4.class) public class PubsubLiteSinkTest { @@ -92,9 +87,6 @@ private PublisherOptions defaultOptions() { private final PubsubLiteSink sink = new PubsubLiteSink(defaultOptions()); - // Initialized in setUp. - private ApiService.Listener listener; - @Captor final ArgumentCaptor publishedMessageCaptor = ArgumentCaptor.forClass(Message.class); @@ -110,16 +102,6 @@ private void runWith(Message... messages) { public void setUp() throws Exception { MockitoAnnotations.initMocks(this); PerServerPublisherCache.PUBLISHER_CACHE.set(defaultOptions(), publisher); - doAnswer( - (Answer) - args -> { - listener = args.getArgument(0); - return null; - }) - .when(publisher) - .addListener(any(), any()); - sink.setup(); - verify(publisher).addListener(any(), any()); } @Test @@ -207,32 +189,4 @@ public void exceptionMixedWithOK() throws Exception { assertThat(statusOr.get().code(), equalTo(Code.INTERNAL)); exec.shutdownNow(); } - - @Test - public void listenerExceptionOnBundleFinish() throws Exception { - Message message1 = Message.builder().build(); - SettableApiFuture future = SettableApiFuture.create(); - - SettableApiFuture publishFuture = SettableApiFuture.create(); - when(publisher.publish(message1)) - .thenAnswer( - args -> { - publishFuture.set(null); - return future; - }); - Future executorFuture = - Executors.newSingleThreadExecutor() - .submit( - () -> { - PipelineExecutionException e = - assertThrows(PipelineExecutionException.class, () -> runWith(message1)); - Optional statusOr = ExtractStatus.extract(e.getCause()); - assertTrue(statusOr.isPresent()); - assertThat(statusOr.get().code(), equalTo(Code.INTERNAL)); - }); - publishFuture.get(); - listener.failed(null, new CheckedApiException(Code.INTERNAL).underlying); - future.set(MessageMetadata.of(Partition.of(1), Offset.of(2))); - executorFuture.get(); - } }