Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make pubsub use gapic client #3581

Merged
merged 6 commits into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.google.cloud.pubsub.v1;

import com.google.api.gax.rpc.ApiException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

/** Utilities for handling gRPC {@link Status}. */
final class StatusUtil {
Expand All @@ -26,23 +26,20 @@ private StatusUtil() {
}

public static boolean isRetryable(Throwable error) {
if (!(error instanceof StatusRuntimeException)) {
if (!(error instanceof ApiException)) {
return true;
}
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) error;
switch (statusRuntimeException.getStatus().getCode()) {
ApiException apiException = (ApiException) error;
switch (apiException.getStatusCode().getCode()) {
case DEADLINE_EXCEEDED:
case INTERNAL:
case CANCELLED:
case RESOURCE_EXHAUSTED:
case ABORTED:
return true;
case UNAVAILABLE:
if (statusRuntimeException.getMessage().contains("Server shutdownNow invoked")) {
return false;
} else {
return true;
}
// TODO(pongad): check that ApiException propagate message properly.
return !apiException.getMessage().contains("Server shutdownNow invoked");
default:
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,31 @@

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiClock;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.Distribution;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptionFactory;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor;
import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.StreamingPullRequest;
import com.google.pubsub.v1.StreamingPullResponse;
import com.google.pubsub.v1.SubscriberGrpc.SubscriberStub;
import io.grpc.Status;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
Expand All @@ -62,9 +64,9 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100);
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
private static final int MAX_PER_REQUEST_CHANGES = 1000;
private static final Duration UNARY_TIMEOUT = Duration.ofSeconds(60);

private final SubscriberStub stub;
private final int chanAffinity;
private final String subscription;
private final ScheduledExecutorService systemExecutor;
private final MessageDispatcher messageDispatcher;
Expand All @@ -73,7 +75,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

private final Lock lock = new ReentrantLock();
private ClientCallStreamObserver<StreamingPullRequest> requestObserver;
private ClientStream<StreamingPullRequest> clientStream;

public StreamingSubscriberConnection(
String subscription,
Expand All @@ -82,6 +84,7 @@ public StreamingSubscriberConnection(
Duration maxAckExtensionPeriod,
Distribution ackLatencyDistribution,
SubscriberStub stub,
int chanAffinity,
FlowController flowController,
Deque<MessageDispatcher.OutstandingMessageBatch> outstandingMessageBatches,
ScheduledExecutorService executor,
Expand All @@ -90,6 +93,7 @@ public StreamingSubscriberConnection(
this.subscription = subscription;
this.systemExecutor = systemExecutor;
this.stub = stub;
this.chanAffinity = chanAffinity;
this.messageDispatcher =
new MessageDispatcher(
receiver,
Expand Down Expand Up @@ -118,15 +122,14 @@ protected void doStop() {

lock.lock();
try {
requestObserver.onError(Status.CANCELLED.asException());
clientStream.closeSendWithError(Status.CANCELLED.asException());
} finally {
lock.unlock();
notifyStopped();
}
}

private class StreamingPullResponseObserver
implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {
private class StreamingPullResponseObserver implements ResponseObserver<StreamingPullResponse> {

final SettableApiFuture<Void> errorFuture;

Expand All @@ -137,20 +140,21 @@ private class StreamingPullResponseObserver
* the user can deal with -- so we save the request observer this response observer is "paired
* with". If the stream has already errored, requesting more messages is a no-op.
*/
ClientCallStreamObserver<StreamingPullRequest> thisRequestObserver;
StreamController thisController;

StreamingPullResponseObserver(SettableApiFuture<Void> errorFuture) {
this.errorFuture = errorFuture;
}

@Override
public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestObserver) {
thisRequestObserver = requestObserver;
requestObserver.disableAutoInboundFlowControl();
public void onStart(StreamController controller) {
thisController = controller;
thisController.disableAutoInboundFlowControl();
thisController.request(1);
}

@Override
public void onNext(StreamingPullResponse response) {
public void onResponse(StreamingPullResponse response) {
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
messageDispatcher.processReceivedMessages(
response.getReceivedMessagesList(),
Expand All @@ -163,7 +167,7 @@ public void run() {
if (isAlive() && !errorFuture.isDone()) {
lock.lock();
try {
thisRequestObserver.request(1);
thisController.request(1);
} catch (Exception e) {
logger.log(Level.WARNING, "cannot request more messages", e);
} finally {
Expand All @@ -180,29 +184,30 @@ public void onError(Throwable t) {
}

@Override
public void onCompleted() {
public void onComplete() {
logger.fine("Streaming pull terminated successfully!");
errorFuture.set(null);
}
}

private void initialize() {
final SettableApiFuture<Void> errorFuture = SettableApiFuture.create();
final ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> responseObserver =
final ResponseObserver<StreamingPullResponse> responseObserver =
new StreamingPullResponseObserver(errorFuture);
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
(ClientCallStreamObserver<StreamingPullRequest>) (stub.streamingPull(responseObserver));
logger.log(
Level.FINER,
"Initializing stream to subscription {0}",subscription);
// We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt anyway.
// Set to some big-ish value in case we modack late.
requestObserver.onNext(
ClientStream<StreamingPullRequest> initClientStream =
stub.streamingPullCallable()
.splitCall(
responseObserver,
GrpcCallContext.createDefault().withChannelAffinity(chanAffinity));

logger.log(Level.FINER, "Initializing stream to subscription {0}", subscription);
// We need to set streaming ack deadline, but it's not useful since we'll modack to send receipt
// anyway. Set to some big-ish value in case we modack late.
initClientStream.send(
StreamingPullRequest.newBuilder()
.setSubscription(subscription)
.setStreamAckDeadlineSeconds(60)
.build());
requestObserver.request(1);

/**
* Must make sure we do this after sending the subscription name and deadline. Otherwise, some
Expand All @@ -211,7 +216,7 @@ private void initialize() {
*/
lock.lock();
try {
this.requestObserver = requestObserver;
this.clientStream = initClientStream;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -273,45 +278,43 @@ private boolean isAlive() {
@Override
public void sendAckOperations(
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
SubscriberStub timeoutStub =
stub.withDeadlineAfter(UNARY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
StreamObserver<Empty> loggingObserver = new StreamObserver<Empty>() {
@Override
public void onCompleted() {
// noop
}

@Override
public void onNext(Empty e) {
// noop
}
ApiFutureCallback<Empty> loggingCallback =
new ApiFutureCallback<Empty>() {
@Override
public void onSuccess(Empty empty) {
// noop
}

@Override
public void onError(Throwable t) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
}
};
@Override
public void onFailure(Throwable t) {
Level level = isAlive() ? Level.WARNING : Level.FINER;
logger.log(level, "failed to send operations", t);
}
};

for (PendingModifyAckDeadline modack : ackDeadlineExtensions) {
for (List<String> idChunk : Lists.partition(modack.ackIds, MAX_PER_REQUEST_CHANGES)) {
timeoutStub.modifyAckDeadline(
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscription)
.addAllAckIds(idChunk)
.setAckDeadlineSeconds(modack.deadlineExtensionSeconds)
.build(),
loggingObserver);
ApiFuture<Empty> future =
stub.modifyAckDeadlineCallable()
.futureCall(
ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscription)
.addAllAckIds(idChunk)
.setAckDeadlineSeconds(modack.deadlineExtensionSeconds)
.build());
ApiFutures.addCallback(future, loggingCallback);

This comment was marked as spam.

}
}

for (List<String> idChunk : Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES)) {
timeoutStub.acknowledge(
AcknowledgeRequest.newBuilder()
.setSubscription(subscription)
.addAllAckIds(idChunk)
.build(),
loggingObserver);
ApiFuture<Empty> future =
stub.acknowledgeCallable()
.futureCall(
AcknowledgeRequest.newBuilder()
.setSubscription(subscription)
.addAllAckIds(idChunk)
.build());
ApiFutures.addCallback(future, loggingCallback);
}
}

Expand Down
Loading