Skip to content

Commit

Permalink
Refresh name resolution if all addresses failed to connect.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangkun83 committed Mar 24, 2016
1 parent 88d3174 commit 3be48b4
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 35 deletions.
55 changes: 42 additions & 13 deletions core/src/main/java/io/grpc/DnsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.ExecutorService;

import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

/**
* A DNS-based {@link NameResolver}.
Expand All @@ -53,7 +54,12 @@ final class DnsNameResolver extends NameResolver {
private final String authority;
private final String host;
private final int port;
@GuardedBy("this")
private ExecutorService executor;
@GuardedBy("this")
private boolean resolving;
@GuardedBy("this")
private Listener listener;

DnsNameResolver(@Nullable String nsAuthority, String name, Attributes params) {
// TODO: if a DNS server is provided as nsAuthority, use it.
Expand Down Expand Up @@ -84,27 +90,50 @@ public String getServiceAuthority() {
}

@Override
public synchronized void start(final Listener listener) {
public synchronized void start(Listener listener) {
Preconditions.checkState(executor == null, "already started");
executor = SharedResourceHolder.get(GrpcUtil.SHARED_CHANNEL_EXECUTOR);
this.listener = listener;
resolve();
}

/**
* Requests another resolution of the host name by delegating to {@code resolve()}, which
* returns without scheduling anything when a resolution is already in progress.
*
* @throws IllegalStateException with message "not started" if no executor has been set, which
*     is the case before {@code start} has been called
*/
@Override
public synchronized void refresh() {
Preconditions.checkState(executor != null, "not started");
resolve();
}

@GuardedBy("this")
private void resolve() {
if (resolving) {
return;
}
resolving = true;
final Listener savedListener = Preconditions.checkNotNull(listener);
executor.execute(new Runnable() {
@Override
public void run() {
InetAddress[] inetAddrs;
try {
inetAddrs = InetAddress.getAllByName(host);
} catch (Exception e) {
listener.onError(Status.UNAVAILABLE.withCause(e));
return;
}
ArrayList<ResolvedServerInfo> servers
= new ArrayList<ResolvedServerInfo>(inetAddrs.length);
for (int i = 0; i < inetAddrs.length; i++) {
InetAddress inetAddr = inetAddrs[i];
servers.add(
new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
try {
inetAddrs = InetAddress.getAllByName(host);
} catch (Exception e) {
savedListener.onError(Status.UNAVAILABLE.withCause(e));
return;
}
ArrayList<ResolvedServerInfo> servers =
new ArrayList<ResolvedServerInfo>(inetAddrs.length);
for (int i = 0; i < inetAddrs.length; i++) {
InetAddress inetAddr = inetAddrs[i];
servers.add(
new ResolvedServerInfo(new InetSocketAddress(inetAddr, port), Attributes.EMPTY));
}
savedListener.onUpdate(servers, Attributes.EMPTY);
} finally {
synchronized (DnsNameResolver.this) {
resolving = false;
}
}
listener.onUpdate(servers, Attributes.EMPTY);
}
});
}
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public abstract class NameResolver {
*/
public abstract void shutdown();

/**
* Re-resolves the name.
*
* <p>Can only be called after {@link #start} has been called.
*
* <p>This is only a hint. Implementations take it as a signal but may not start resolution
* immediately.
*/
public abstract void refresh();

public abstract static class Factory {
/**
* The port number used in case the target or the underlying naming system doesn't provide a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ public void start(final Listener listener) {
Attributes.EMPTY);
}

@Override
public void refresh() {}

@Override
public void shutdown() {}
};
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,11 @@ public void onTerminated() {
maybeTerminateChannel();
}
}

/**
* Asks the name resolver to refresh once all addresses have failed to connect.
*/
@Override
public void onAllAddressesFailed() {
nameResolver.refresh();
}
});
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} created for {2}",
Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/io/grpc/internal/TransportSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ public void transportReady() {

@Override
public void transportShutdown(Status s) {
boolean allAddressesFailed = false;
if (log.isLoggable(Level.FINE)) {
log.log(Level.FINE, "[{0}] {1} for {2} is being shutdown with status {3}",
new Object[] {getLogId(), transport.getLogId(), address, s});
Expand All @@ -359,6 +360,7 @@ public void transportShutdown(Status s) {
// Continue reconnect if there are still addresses to try.
// Fail if all addresses have been tried and failed in a row.
if (nextAddressIndex == 0) {
allAddressesFailed = true;
delayedTransport.setTransport(new FailingClientTransport(s));
delayedTransport.shutdown();
activeTransport = null;
Expand All @@ -368,6 +370,9 @@ public void transportShutdown(Status s) {
}
}
loadBalancer.handleTransportShutdown(addressGroup, s);
if (allAddressesFailed) {
callback.onAllAddressesFailed();
}
}

@Override
Expand All @@ -385,5 +390,7 @@ public void transportTerminated() {

interface Callback {
// NOTE(review): the call site of onTerminated() is not visible here; presumably it reports that
// this transport set has terminated — confirm against the caller.
void onTerminated();

/**
* Called from {@code transportShutdown}, after the load balancer has been told about the
* shutdown, when all addresses have been tried and failed in a row.
*/
void onAllAddressesFailed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ private static class FakeNameResolver extends NameResolver {

@Override public void start(final Listener listener) {}

@Override public void refresh() {}

@Override public void shutdown() {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,8 @@ private class FakeNameResolver extends NameResolver {
}
}

@Override public void refresh() {}

void resolved() {
listener.onUpdate(servers, Attributes.EMPTY);
}
Expand Down Expand Up @@ -639,6 +641,8 @@ public NameResolver newNameResolver(URI notUsedUri, Attributes params) {
listener.onError(error);
}

@Override public void refresh() {}

@Override public void shutdown() {}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,6 @@
public class ManagedChannelImplTransportManagerTest {

private static final String authority = "fakeauthority";
private static final NameResolver.Factory nameResolverFactory = new NameResolver.Factory() {
@Override
public NameResolver newNameResolver(final URI targetUri, Attributes params) {
return new NameResolver() {
@Override public void start(final Listener listener) {
}

@Override public String getServiceAuthority() {
return authority;
}

@Override public void shutdown() {
}
};
}

@Override
public String getDefaultScheme() {
return "fake";
}
};

private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final MethodDescriptor<String, String> method = MethodDescriptor.create(
Expand All @@ -121,6 +100,8 @@ public String getDefaultScheme() {

@Mock private ClientTransportFactory mockTransportFactory;
@Mock private LoadBalancer.Factory mockLoadBalancerFactory;
@Mock private NameResolver mockNameResolver;
@Mock private NameResolver.Factory mockNameResolverFactory;
@Mock private BackoffPolicy.Provider mockBackoffPolicyProvider;
@Mock private BackoffPolicy mockBackoffPolicy;

Expand All @@ -133,20 +114,26 @@ public void setUp() {
MockitoAnnotations.initMocks(this);

when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
when(mockNameResolver.getServiceAuthority()).thenReturn(authority);
when(mockNameResolverFactory
.newNameResolver(any(URI.class), any(Attributes.class)))
.thenReturn(mockNameResolver);
@SuppressWarnings("unchecked")
LoadBalancer<ClientTransport> loadBalancer = mock(LoadBalancer.class);
when(mockLoadBalancerFactory
.newLoadBalancer(anyString(), Matchers.<TransportManager<ClientTransport>>any()))
.thenReturn(loadBalancer);

channel = new ManagedChannelImpl("fake://target", mockBackoffPolicyProvider,
nameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
mockNameResolverFactory, Attributes.EMPTY, mockLoadBalancerFactory,
mockTransportFactory, DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance(), executor, null,
Collections.<ClientInterceptor>emptyList());

ArgumentCaptor<TransportManager<ClientTransport>> tmCaptor
= ArgumentCaptor.forClass(null);
verify(mockNameResolverFactory).newNameResolver(any(URI.class), any(Attributes.class));
verify(mockNameResolver).start(any(NameResolver.Listener.class));
verify(mockLoadBalancerFactory).newLoadBalancer(anyString(), tmCaptor.capture());
tm = tmCaptor.getValue();
transports = TestUtils.captureTransports(mockTransportFactory);
Expand Down Expand Up @@ -226,6 +213,8 @@ public void reconnect() throws Exception {
// Back-off policy was never consulted.
verify(mockBackoffPolicy, times(0)).nextBackoffMillis();
verifyNoMoreInteractions(mockTransportFactory);
// Never refreshed NameResolver
verify(mockNameResolver, times(0)).refresh();
}

@Test
Expand All @@ -239,6 +228,7 @@ public void reconnectWithBackoff() throws Exception {
int transportsAddr2 = 0;
int backoffConsulted = 0;
int backoffReset = 0;
int nameResolverRefresh = 0;

// First pick succeeds
ClientTransport t1 = tm.getTransport(addressGroup);
Expand All @@ -251,6 +241,7 @@ public void reconnectWithBackoff() throws Exception {
transportInfo.listener.transportReady();
// Then close it
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
verify(mockNameResolver, times(nameResolverRefresh)).refresh();

// Second pick fails. This is the beginning of a series of failures.
ClientTransport t2 = tm.getTransport(addressGroup);
Expand All @@ -260,6 +251,7 @@ public void reconnectWithBackoff() throws Exception {
// Back-off policy was reset.
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
transports.poll(1, TimeUnit.SECONDS).listener.transportShutdown(Status.UNAVAILABLE);
verify(mockNameResolver, times(nameResolverRefresh)).refresh();

// Third pick fails too
ClientTransport t3 = tm.getTransport(addressGroup);
Expand All @@ -269,6 +261,7 @@ public void reconnectWithBackoff() throws Exception {
// Back-off policy was not reset.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
transports.poll(1, TimeUnit.SECONDS).listener.transportShutdown(Status.UNAVAILABLE);
verify(mockNameResolver, times(++nameResolverRefresh)).refresh();

// Fourth pick is on the first address, back-off policy kicks in.
ClientTransport t4 = tm.getTransport(addressGroup);
Expand All @@ -278,6 +271,7 @@ public void reconnectWithBackoff() throws Exception {
// Back-off policy was not reset, but was consulted.
verify(mockBackoffPolicyProvider, times(backoffReset)).get();
verify(mockBackoffPolicy, times(++backoffConsulted)).nextBackoffMillis();
verify(mockNameResolver, times(nameResolverRefresh)).refresh();
}

@Test
Expand Down
10 changes: 10 additions & 0 deletions core/src/test/java/io/grpc/internal/TransportSetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ public class TransportSetTest {
int backoff1Consulted = 0;
int backoff2Consulted = 0;
int backoffReset = 0;
int onAllAddressesFailed = 0;

// First attempt
transportSet.obtainActiveTransport();
verify(mockBackoffPolicyProvider, times(++backoffReset)).get();
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();

// Second attempt uses the first back-off value interval.
transportSet.obtainActiveTransport();
Expand All @@ -137,6 +139,7 @@ public class TransportSetTest {
verify(mockTransportFactory, times(++transportsCreated)).newClientTransport(addr, authority);
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();

// Third attempt uses the second back-off interval.
transportSet.obtainActiveTransport();
Expand All @@ -151,6 +154,7 @@ public class TransportSetTest {
transports.peek().listener.transportReady();
// And close it
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();

// Back-off is reset, and the next attempt will happen immediately
transportSet.obtainActiveTransport();
Expand All @@ -174,6 +178,7 @@ public class TransportSetTest {
int backoff2Consulted = 0;
int backoff3Consulted = 0;
int backoffReset = 0;
int onAllAddressesFailed = 0;

// First attempt
DelayedClientTransport delayedTransport1 =
Expand All @@ -183,6 +188,7 @@ public class TransportSetTest {
// Let this one fail without success
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertNull(delayedTransport1.getTransportSupplier());
verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();

// Second attempt will start immediately. Keep back-off policy.
DelayedClientTransport delayedTransport2 =
Expand All @@ -194,6 +200,7 @@ public class TransportSetTest {
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed. Delayed transport will see an error.
assertTrue(delayedTransport2.getTransportSupplier().get() instanceof FailingClientTransport);
verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();

// Third attempt is the first address, thus controlled by the first back-off interval.
DelayedClientTransport delayedTransport3 =
Expand All @@ -208,6 +215,7 @@ public class TransportSetTest {
// Fail this one too
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
assertNull(delayedTransport3.getTransportSupplier());
verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();

// Fourth attempt will start immediately. Keep back-off policy.
DelayedClientTransport delayedTransport4 =
Expand All @@ -219,6 +227,7 @@ public class TransportSetTest {
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
// All addresses have failed again. Delayed transport will see an error
assertTrue(delayedTransport4.getTransportSupplier().get() instanceof FailingClientTransport);
verify(mockTransportSetCallback, times(++onAllAddressesFailed)).onAllAddressesFailed();

// Fifth attempt for the first address, thus controlled by the second back-off interval.
DelayedClientTransport delayedTransport5 =
Expand All @@ -236,6 +245,7 @@ public class TransportSetTest {
assertSame(transports.peek().transport, delayedTransport5.getTransportSupplier().get());
// Then close it.
transports.poll().listener.transportShutdown(Status.UNAVAILABLE);
verify(mockTransportSetCallback, times(onAllAddressesFailed)).onAllAddressesFailed();

// First attempt after a successful connection. Reset back-off policy, and start from the first
// address.
Expand Down

0 comments on commit 3be48b4

Please sign in to comment.