Skip to content

Commit

Permalink
use RpcFuture and remove old BundlingSettings (#1572)
Browse files Browse the repository at this point in the history
* use RpcFuture and remove old BundlingSettings
  • Loading branch information
pongad authored Jan 30, 2017
1 parent da93da7 commit cc6796a
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 82 deletions.
2 changes: 1 addition & 1 deletion google-cloud-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
<version>0.0.25</version>
<version>0.0.28</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.logging.v2.CreateLogMetricRequest;
import com.google.logging.v2.CreateSinkRequest;
Expand Down Expand Up @@ -107,9 +106,6 @@ private static <V> V get(Future<V> future) {

private static <I, O> Future<O> transform(Future<I> future,
Function<? super I, ? extends O> function) {
if (future instanceof ListenableFuture) {
return Futures.transform((ListenableFuture<I>) future, function);
}
return Futures.lazyTransform(future, function);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.logging.spi;

import com.google.api.gax.core.Function;
import com.google.api.gax.core.RpcFuture;
import com.google.api.gax.grpc.ApiException;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
Expand All @@ -33,10 +35,6 @@
import com.google.cloud.logging.spi.v2.LoggingServiceV2Settings;
import com.google.cloud.logging.spi.v2.MetricsServiceV2Client;
import com.google.cloud.logging.spi.v2.MetricsServiceV2Settings;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.logging.v2.CreateLogMetricRequest;
import com.google.logging.v2.CreateSinkRequest;
import com.google.logging.v2.DeleteLogMetricRequest;
Expand All @@ -59,13 +57,13 @@
import com.google.logging.v2.WriteLogEntriesRequest;
import com.google.logging.v2.WriteLogEntriesResponse;
import com.google.protobuf.Empty;

import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -148,21 +146,25 @@ public DefaultLoggingRpc(LoggingOptions options) throws IOException {
}
}

private static <V> Future<V> translate(ListenableFuture<V> from, final boolean idempotent,
int... returnNullOn) {
final Set<Integer> returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length);
for (int value : returnNullOn) {
returnNullOnSet.add(value);
private static <V> Future<V> translate(
RpcFuture<V> from, final boolean idempotent, Code... returnNullOn) {
final Set<Code> returnNullOnSet;
if (returnNullOn.length > 0) {
returnNullOnSet = EnumSet.of(returnNullOn[0], returnNullOn);
} else {
returnNullOnSet = Collections.<Code>emptySet();
}
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
@Override
public V apply(ApiException exception) {
if (returnNullOnSet.contains(exception.getStatusCode().value())) {
return null;
}
throw new LoggingException(exception, idempotent);
}
});
return from.catching(
ApiException.class,
new Function<ApiException, V>() {
@Override
public V apply(ApiException exception) {
if (returnNullOnSet.contains(exception.getStatusCode().value())) {
return null;
}
throw new LoggingException(exception, idempotent);
}
});
}

@Override
Expand All @@ -177,7 +179,7 @@ public Future<LogSink> update(UpdateSinkRequest request) {

@Override
public Future<LogSink> get(GetSinkRequest request) {
return translate(configClient.getSinkCallable().futureCall(request), true, Code.NOT_FOUND.value());
return translate(configClient.getSinkCallable().futureCall(request), true, Code.NOT_FOUND);
}

@Override
Expand All @@ -187,14 +189,12 @@ public Future<ListSinksResponse> list(ListSinksRequest request) {

@Override
public Future<Empty> delete(DeleteSinkRequest request) {
return translate(configClient.deleteSinkCallable().futureCall(request), true,
Code.NOT_FOUND.value());
return translate(configClient.deleteSinkCallable().futureCall(request), true, Code.NOT_FOUND);
}

@Override
public Future<Empty> delete(DeleteLogRequest request) {
return translate(loggingClient.deleteLogCallable().futureCall(request), true,
Code.NOT_FOUND.value());
return translate(loggingClient.deleteLogCallable().futureCall(request), true, Code.NOT_FOUND);
}

@Override
Expand Down Expand Up @@ -226,8 +226,8 @@ public Future<LogMetric> update(UpdateLogMetricRequest request) {

@Override
public Future<LogMetric> get(GetLogMetricRequest request) {
return translate(metricsClient.getLogMetricCallable().futureCall(request), true,
Code.NOT_FOUND.value());
return translate(
metricsClient.getLogMetricCallable().futureCall(request), true, Code.NOT_FOUND);
}

@Override
Expand All @@ -237,8 +237,8 @@ public Future<ListLogMetricsResponse> list(ListLogMetricsRequest request) {

@Override
public Future<Empty> delete(DeleteLogMetricRequest request) {
return translate(metricsClient.deleteLogMetricCallable().futureCall(request), true,
Code.NOT_FOUND.value());
return translate(
metricsClient.deleteLogMetricCallable().futureCall(request), true, Code.NOT_FOUND);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,11 +546,8 @@ private static Builder createDefault() {
.writeLogEntriesSettings()
.getBundlingSettingsBuilder()
.setElementCountThreshold(1)
.setElementCountLimit(1000)
.setRequestByteThreshold(1024)
.setRequestByteLimit(10485760)
.setDelayThreshold(Duration.millis(10))
.setBlockingCallCountThreshold(1);
.setDelayThreshold(Duration.millis(10));
builder
.writeLogEntriesSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("non_idempotent"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.iam.v1.SetIamPolicyRequest;
import com.google.iam.v1.TestIamPermissionsRequest;
Expand Down Expand Up @@ -222,9 +221,6 @@ private static <V> V get(Future<V> future) {

private static <I, O> Future<O> transform(Future<I> future,
Function<? super I, ? extends O> function) {
if (future instanceof ListenableFuture) {
return Futures.transform((ListenableFuture<I>) future, function);
}
return Futures.lazyTransform(future, function);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package com.google.cloud.pubsub.spi;

import com.google.api.gax.core.ForwardingRpcFuture;
import com.google.api.gax.core.Function;
import com.google.api.gax.core.RpcFuture;
import com.google.api.gax.core.RpcFutureCallback;
import com.google.api.gax.grpc.ApiException;
import com.google.api.gax.grpc.ChannelProvider;
import com.google.api.gax.grpc.ExecutorProvider;
Expand All @@ -31,12 +35,7 @@
import com.google.cloud.pubsub.spi.v1.PublisherSettings;
import com.google.cloud.pubsub.spi.v1.SubscriberClient;
import com.google.cloud.pubsub.spi.v1.SubscriberSettings;
import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.iam.v1.GetIamPolicyRequest;
import com.google.iam.v1.Policy;
import com.google.iam.v1.SetIamPolicyRequest;
Expand All @@ -62,18 +61,15 @@
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import io.grpc.ManagedChannel;
import io.grpc.Status.Code;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;

import org.joda.time.Duration;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import org.joda.time.Duration;

public class DefaultPubSubRpc implements PubSubRpc {

Expand Down Expand Up @@ -110,27 +106,26 @@ protected ChannelProvider getChannelProvider() {
}
}

private static final class PullFutureImpl
extends ForwardingListenableFuture.SimpleForwardingListenableFuture<PullResponse>
private static final class PullFutureImpl extends ForwardingRpcFuture<PullResponse>
implements PullFuture {

PullFutureImpl(ListenableFuture<PullResponse> delegate) {
PullFutureImpl(RpcFuture<PullResponse> delegate) {
super(delegate);
}

@Override
public void addCallback(final PullCallback callback) {
Futures.addCallback(delegate(), new FutureCallback<PullResponse>() {
@Override
public void onSuccess(PullResponse result) {
callback.success(result);
}

@Override
public void onFailure(Throwable error) {
callback.failure(error);
}
});
addCallback(
new RpcFutureCallback<PullResponse>() {
@Override
public void onSuccess(PullResponse response) {
callback.success(response);
}

@Override
public void onFailure(Throwable error) {
callback.failure(error);
}
});
}
}

Expand Down Expand Up @@ -178,21 +173,23 @@ public DefaultPubSubRpc(PubSubOptions options) throws IOException {
}
}

private static <V> ListenableFuture<V> translate(ListenableFuture<V> from,
final boolean idempotent, int... returnNullOn) {
private static <V> RpcFuture<V> translate(
RpcFuture<V> from, final boolean idempotent, int... returnNullOn) {
final Set<Integer> returnNullOnSet = Sets.newHashSetWithExpectedSize(returnNullOn.length);
for (int value : returnNullOn) {
returnNullOnSet.add(value);
}
return Futures.catching(from, ApiException.class, new Function<ApiException, V>() {
@Override
public V apply(ApiException exception) {
if (returnNullOnSet.contains(exception.getStatusCode().value())) {
return null;
}
throw new PubSubException(exception, idempotent);
}
});
return from.catching(
ApiException.class,
new Function<ApiException, V>() {
@Override
public V apply(ApiException exception) {
if (returnNullOnSet.contains(exception.getStatusCode().value())) {
return null;
}
throw new PubSubException(exception, idempotent);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.Topic;

import java.util.concurrent.Future;

public interface PubSubRpc extends AutoCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,11 +527,8 @@ private static Builder createDefault() {
.publishSettings()
.getBundlingSettingsBuilder()
.setElementCountThreshold(10)
.setElementCountLimit(1000)
.setRequestByteThreshold(1024)
.setRequestByteLimit(10485760)
.setDelayThreshold(Duration.millis(10))
.setBlockingCallCountThreshold(1);
.setDelayThreshold(Duration.millis(10));
builder
.publishSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("one_plus_delivery"))
Expand Down

0 comments on commit cc6796a

Please sign in to comment.