diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java index a253ab2fe259..455bfbd37ba8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java @@ -74,18 +74,13 @@ public PollingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, - boolean compressionEnabled, ScheduledExecutorService executor, Clock clock) { this.subscription = subscription; this.executor = executor; - SubscriberFutureStub subscriberStub = + stub = SubscriberGrpc.newFutureStub(channel) .withCallCredentials(MoreCallCredentials.from(credentials)); - if (compressionEnabled) { - subscriberStub = subscriberStub.withCompression("gzip"); - } - stub = subscriberStub; messageDispatcher = new MessageDispatcher( receiver, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java index c37fd637e103..e72cae3bd7a9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Publisher.java @@ -39,7 +39,6 @@ import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; import com.google.pubsub.v1.PublisherGrpc; -import com.google.pubsub.v1.PublisherGrpc.PublisherFutureStub; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.TopicName; import io.grpc.ManagedChannel; @@ -90,8 +89,6 @@ public class Publisher { private final FlowControlSettings flowControlSettings; private final boolean failOnFlowControlLimits; - - private final boolean compressionEnabled; private final Lock messagesBundleLock; private List messagesBundle; @@ -130,8 +127,6 @@ private Publisher(Builder builder) throws IOException { flowControlSettings = builder.flowControlSettings; failOnFlowControlLimits = builder.failOnFlowControlLimits; this.flowController = new FlowController(flowControlSettings, failOnFlowControlLimits); - - compressionEnabled = builder.compressionEnabled; messagesBundle = new LinkedList<>(); messagesBundleLock = new ReentrantLock(); @@ -373,12 +368,10 @@ private void publishOutstandingBundle(final OutstandingBundle outstandingBundle) * Math.pow(retrySettings.getRpcTimeoutMultiplier(), outstandingBundle.attempt - 1)); rpcTimeoutMs = Math.min(rpcTimeoutMs, retrySettings.getMaxRpcTimeout().getMillis()); - PublisherFutureStub stub = PublisherGrpc.newFutureStub(channels[currentChannel]) - .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS); - if (compressionEnabled) { - stub = stub.withCompression("gzip"); - } - Futures.addCallback(stub.publish(publishRequest.build()), + Futures.addCallback( + PublisherGrpc.newFutureStub(channels[currentChannel]) + .withDeadlineAfter(rpcTimeoutMs, TimeUnit.MILLISECONDS) + .publish(publishRequest.build()), new FutureCallback() { @Override public void onSuccess(PublishResponse result) { @@ -633,8 +626,6 @@ public long nextLong(long least, long bound) { ChannelProvider channelProvider = PublisherSettings.defaultChannelProviderBuilder().build(); ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; - - boolean compressionEnabled = true; // client-side compression enabled by default private Builder(TopicName topic) { this.topicName = Preconditions.checkNotNull(topic); @@ -708,15 +699,6 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { this.executorProvider = Preconditions.checkNotNull(executorProvider); return this; } - - /** - * Gives the ability to disable client-side compression. - * Note compression is enabled by default. - */ - public Builder setCompressionEnabled(boolean enabled) { - this.compressionEnabled = enabled; - return this; - } public Publisher build() throws IOException { return new Publisher(this); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java index 57352bc7c43b..6b4b0ef495a0 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java @@ -60,7 +60,6 @@ final class StreamingSubscriberConnection extends AbstractService implements Ack private final Channel channel; private final Credentials credentials; - private final boolean compressionEnabled; private final String subscription; private final ScheduledExecutorService executor; @@ -76,7 +75,6 @@ public StreamingSubscriberConnection( Distribution ackLatencyDistribution, Channel channel, FlowController flowController, - boolean compressionEnabled, ScheduledExecutorService executor, Clock clock) { this.subscription = subscription; @@ -93,7 +91,6 @@ public StreamingSubscriberConnection( executor, clock); messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds); - this.compressionEnabled = compressionEnabled; } @Override @@ -151,16 +148,12 @@ private void initialize() { final SettableFuture errorFuture = SettableFuture.create(); final ClientResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture); - - CallOptions callOptions = - CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials)); - if (compressionEnabled) { - callOptions = callOptions.withCompression("gzip"); - } final ClientCallStreamObserver requestObserver = (ClientCallStreamObserver) (ClientCalls.asyncBidiStreamingCall( - channel.newCall(SubscriberGrpc.METHOD_STREAMING_PULL, callOptions), + channel.newCall( + SubscriberGrpc.METHOD_STREAMING_PULL, + CallOptions.DEFAULT.withCallCredentials(MoreCallCredentials.from(credentials))), responseObserver)); logger.log( Level.INFO, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java index 80898747316e..902226db408f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/Subscriber.java @@ -24,7 +24,6 @@ import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.Clock; -import com.google.cloud.pubsub.spi.v1.Publisher.Builder; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -269,7 +268,6 @@ private static class SubscriberImpl extends AbstractService { private final List pollingSubscriberConnections; private final Clock clock; private final List closeables = new ArrayList<>(); - private final boolean compressionEnabled; private ScheduledFuture ackDeadlineUpdater; private int streamAckDeadlineSeconds; @@ -319,8 +317,6 @@ public void close() throws IOException { numChannels = Math.max(1, Runtime.getRuntime().availableProcessors()) * CHANNELS_PER_CORE; streamingSubscriberConnections = new ArrayList(numChannels); pollingSubscriberConnections = new ArrayList(numChannels); - - compressionEnabled = builder.compressionEnabled; } @Override @@ -359,7 +355,6 @@ private void startStreamingConnections() { ackLatencyDistribution, channelBuilder.build(), flowController, - compressionEnabled, executor, clock)); } @@ -435,7 +430,6 @@ private void startPollingConnections() { ackLatencyDistribution, channelBuilder.build(), flowController, - compressionEnabled, executor, clock)); } @@ -541,8 +535,6 @@ public static final class Builder { Optional.absent(); Optional clock = Optional.absent(); - boolean compressionEnabled = true; // client-side compression enabled by default - Builder(SubscriptionName subscriptionName, MessageReceiver receiver) { this.subscriptionName = subscriptionName; this.receiver = receiver; @@ -606,15 +598,6 @@ Builder setClock(Clock clock) { this.clock = Optional.of(clock); return this; } - - /** - * Gives the ability to disable client-side compression. - * Note compression is enabled by default. - */ - public Builder setCompressionEnabled(boolean enabled) { - this.compressionEnabled = enabled; - return this; - } public Subscriber build() throws IOException { return new Subscriber(this);