Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

feat: support resolving meta host through DNS #60

Merged
merged 14 commits into from
Oct 26, 2019
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-validator</groupId>
<artifactId>commons-validator</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.24.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
Expand Down
1 change: 1 addition & 0 deletions scripts/format-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java
src/test/java/com/xiaomi/infra/pegasus/metrics/*.java
src/test/java/com/xiaomi/infra/pegasus/rpc/async/*.java
src/test/java/com/xiaomi/infra/pegasus/tools/*.java
src/test/java/com/xiaomi/infra/pegasus/base/*.java
)

if [ ! -f "${PROJECT_DIR}"/google-java-format-1.7-all-deps.jar ]; then
Expand Down
40 changes: 20 additions & 20 deletions src/main/java/com/xiaomi/infra/pegasus/base/rpc_address.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,19 @@
*/
package com.xiaomi.infra.pegasus.base;

import com.xiaomi.infra.pegasus.thrift.*;
import com.xiaomi.infra.pegasus.thrift.async.*;
import com.xiaomi.infra.pegasus.thrift.meta_data.*;
import com.xiaomi.infra.pegasus.thrift.protocol.*;
import com.xiaomi.infra.pegasus.thrift.transport.*;
import com.xiaomi.infra.pegasus.thrift.TBase;
import com.xiaomi.infra.pegasus.thrift.TException;
import com.xiaomi.infra.pegasus.thrift.TFieldIdEnum;
import com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData;
import com.xiaomi.infra.pegasus.thrift.protocol.TProtocol;
import com.xiaomi.infra.pegasus.thrift.protocol.TStruct;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.*;

public class rpc_address
public final class rpc_address
implements TBase<rpc_address, rpc_address._Fields>, java.io.Serializable, Cloneable {
private static final TStruct STRUCT_DESC = new TStruct("rpc_address");

Expand Down Expand Up @@ -121,12 +120,9 @@ public boolean fromString(String ipPort) {
}

try {
// TODO(wutao1): getByName will query DNS if the given address is not valid ip:port.
byte[] byteArray = InetAddress.getByName(pairs[0]).getAddress();
ip =
((int) (byteArray[0] & 0xff) << 24)
| ((int) (byteArray[1] & 0xff) << 16)
| ((int) (byteArray[2] & 0xff) << 8)
| ((int) (byteArray[3] & 0xff));
ip = ByteBuffer.wrap(byteArray).order(ByteOrder.BIG_ENDIAN).getInt();
} catch (UnknownHostException e) {
return false;
}
Expand All @@ -135,6 +131,12 @@ public boolean fromString(String ipPort) {
address = ((long) ip << 32) + ((long) port << 16) + 1;
return true;
}

public static rpc_address fromIpPort(String ipPort) {
rpc_address addr = new rpc_address();
return addr.fromString(ipPort) ? addr : null;
}

/** Performs a deep copy on <i>other</i>. */
public rpc_address(rpc_address other) {
this.address = other.address;
Expand Down Expand Up @@ -192,10 +194,8 @@ public int hashCode() {
}

public int compareTo(rpc_address other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}

if (address < other.address) return -1;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
if (address > other.address) return 1;
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2019, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.rpc.async;

import com.xiaomi.infra.pegasus.base.rpc_address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/*
* Resolves host:port into a set of ip addresses.
* The intention of this class is to mock DNS.
*/
public class HostNameResolver {

public rpc_address[] resolve(String hostPort) throws IllegalArgumentException {
String[] pairs = hostPort.split(":");
if (pairs.length != 2) {
throw new IllegalArgumentException("Meta server host name format error!");
}

try {
Integer port = Integer.valueOf(pairs[1]);
InetAddress[] resolvedAddresses = InetAddress.getAllByName(pairs[0]);
rpc_address[] results = new rpc_address[resolvedAddresses.length];
int size = 0;
for (InetAddress addr : resolvedAddresses) {
rpc_address rpcAddr = new rpc_address();
int ip = ByteBuffer.wrap(addr.getAddress()).order(ByteOrder.BIG_ENDIAN).getInt();
rpcAddr.address = ((long) ip << 32) + ((long) port << 16) + 1;
results[size++] = rpcAddr;
}
return results;
} catch (UnknownHostException e) {
return null;
}
}
}
113 changes: 94 additions & 19 deletions src/main/java/com/xiaomi/infra/pegasus/rpc/async/MetaSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,31 +9,40 @@
import com.xiaomi.infra.pegasus.operator.query_cfg_operator;
import com.xiaomi.infra.pegasus.replication.partition_configuration;
import io.netty.channel.EventLoopGroup;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.apache.commons.validator.routines.InetAddressValidator;

/** Created by weijiesun on 17-9-13. */
public class MetaSession {
public class MetaSession extends HostNameResolver {
public MetaSession(
ClusterManager manager,
String addrList[],
String[] addrList,
int eachQueryTimeoutInMills,
int defaultMaxQueryCount,
EventLoopGroup g)
throws IllegalArgumentException {
clusterManager = manager;
metaList = new ArrayList<ReplicaSession>();
for (String addr : addrList) {
rpc_address rpc_addr = new rpc_address();
if (rpc_addr.fromString(addr)) {
logger.info("add {} as meta server", addr);
metaList.add(clusterManager.getReplicaSession(rpc_addr));
} else {
logger.error("invalid address {}", addr);

if (addrList.length == 1 && !InetAddressValidator.getInstance().isValid(addrList[0])) {
// if the given string is not a valid ip address,
// then take it as a hostname for a try.
resolveHost(addrList[0]);
if (!metaList.isEmpty()) {
hostPort = addrList[0];
}
} else {
for (String addr : addrList) {
rpc_address rpcAddr = new rpc_address();
if (rpcAddr.fromString(addr)) {
logger.info("add {} as meta server", addr);
metaList.add(clusterManager.getReplicaSession(rpcAddr));
} else {
logger.error("invalid address {}", addr);
}
}
}
if (metaList.isEmpty()) {
Expand All @@ -46,13 +55,13 @@ public MetaSession(
this.group = g;
}

public static final error_types getMetaServiceError(client_operator metaQueryOp) {
public static error_types getMetaServiceError(client_operator metaQueryOp) {
if (metaQueryOp.rpc_error.errno != error_types.ERR_OK) return metaQueryOp.rpc_error.errno;
query_cfg_operator op = (query_cfg_operator) metaQueryOp;
return op.get_response().getErr().errno;
}

public static final rpc_address getMetaServiceForwardAddress(client_operator metaQueryOp) {
public static rpc_address getMetaServiceForwardAddress(client_operator metaQueryOp) {
if (metaQueryOp.rpc_error.errno != error_types.ERR_OK) return null;
query_cfg_operator op = (query_cfg_operator) metaQueryOp;
if (op.get_response().getErr().errno != error_types.ERR_FORWARD_TO_OTHERS) return null;
Expand Down Expand Up @@ -104,7 +113,7 @@ public final void closeSession() {
}
}

private final void asyncCall(final MetaRequestRound round) {
private void asyncCall(final MetaRequestRound round) {
round.lastSession.asyncSend(
round.op,
new Runnable() {
Expand All @@ -116,7 +125,7 @@ public void run() {
eachQueryTimeoutInMills);
}

private final void onFinishQueryMeta(final MetaRequestRound round) {
void onFinishQueryMeta(final MetaRequestRound round) {
client_operator op = round.op;

boolean needDelay = false;
Expand Down Expand Up @@ -177,6 +186,17 @@ private final void onFinishQueryMeta(final MetaRequestRound round) {
}
} else if (metaList.get(curLeader) == round.lastSession) {
curLeader = (curLeader + 1) % metaList.size();
// try refresh the meta list from DNS
// maxResolveCount and "maxQueryCount refresh" is necessary:
// for example, maxQueryCount=5, the first error metalist size = 3, when trigger dns
// refresh, the "maxQueryCount" may change to 2, the client may can't choose the right
// leader when the new metaList size > 2 after retry 2 time. but if the "maxQueryCount"
// refresh, the retry will not stop if no maxResolveCount when the meta is error.
if (curLeader == 0 && hostPort != null && round.maxResolveCount != 0) {
resolveHost(hostPort);
round.maxResolveCount--;
round.maxQueryCount = metaList.size();
}
}
}
round.lastSession = metaList.get(curLeader);
Expand All @@ -187,6 +207,10 @@ private final void onFinishQueryMeta(final MetaRequestRound round) {
return;
}

retryQueryMeta(round, needDelay);
}

void retryQueryMeta(final MetaRequestRound round, boolean needDelay) {
group.schedule(
new Runnable() {
@Override
Expand All @@ -198,26 +222,77 @@ public void run() {
TimeUnit.SECONDS);
}

private static final class MetaRequestRound {
static final class MetaRequestRound {
public int maxResolveCount = 2;

public client_operator op;
public Runnable callbackFunc;
public int maxQueryCount;
public ReplicaSession lastSession;

public MetaRequestRound(client_operator o, Runnable r, int q, ReplicaSession l) {
public MetaRequestRound(client_operator o, Runnable r, int c, ReplicaSession l) {
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
op = o;
callbackFunc = r;
maxQueryCount = q;
maxQueryCount = c;
lastSession = l;
}
}

/*
* Resolves hostname:port into a set of ip addresses.
*/
void resolveHost(String hostPort) throws IllegalArgumentException {
rpc_address[] addrs = resolve(hostPort);
if (addrs == null) {
logger.error("failed to resolve address \"{}\" as host:port", hostPort);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
return;
}

Set<rpc_address> newSet = new TreeSet<rpc_address>(Arrays.asList(addrs));
Set<rpc_address> oldSet = new TreeSet<rpc_address>();
for (ReplicaSession meta : metaList) {
oldSet.add(meta.getAddress());
}

// fast path: do nothing if meta list is unchanged.
if (newSet.equals(oldSet)) {
return;
}

// removed metas
Set<rpc_address> removed = new HashSet<rpc_address>(oldSet);
removed.removeAll(newSet);
for (rpc_address addr : removed) {
logger.info("meta server {} was removed", addr);
for (int i = 0; i < metaList.size(); i++) {
if (metaList.get(i).getAddress().equals(addr)) {
ReplicaSession session = metaList.remove(i);
session.closeSession();
}
}
}

// newly added metas
Set<rpc_address> added = new HashSet<rpc_address>(newSet);
added.removeAll(oldSet);
for (rpc_address addr : added) {
metaList.add(clusterManager.getReplicaSession(addr));
logger.info("add {} as meta server", addr);
}
}

// Only for test.
List<ReplicaSession> getMetaList() {
return metaList;
}

private ClusterManager clusterManager;
private List<ReplicaSession> metaList;
private int curLeader;
private int eachQueryTimeoutInMills;
private int defaultMaxQueryCount;
private EventLoopGroup group;
private String hostPort;

private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(MetaSession.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public void closeSession() {
VolatileFields f = fields;
if (f.state == ConnState.CONNECTED && f.nettyChannel != null) {
try {
// close().sync() means calling system API `close()` synchronously,
// but the connection may not be completely closed then, that is,
// the state may not be marked as DISCONNECTED immediately.
f.nettyChannel.close().sync();
logger.info("channel to {} closed", address.toString());
} catch (Exception ex) {
Expand All @@ -146,7 +149,7 @@ public final rpc_address getAddress() {
return address;
}

private void doConnect() {
void doConnect() {
try {
// we will receive the channel connect event in DefaultHandler.ChannelActive
boot.connect(address.get_ip(), address.get_port())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ public void onCompletion(client_operator op) throws Throwable {
} catch (ExecutionException e) {
logger.info("got exception: " + e);
throw new ReplicationException(e);

foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
} catch (TimeoutException e) {
op.rpc_error.errno = error_types.ERR_TIMEOUT;
}
Expand Down
Loading