Skip to content

Commit

Permalink
Router (#6)
Browse files Browse the repository at this point in the history
async router
  • Loading branch information
KeeProMise authored May 18, 2024
1 parent 41eacf4 commit 58f7ed9
Show file tree
Hide file tree
Showing 53 changed files with 8,953 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
= new ThreadLocal<>();
public static final ThreadLocal<CompletableFuture<Object>> COMPLETABLE_FUTURE_THREAD_LOCAL
= new ThreadLocal<>();
private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
ASYNC_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
Expand Down Expand Up @@ -283,6 +285,7 @@ static class Call {
boolean done; // true when call is done
private final Object externalHandler;
private AlignmentContext alignmentContext;
private CompletableFuture<Object> completableFuture;

private Call(RPC.RpcKind rpcKind, Writable param) {
this.rpcKind = rpcKind;
Expand All @@ -304,6 +307,7 @@ private Call(RPC.RpcKind rpcKind, Writable param) {
}

this.externalHandler = EXTERNAL_CALL_HANDLER.get();
this.completableFuture = COMPLETABLE_FUTURE_THREAD_LOCAL.get();
}

@Override
Expand All @@ -322,6 +326,9 @@ protected synchronized void callComplete() {
externalHandler.notify();
}
}
if (completableFuture != null) {
completableFuture.complete(this);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,6 @@ public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
*/
public static class Server extends ProtobufRpcEngine2.Server {

static final ThreadLocal<ProtobufRpcEngineCallback> currentCallback =
new ThreadLocal<>();

static class ProtobufRpcEngineCallbackImpl
implements ProtobufRpcEngineCallback {

Expand All @@ -397,32 +394,33 @@ static class ProtobufRpcEngineCallbackImpl

public ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
this.call = Server.getCurCall().get();
this.call.deferResponse();
this.setupTime = Time.monotonicNowNanos();
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
call.setDeferredResponse(RpcWritable.wrap(message));
long processingTime =
Time.monotonicNow() - setupTime / Time.NANOSECONDS_PER_MILLISECOND;
call.setDeferredResponse(RpcWritable.wrap(message), setupTime);
server.updateDeferredMetrics(methodName, processingTime);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
long processingTime =
Time.monotonicNow() - setupTime / Time.NANOSECONDS_PER_MILLISECOND;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
call.setDeferredError(t);
call.setDeferredError(t, setupTime);
}
}

@InterfaceStability.Unstable
public static ProtobufRpcEngineCallback registerForDeferredResponse() {
ProtobufRpcEngineCallback callback = new ProtobufRpcEngineCallbackImpl();
currentCallback.set(callback);
return callback;
return new ProtobufRpcEngineCallbackImpl();
}

/**
Expand Down Expand Up @@ -482,11 +480,7 @@ static RpcWritable processCall(RPC.Server server,
CURRENT_CALL_INFO.set(new CallInfo(server, methodName));
currentCall.setDetailedMetricsName(methodName);
result = service.callBlockingMethod(methodDescriptor, null, param);
// Check if this needs to be a deferred response,
// by checking the ThreadLocal callback being set
if (currentCallback.get() != null) {
currentCall.deferResponse();
currentCallback.set(null);
if (currentCall.isResponseDeferred()) {
return null;
}
} catch (ServiceException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,6 @@ public static void clearClientCache() {

public static class Server extends RPC.Server {

static final ThreadLocal<ProtobufRpcEngineCallback2> CURRENT_CALLBACK =
new ThreadLocal<>();

static final ThreadLocal<CallInfo> CURRENT_CALL_INFO = new ThreadLocal<>();

static class CallInfo {
Expand Down Expand Up @@ -429,32 +426,44 @@ static class ProtobufRpcEngineCallbackImpl

ProtobufRpcEngineCallbackImpl() {
this.server = CURRENT_CALL_INFO.get().getServer();
this.call = Server.getCurCall().get();
this.methodName = CURRENT_CALL_INFO.get().getMethodName();
this.setupTime = Time.now();
this.call = Server.getCurCall().get();
this.call.deferResponse();
this.setupTime = Time.monotonicNowNanos();
}

@Override
public void setResponse(Message message) {
long processingTime = Time.now() - setupTime;
call.setDeferredResponse(RpcWritable.wrap(message));
long processingTime =
Time.monotonicNow() - setupTime / Time.NANOSECONDS_PER_MILLISECOND;
call.setDeferredResponse(RpcWritable.wrap(message), setupTime);
server.updateDeferredMetrics(methodName, processingTime);
}

@Override
public void error(Throwable t) {
long processingTime = Time.now() - setupTime;
long processingTime =
Time.monotonicNow() - setupTime / Time.NANOSECONDS_PER_MILLISECOND;
String detailedMetricsName = t.getClass().getSimpleName();
server.updateDeferredMetrics(detailedMetricsName, processingTime);
call.setDeferredError(t);
call.setDeferredError(t, setupTime);
LOG.info("zjtest {} error:", this.toString(), t);
}

@Override
public String toString() {
return "ProtobufRpcEngineCallbackImpl{" +
"server=" + server +
", call=" + call +
", methodName='" + methodName + '\'' +
", setupTime=" + setupTime +
'}';
}
}

@InterfaceStability.Unstable
public static ProtobufRpcEngineCallback2 registerForDeferredResponse2() {
ProtobufRpcEngineCallback2 callback = new ProtobufRpcEngineCallbackImpl();
CURRENT_CALLBACK.set(callback);
return callback;
return new ProtobufRpcEngineCallbackImpl();
}

/**
Expand Down Expand Up @@ -619,11 +628,7 @@ private RpcWritable call(RPC.Server server,
CURRENT_CALL_INFO.set(new CallInfo(server, methodName));
currentCall.setDetailedMetricsName(methodName);
result = service.callBlockingMethod(methodDescriptor, null, param);
// Check if this needs to be a deferred response,
// by checking the ThreadLocal callback being set
if (CURRENT_CALLBACK.get() != null) {
currentCall.deferResponse();
CURRENT_CALLBACK.set(null);
if (currentCall.isResponseDeferred()) {
return null;
}
} catch (ServiceException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1157,10 +1157,10 @@ public boolean isResponseDeferred() {
return this.deferredResponse;
}

public void setDeferredResponse(Writable response) {
public void setDeferredResponse(Writable response, long setupTime) {
}

public void setDeferredError(Throwable t) {
public void setDeferredError(Throwable t, long setupTime) {
}

public long getTimestampNanos() {
Expand Down Expand Up @@ -1252,24 +1252,30 @@ public Void run() throws Exception {
populateResponseParamsOnError(e, responseParams);
}
if (!isResponseDeferred()) {
long deltaNanos = Time.monotonicNowNanos() - startNanos;
ProcessingDetails details = getProcessingDetails();

details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);

setResponseFields(value, responseParams);
sendResponse();
details.setReturnStatus(responseParams.returnStatus);
sendResponse(value, responseParams, startNanos);
} else {
LOG.debug("Deferring response for callId: {}", this.callId);
}
return null;
}

private void sendResponse(
Writable response,
ResponseParams responseParams, long startNanos) throws IOException {
long deltaNanos = Time.monotonicNowNanos() - startNanos;
ProcessingDetails details = getProcessingDetails();

details.set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);

setResponseFields(response, responseParams);
sendResponse();
details.setReturnStatus(responseParams.returnStatus);
}

/**
* @param t the {@link java.lang.Throwable} to use to set
* errorInfo
Expand Down Expand Up @@ -1326,29 +1332,11 @@ void doResponse(Throwable t, RpcStatusProto status) throws IOException {
connection.sendResponse(call);
}

/**
* Send a deferred response, ignoring errors.
*/
private void sendDeferedResponse() {
try {
connection.sendResponse(this);
} catch (Exception e) {
// For synchronous calls, application code is done once it's returned
// from a method. It does not expect to receive an error.
// This is equivalent to what happens in synchronous calls when the
// Responder is not able to send out the response.
LOG.error("Failed to send deferred response. ThreadName=" + Thread
.currentThread().getName() + ", CallId="
+ callId + ", hostname=" + getHostAddress());
}
}

@Override
public void setDeferredResponse(Writable response) {
public void setDeferredResponse(Writable response, long setupTime) {
if (this.connection.getServer().running) {
try {
setupResponse(this, RpcStatusProto.SUCCESS, null, response,
null, null);
sendResponse(response, new ResponseParams(), setupTime);
} catch (IOException e) {
// For synchronous calls, application code is done once it has
// returned from a method. It does not expect to receive an error.
Expand All @@ -1357,14 +1345,20 @@ public void setDeferredResponse(Writable response) {
LOG.error(
"Failed to setup deferred successful response. ThreadName=" +
Thread.currentThread().getName() + ", Call=" + this);
return;
} catch (Exception e) {
// For synchronous calls, application code is done once it's returned
// from a method. It does not expect to receive an error.
// This is equivalent to what happens in synchronous calls when the
// Responder is not able to send out the response.
LOG.error("Failed to send deferred response. ThreadName=" + Thread
.currentThread().getName() + ", CallId="
+ callId + ", hostname=" + getHostAddress());
}
sendDeferedResponse();
}
}

@Override
public void setDeferredError(Throwable t) {
public void setDeferredError(Throwable t, long setupTime) {
if (this.connection.getServer().running) {
if (t == null) {
t = new IOException(
Expand All @@ -1373,9 +1367,7 @@ public void setDeferredError(Throwable t) {
try {
ResponseParams responseParams = new ResponseParams();
populateResponseParamsOnError(t, responseParams);
setupResponse(this, responseParams.returnStatus,
responseParams.detailedErr,
null, responseParams.errorClass, responseParams.error);
sendResponse(null, responseParams, setupTime);
} catch (IOException e) {
// For synchronous calls, application code is done once it has
// returned from a method. It does not expect to receive an error.
Expand All @@ -1384,8 +1376,15 @@ public void setDeferredError(Throwable t) {
LOG.error(
"Failed to setup deferred error response. ThreadName=" +
Thread.currentThread().getName() + ", Call=" + this);
} catch (Exception e) {
// For synchronous calls, application code is done once it's returned
// from a method. It does not expect to receive an error.
// This is equivalent to what happens in synchronous calls when the
// Responder is not able to send out the response.
LOG.error("Failed to send deferred response. ThreadName=" + Thread
.currentThread().getName() + ", CallId="
+ callId + ", hostname=" + getHostAddress());
}
sendDeferedResponse();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ public class RefreshUserMappingsProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, RefreshUserMappingsProtocol, Closeable {

/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
protected final static RpcController NULL_CONTROLLER = null;
private final RefreshUserMappingsProtocolPB rpcProxy;

private final static RefreshUserToGroupsMappingsRequestProto
protected final static RefreshUserToGroupsMappingsRequestProto
VOID_REFRESH_USER_TO_GROUPS_MAPPING_REQUEST =
RefreshUserToGroupsMappingsRequestProto.newBuilder().build();

private final static RefreshSuperUserGroupsConfigurationRequestProto
protected final static RefreshSuperUserGroupsConfigurationRequestProto
VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_REQUEST =
RefreshSuperUserGroupsConfigurationRequestProto.newBuilder().build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@ public class RefreshUserMappingsProtocolServerSideTranslatorPB implements Refres

private final RefreshUserMappingsProtocol impl;

private final static RefreshUserToGroupsMappingsResponseProto
VOID_REFRESH_USER_GROUPS_MAPPING_RESPONSE =
protected final static RefreshUserToGroupsMappingsResponseProto
VOID_REFRESH_USER_GROUPS_MAPPING_RESPONSE =
RefreshUserToGroupsMappingsResponseProto.newBuilder().build();

private final static RefreshSuperUserGroupsConfigurationResponseProto
VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_RESPONSE =
protected final static RefreshSuperUserGroupsConfigurationResponseProto
VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_RESPONSE =
RefreshSuperUserGroupsConfigurationResponseProto.newBuilder()
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class GetUserMappingsProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, GetUserMappingsProtocol, Closeable {

/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
protected final static RpcController NULL_CONTROLLER = null;
private final GetUserMappingsProtocolPB rpcProxy;

public GetUserMappingsProtocolClientSideTranslatorPB(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public final class Time {
/**
* number of nano seconds in 1 millisecond
*/
private static final long NANOSECONDS_PER_MILLISECOND = 1000000;
public static final long NANOSECONDS_PER_MILLISECOND = 1000000;

private static final TimeZone UTC_ZONE = TimeZone.getTimeZone("UTC");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ void awaitInvocation() throws InterruptedException {
}

void sendResponse() {
deferredCall.setDeferredResponse(request);
deferredCall.setDeferredResponse(request, 0);
}

void sendError() {
deferredCall.setDeferredError(new IOException("DeferredError"));
deferredCall.setDeferredError(new IOException("DeferredError"), 0);
}
}

Expand Down
Loading

0 comments on commit 58f7ed9

Please sign in to comment.