Skip to content

Commit

Permalink
Reverting compression changes (googleapis#1651)
Browse files Browse the repository at this point in the history
* Revert "Enabling client-side compression in the library, with an option to (googleapis#1645)".

Compression is not fully supported in gRPC, can't have it in the library
yet.

This reverts commit a599972.
  • Loading branch information
davidtorres authored and garrettjonesgoogle committed Feb 23, 2017
1 parent d7267f5 commit e1ca0d5
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,13 @@ public PollingSubscriberConnection(
Distribution ackLatencyDistribution,
Channel channel,
FlowController flowController,
boolean compressionEnabled,
ScheduledExecutorService executor,
Clock clock) {
this.subscription = subscription;
this.executor = executor;
SubscriberFutureStub subscriberStub =
stub =
SubscriberGrpc.newFutureStub(channel)
.withCallCredentials(MoreCallCredentials.from(credentials));
if (compressionEnabled) {
subscriberStub = subscriberStub.withCompression("gzip");
}
stub = subscriberStub;
messageDispatcher =
new MessageDispatcher(
receiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import com.google.pubsub.v1.PublishRequest;
import com.google.pubsub.v1.PublishResponse;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -90,8 +89,6 @@ public class Publisher {

private final FlowControlSettings flowControlSettings;
private final boolean failOnFlowControlLimits;

private final boolean compressionEnabled;

private final Lock messagesBundleLock;
private List<OutstandingPublish> messagesBundle;
Expand Down Expand Up @@ -130,8 +127,6 @@ private Publisher(Builder builder) throws IOException {
flowControlSettings = builder.flowControlSettings;
failOnFlowControlLimits = builder.failOnFlowControlLimits;
this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits);

compressionEnabled = builder.compressionEnabled;

messagesBundle = new LinkedList<>();
messagesBundleLock = new ReentrantLock();
Expand Down Expand Up @@ -373,12 +368,10 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle)
* Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBundle.attempt - 1));
rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().getMillis());

PublisherFutureStub stub = PublisherGrpc.newFutureStub(channels[currentChannel])
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS);
if (compressionEnabled) {
stub = stub.withCompression("gzip");
}
Futures.addCallback(stub.publish(publishRequest.build()),
Futures.addCallback(
PublisherGrpc.newFutureStub(channels[currentChannel])
.withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS)
.publish(publishRequest.build()),
new FutureCallback<PublishResponse>() {
@Override
public void onSuccess(PublishResponse result) {
Expand Down Expand Up @@ -633,8 +626,6 @@ public long nextLong(long least, long bound) {

ChannelProvider channelProvider = PublisherSettings.defaultChannelProviderBuilder().build();
ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;

boolean compressionEnabled = true; // client-side compression enabled by default

private Builder(TopicName topic) {
this.topicName = Preconditions.checkNotNull(topic);
Expand Down Expand Up @@ -708,15 +699,6 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
return this;
}

/**
* Gives the ability to disable client-side compression.
* Note compression is enabled by default.
*/
public Builder setCompressionEnabled(boolean enabled) {
this.compressionEnabled = enabled;
return this;
}

public Publisher build() throws IOException {
return new Publisher(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ final class StreamingSubscriberConnection extends AbstractService implements Ack

private final Channel channel;
private final Credentials credentials;
private final boolean compressionEnabled;

private final String subscription;
private final ScheduledExecutorService executor;
Expand All @@ -76,7 +75,6 @@ public StreamingSubscriberConnection(
Distribution ackLatencyDistribution,
Channel channel,
FlowController flowController,
boolean compressionEnabled,
ScheduledExecutorService executor,
Clock clock) {
this.subscription = subscription;
Expand All @@ -93,7 +91,6 @@ public StreamingSubscriberConnection(
executor,
clock);
messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds);
this.compressionEnabled = compressionEnabled;
}

@Override
Expand Down Expand Up @@ -151,16 +148,12 @@ private void initialize() {
final SettableFuture<Void> errorFuture = SettableFuture.create();
final ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> responseObserver =
new StreamingPullResponseObserver(errorFuture);

CallOptions callOptions =
CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials));
if (compressionEnabled) {
callOptions = callOptions.withCompression("gzip");
}
final ClientCallStreamObserver<StreamingPullRequest> requestObserver =
(ClientCallStreamObserver<StreamingPullRequest>)
(ClientCalls.asyncBidiStreamingCall(
channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, callOptions),
channel.newCall(
SubscriberGrpc.METHOD_STREAMING_PULL,
CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials))),
responseObserver));
logger.log(
Level.INFO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.Clock;
import com.google.cloud.pubsub.spi.v1.Publisher.Builder;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -269,7 +268,6 @@ private static class SubscriberImpl extends AbstractService {
private final List<PollingSubscriberConnection> pollingSubscriberConnections;
private final Clock clock;
private final List<AutoCloseable> closeables = new ArrayList<>();
private final boolean compressionEnabled;
private ScheduledFuture<?> ackDeadlineUpdater;
private int streamAckDeadlineSeconds;

Expand Down Expand Up @@ -319,8 +317,6 @@ public void close() throws IOException {
numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE;
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels);
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels);

compressionEnabled = builder.compressionEnabled;
}

@Override
Expand Down Expand Up @@ -359,7 +355,6 @@ private void startStreamingConnections() {
ackLatencyDistribution,
channelBuilder.build(),
flowController,
compressionEnabled,
executor,
clock));
}
Expand Down Expand Up @@ -435,7 +430,6 @@ private void startPollingConnections() {
ackLatencyDistribution,
channelBuilder.build(),
flowController,
compressionEnabled,
executor,
clock));
}
Expand Down Expand Up @@ -541,8 +535,6 @@ public static final class Builder {
Optional.absent();
Optional<Clock> clock = Optional.absent();

boolean compressionEnabled = true; // client-side compression enabled by default

Builder(SubscriptionName subscriptionName, MessageReceiver receiver) {
this.subscriptionName = subscriptionName;
this.receiver = receiver;
Expand Down Expand Up @@ -606,15 +598,6 @@ Builder setClock(Clock clock) {
this.clock = Optional.of(clock);
return this;
}

/**
* Gives the ability to disable client-side compression.
* Note compression is enabled by default.
*/
public Builder setCompressionEnabled(boolean enabled) {
this.compressionEnabled = enabled;
return this;
}

public Subscriber build() throws IOException {
return new Subscriber(this);
Expand Down

0 comments on commit e1ca0d5

Please sign in to comment.