Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-13144. Enhancing IPC client throughput via multiple connections per user #4542

Merged
merged 2 commits into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1716,7 +1716,7 @@ public static class ConnectionId {
private String saslQop; // here for testing
private final Configuration conf; // used to get the expected kerberos principal name

ConnectionId(InetSocketAddress address, Class<?> protocol,
public ConnectionId(InetSocketAddress address, Class<?> protocol,
UserGroupInformation ticket, int rpcTimeout,
RetryPolicy connectionRetryPolicy, Configuration conf) {
this.protocol = protocol;
Expand Down Expand Up @@ -1760,7 +1760,7 @@ UserGroupInformation getTicket() {
return ticket;
}

private int getRpcTimeout() {
int getRpcTimeout() {
return rpcTimeout;
}

Expand Down Expand Up @@ -1794,6 +1794,10 @@ boolean getDoPing() {
int getPingInterval() {
return pingInterval;
}

RetryPolicy getRetryPolicy() {
return connectionRetryPolicy;
}

@VisibleForTesting
String getSaslQop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ public static AsyncGet<Message, Exception> getAsyncReturnMessage() {
return ASYNC_RETURN_MESSAGE.get();
}

@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
}

public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ public <T> ProtocolProxy<T> getProxy(
rpcTimeout, connectionRetryPolicy, null, null);
}

@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
final Invoker invoker = new Invoker(protocol, connId, conf, factory);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] {protocol}, invoker), false);
}

@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,29 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
return getProtocolProxy(protocol, clientVersion, addr, ticket, conf,
factory, getRpcTimeout(conf), null);
}

/**
* Get a protocol proxy that contains a proxy connection to a remote server
* and a set of methods that are supported by the server.
*
* @param <T> Generics Type T
* @param protocol protocol class
* @param clientVersion client's version
* @param connId client connection identifier
* @param conf configuration
* @param factory socket factory
* @return the protocol proxy
* @throws IOException if the far end through a RemoteException
*/
public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
long clientVersion, ConnectionId connId, Configuration conf,
SocketFactory factory) throws IOException {
if (UserGroupInformation.isSecurityEnabled()) {
SaslRpcServer.init(conf);
}
return getProtocolEngine(protocol, conf).getProxy(
protocol, clientVersion, connId, conf, factory);
}

/**
* Construct a client-side proxy that implements the named protocol,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ <T> ProtocolProxy<T> getProxy(Class<T> protocol,
SocketFactory factory, int rpcTimeout,
RetryPolicy connectionRetryPolicy) throws IOException;

/**
* Construct a client-side proxy object with a ConnectionId.
*
* @param <T> Generics Type T.
* @param protocol input protocol.
* @param clientVersion input clientVersion.
* @param connId input ConnectionId.
* @param conf input Configuration.
* @param factory input factory.
* @throws IOException raised on errors performing I/O.
* @return ProtocolProxy.
*/
<T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException;

/**
* Construct a client-side proxy object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,27 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
rpcTimeout, connectionRetryPolicy, null, null);
}

/**
* Construct a client-side proxy object with a ConnectionId.
*
* @param <T> Generics Type T.
* @param protocol input protocol.
* @param clientVersion input clientVersion.
* @param connId input ConnectionId.
* @param conf input Configuration.
* @param factory input factory.
* @throws IOException raised on errors performing I/O.
* @return ProtocolProxy.
*/
@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
Client.ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
return getProxy(protocol, clientVersion, connId.getAddress(),
connId.ticket, conf, factory, connId.getRpcTimeout(),
connId.getRetryPolicy(), null, null);
}

/**
* Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.ipc;

import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.metrics.RpcMetrics;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -289,6 +291,13 @@ public <T> ProtocolProxy<T> getProxy(
rpcTimeout, connectionRetryPolicy, null, null);
}

@Override
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
ConnectionId connId, Configuration conf, SocketFactory factory)
throws IOException {
throw new UnsupportedOperationException("This proxy is not supported");
}

@SuppressWarnings("unchecked")
@Override
public <T> ProtocolProxy<T> getProxy(
Expand Down Expand Up @@ -390,6 +399,53 @@ public void testProxyAddress() throws Exception {
}
}

@Test
public void testConnectionWithSocketFactory() throws IOException, ServiceException {
TestRpcService firstProxy = null;
TestRpcService secondProxy = null;

Configuration newConf = new Configuration(conf);
newConf.set(CommonConfigurationKeysPublic.
HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, "");

RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy(
newConf, "Test.No.Such.Key",
true,
"Test.No.Such.Key", "10000,6",
null);

// create a server with two handlers
Server server = setupTestServer(newConf, 2);
try {
// create the first client
firstProxy = getClient(addr, newConf);
// create the second client
secondProxy = getClient(addr, newConf);

firstProxy.ping(null, newEmptyRequest());
secondProxy.ping(null, newEmptyRequest());

Client client = ProtobufRpcEngine2.getClient(newConf);
assertEquals(1, client.getConnectionIds().size());

stop(null, firstProxy, secondProxy);
ProtobufRpcEngine2.clearClientCache();

// create the first client with index 1
firstProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 1);
// create the second client with index 2
secondProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 2);
firstProxy.ping(null, newEmptyRequest());
secondProxy.ping(null, newEmptyRequest());

Client client2 = ProtobufRpcEngine2.getClient(newConf);
assertEquals(2, client2.getConnectionIds().size());
} finally {
System.out.println("Down slow rpc testing");
stop(server, firstProxy, secondProxy);
}
}

@Test
public void testSlowRpc() throws IOException, ServiceException {
Server server;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.ipc;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
Expand All @@ -26,6 +28,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
import org.apache.hadoop.net.NetUtils;
Expand Down Expand Up @@ -154,11 +157,53 @@ protected static TestRpcService getClient(InetSocketAddress serverAddr,
}
}

protected static void stop(Server server, TestRpcService proxy) {
if (proxy != null) {
try {
RPC.stopProxy(proxy);
} catch (Exception ignored) {}
/**
* Try to obtain a proxy of TestRpcService with an index.
* @param serverAddr input server address
* @param clientConf input client configuration
* @param retryPolicy input retryPolicy
* @param index input index
* @return one proxy of TestRpcService
*/
protected static TestRpcService getMultipleClientWithIndex(InetSocketAddress serverAddr,
Configuration clientConf, RetryPolicy retryPolicy, int index)
throws ServiceException, IOException {
MockConnectionId connectionId = new MockConnectionId(serverAddr,
TestRpcService.class, UserGroupInformation.getCurrentUser(),
RPC.getRpcTimeout(clientConf), retryPolicy, clientConf, index);
return getClient(connectionId, clientConf);
}

/**
* Obtain a TestRpcService Proxy by a connectionId.
* @param connId input connectionId
* @param clientConf input configuration
* @return a TestRpcService Proxy
* @throws ServiceException a ServiceException
*/
protected static TestRpcService getClient(ConnectionId connId,
Configuration clientConf) throws ServiceException {
try {
return RPC.getProtocolProxy(
TestRpcService.class,
0,
connId,
clientConf,
NetUtils.getDefaultSocketFactory(clientConf)).getProxy();
} catch (IOException e) {
throw new ServiceException(e);
}
}

protected static void stop(Server server, TestRpcService... proxies) {
if (proxies != null) {
for (TestRpcService proxy : proxies) {
goiri marked this conversation as resolved.
Show resolved Hide resolved
if (proxy != null) {
try {
RPC.stopProxy(proxy);
} catch (Exception ignored) {}
}
}
}

if (server != null) {
Expand Down Expand Up @@ -189,6 +234,40 @@ protected static int countThreads(String search) {
return count;
}

public static class MockConnectionId extends ConnectionId {
private static final int PRIME = 16777619;
private final int index;

public MockConnectionId(InetSocketAddress address, Class<?> protocol,
UserGroupInformation ticket, int rpcTimeout, RetryPolicy connectionRetryPolicy,
Configuration conf, int index) {
super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf);
this.index = index;
}

@Override
public int hashCode() {
goiri marked this conversation as resolved.
Show resolved Hide resolved
return new HashCodeBuilder()
.append(PRIME * super.hashCode())
.append(this.index)
.toHashCode();
}

@Override
public boolean equals(Object obj) {
if (!super.equals(obj)) {
return false;
}
if (obj instanceof MockConnectionId) {
MockConnectionId other = (MockConnectionId)obj;
return new EqualsBuilder()
.append(this.index, other.index)
.isEquals();
}
return false;
}
}

public static class TestTokenIdentifier extends TokenIdentifier {
private Text tokenid;
private Text realUser;
Expand Down