Skip to content

Commit

Permalink
Merge pull request #16215 from dpcollins-google/publish-no-desync
Browse files Browse the repository at this point in the history
[BEAM-13402] Simplify PubsubLiteSink
  • Loading branch information
ibzib authored Dec 28, 2021
2 parents d7ccd0f + 7dbbd09 commit b33bebc
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
import java.util.HashMap;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A map of working publishers by PublisherOptions. */
class PublisherCache implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(PublisherCache.class);

@GuardedBy("this")
private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
new HashMap<>();
Expand All @@ -49,6 +53,7 @@ synchronized Publisher<MessageMetadata> get(PublisherOptions options) throws Api
new Listener() {
@Override
public void failed(State s, Throwable t) {
logger.warn("Publisher failed.", t);
evict(options);
}
},
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,20 @@
*/
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import static java.util.concurrent.TimeUnit.MINUTES;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.PublisherOrError.Kind;
import org.apache.beam.sdk.transforms.DoFn;

/** A sink which publishes messages to Pub/Sub Lite. */
Expand All @@ -46,98 +41,44 @@ public class PubsubLiteSink extends DoFn<PubSubMessage, Void> {
private final PublisherOptions options;

@GuardedBy("this")
private transient PublisherOrError publisherOrError;

// Whenever outstanding is decremented, notify() must be called.
@GuardedBy("this")
private transient int outstanding;

@GuardedBy("this")
private transient Deque<CheckedApiException> errorsSinceLastFinish;
private transient RunState runState;

public PubsubLiteSink(PublisherOptions options) {
this.options = options;
}

@Setup
public void setup() throws ApiException {
Publisher<MessageMetadata> publisher;
publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
synchronized (this) {
outstanding = 0;
errorsSinceLastFinish = new ArrayDeque<>();
publisherOrError = PublisherOrError.ofPublisher(publisher);
private static class RunState {
private final Deque<ApiFuture<MessageMetadata>> futures = new ArrayDeque<>();

private final Publisher<MessageMetadata> publisher;

RunState(PublisherOptions options) {
publisher = PerServerPublisherCache.PUBLISHER_CACHE.get(options);
}

void publish(PubSubMessage message) {
futures.add(publisher.publish(Message.fromProto(message)));
}

void waitForDone() throws Exception {
ApiFutures.allAsList(futures).get(1, MINUTES);
}
// cannot declare in inner class since 'this' means something different.
Consumer<Throwable> onFailure =
t -> {
synchronized (this) {
publisherOrError = PublisherOrError.ofError(ExtractStatus.toCanonical(t));
}
};
publisher.addListener(
new Listener() {
@Override
public void failed(State s, Throwable t) {
onFailure.accept(t);
}
},
SystemExecutors.getFuturesExecutor());
}

private synchronized void decrementOutstanding() {
--outstanding;
notify();
@StartBundle
public synchronized void startBundle() throws ApiException {
runState = new RunState(options);
}

@ProcessElement
public synchronized void processElement(@Element PubSubMessage message)
throws CheckedApiException {
++outstanding;
if (publisherOrError.getKind() == Kind.ERROR) {
throw publisherOrError.error();
}
ApiFuture<MessageMetadata> future =
publisherOrError.publisher().publish(Message.fromProto(message));
// cannot declare in inner class since 'this' means something different.
Consumer<Throwable> onFailure =
t -> {
synchronized (this) {
decrementOutstanding();
errorsSinceLastFinish.push(ExtractStatus.toCanonical(t));
}
};
ApiFutures.addCallback(
future,
new ApiFutureCallback<MessageMetadata>() {
@Override
public void onSuccess(MessageMetadata messageMetadata) {
decrementOutstanding();
}

@Override
public void onFailure(Throwable t) {
onFailure.accept(t);
}
},
SystemExecutors.getFuturesExecutor());
runState.publish(message);
}

// Intentionally don't flush on bundle finish to allow multi-sink client reuse.
@FinishBundle
public synchronized void finishBundle() throws CheckedApiException, InterruptedException {
while (outstanding > 0) {
wait();
}
if (!errorsSinceLastFinish.isEmpty()) {
CheckedApiException canonical = errorsSinceLastFinish.pop();
while (!errorsSinceLastFinish.isEmpty()) {
canonical.addSuppressed(errorsSinceLastFinish.pop());
}
throw canonical;
}
if (publisherOrError.getKind() == Kind.ERROR) {
throw publisherOrError.error();
}
public synchronized void finishBundle() throws Exception {
runState.waitForDone();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.CloudRegion;
Expand All @@ -52,7 +49,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOptions;
Expand All @@ -68,7 +64,6 @@
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;
import org.mockito.stubbing.Answer;

@RunWith(JUnit4.class)
public class PubsubLiteSinkTest {
Expand All @@ -92,9 +87,6 @@ private PublisherOptions defaultOptions() {

private final PubsubLiteSink sink = new PubsubLiteSink(defaultOptions());

// Initialized in setUp.
private ApiService.Listener listener;

@Captor
final ArgumentCaptor<Message> publishedMessageCaptor = ArgumentCaptor.forClass(Message.class);

Expand All @@ -110,16 +102,6 @@ private void runWith(Message... messages) {
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
PerServerPublisherCache.PUBLISHER_CACHE.set(defaultOptions(), publisher);
doAnswer(
(Answer<Void>)
args -> {
listener = args.getArgument(0);
return null;
})
.when(publisher)
.addListener(any(), any());
sink.setup();
verify(publisher).addListener(any(), any());
}

@Test
Expand Down Expand Up @@ -207,32 +189,4 @@ public void exceptionMixedWithOK() throws Exception {
assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
exec.shutdownNow();
}

@Test
public void listenerExceptionOnBundleFinish() throws Exception {
Message message1 = Message.builder().build();
SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();

SettableApiFuture<Void> publishFuture = SettableApiFuture.create();
when(publisher.publish(message1))
.thenAnswer(
args -> {
publishFuture.set(null);
return future;
});
Future<?> executorFuture =
Executors.newSingleThreadExecutor()
.submit(
() -> {
PipelineExecutionException e =
assertThrows(PipelineExecutionException.class, () -> runWith(message1));
Optional<CheckedApiException> statusOr = ExtractStatus.extract(e.getCause());
assertTrue(statusOr.isPresent());
assertThat(statusOr.get().code(), equalTo(Code.INTERNAL));
});
publishFuture.get();
listener.failed(null, new CheckedApiException(Code.INTERNAL).underlying);
future.set(MessageMetadata.of(Partition.of(1), Offset.of(2)));
executorFuture.get();
}
}

0 comments on commit b33bebc

Please sign in to comment.