Skip to content

Commit

Permalink
Enabling client-side compression in the library, with an option to (g…
Browse files Browse the repository at this point in the history
  • Loading branch information
davidtorres authored and garrettjonesgoogle committed Feb 22, 2017
1 parent 0fb03eb commit a599972
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,18 @@ public PollingSubscriberConnection(
Distribution ackLatencyDistribution,
Channel channel,
FlowController flowController,
boolean compressionEnabled,
ScheduledExecutorService executor,
Clock clock) {
this.subscription = subscription;
this.executor = executor;
stub =
SubscriberFutureStub subscriberStub =
SubscriberGrpc.newFutureStub(channel)
.withCallCredentials(MoreCallCredentials.from(credentials));
if (compressionEnabled) {
subscriberStub = subscriberStub.withCompression("gzip");
}
stub = subscriberStub;
messageDispatcher =
new MessageDispatcher(
receiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -89,6 +90,8 @@ public class Publisher {

private final FlowControlSettings flowControlSettings;
private final boolean failOnFlowControlLimits;

private final boolean compressionEnabled;

private final Lock messagesBundleLock;
private List<OutstandingPublish> messagesBundle;
Expand Down Expand Up @@ -127,6 +130,8 @@ private Publisher(Builder builder) throws IOException {
flowControlSettings = builder.flowControlSettings;
failOnFlowControlLimits = builder.failOnFlowControlLimits;
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);

compressionEnabled = builder.compressionEnabled;

messagesBundle = new LinkedList<>();
messagesBundleLock = new ReentrantLock();
Expand Down Expand Up @@ -368,10 +373,12 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle)
* Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBundle.attempt - 1));
rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().getMillis());

Futures.addCallback(
PublisherGrpc.newFutureStub(channels[currentChannel])
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS)
.publish(publishRequest.build()),
PublisherFutureStub stub = PublisherGrpc.newFutureStub(channels[currentChannel])
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS);
if (compressionEnabled) {
stub = stub.withCompression("gzip");
}
Futures.addCallback(stub.publish(publishRequest.build()),
new FutureCallback<PublishResponse>() {
@Override
public void onSuccess(PublishResponse result) {
Expand Down Expand Up @@ -626,6 +633,8 @@ public long nextLong(long least, long bound) {

ChannelProvider channelProvider = PublisherSettings.defaultChannelProviderBuilder().build();
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;

boolean compressionEnabled = true; // client-side compression enabled by default

private Builder(TopicName topic) {
this.topicName = Preconditions.checkNotNull(topic);
Expand Down Expand Up @@ -699,6 +708,15 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
return this;
}

/**
* Gives the ability to disable client-side compression.
* Note compression is enabled by default.
*/
public Builder setCompressionEnabled(boolean enabled) {
this.compressionEnabled = enabled;
return this;
}

public Publisher build() throws IOException {
return new Publisher(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ final class StreamingSubscriberConnection extends AbstractService implements Ack

private final Channel channel;
private final Credentials credentials;
private final boolean compressionEnabled;

private final String subscription;
private final ScheduledExecutorService executor;
Expand All @@ -75,6 +76,7 @@ public StreamingSubscriberConnection(
Distribution ackLatencyDistribution,
Channel channel,
FlowController flowController,
boolean compressionEnabled,
ScheduledExecutorService executor,
Clock clock) {
this.subscription = subscription;
Expand All @@ -91,6 +93,7 @@ public StreamingSubscriberConnection(
executor,
clock);
messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds);
this.compressionEnabled = compressionEnabled;
}

@Override
Expand Down Expand Up @@ -148,12 +151,16 @@ private void initialize() {
final SettableFuture<Void> errorFuture = SettableFuture.create();
final ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> responseObserver =
new StreamingPullResponseObserver(errorFuture);

CallOptions callOptions =
CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials));
if (compressionEnabled) {
callOptions = callOptions.withCompression("gzip");
}
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
(ClientCallStreamObserver<StreamingPullRequest>)
(ClientCalls.asyncBidiStreamingCall(
channel.newCall(
SubscriberGrpc.METHOD_STREAMING_PULL,
CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials))),
channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, callOptions),
responseObserver));
logger.log(
Level.INFO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.spi.v1.Publisher.Builder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -268,6 +269,7 @@ private static class SubscriberImpl extends AbstractService {
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
private final Clock clock;
private final List<AutoCloseable> closeables = new ArrayList<>();
private final boolean compressionEnabled;
private ScheduledFuture<?> ackDeadlineUpdater;
private int streamAckDeadlineSeconds;

Expand Down Expand Up @@ -317,6 +319,8 @@ public void close() throws IOException {
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);

compressionEnabled = builder.compressionEnabled;
}

@Override
Expand Down Expand Up @@ -355,6 +359,7 @@ private void startStreamingConnections() {
ackLatencyDistribution,
channelBuilder.build(),
flowController,
compressionEnabled,
executor,
clock));
}
Expand Down Expand Up @@ -430,6 +435,7 @@ private void startPollingConnections() {
ackLatencyDistribution,
channelBuilder.build(),
flowController,
compressionEnabled,
executor,
clock));
}
Expand Down Expand Up @@ -535,6 +541,8 @@ public static final class Builder {
Optional.absent();
Optional<Clock> clock = Optional.absent();

boolean compressionEnabled = true; // client-side compression enabled by default

Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
this.subscriptionName = subscriptionName;
this.receiver = receiver;
Expand Down Expand Up @@ -598,6 +606,15 @@ Builder setClock(Clock clock) {
this.clock = Optional.of(clock);
return this;
}

/**
* Gives the ability to disable client-side compression.
* Note compression is enabled by default.
*/
public Builder setCompressionEnabled(boolean enabled) {
this.compressionEnabled = enabled;
return this;
}

public Subscriber build() throws IOException {
return new Subscriber(this);
Expand Down

0 comments on commit a599972

Please sign in to comment.