Skip to content

Commit

Permalink
Track stats for all kinds of error responses too
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Mar 22, 2023
1 parent f2e42f1 commit 905e5ad
Showing 1 changed file with 30 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,26 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
channel.close();
}
} else {
final TransportChannel transportChannel;
final RequestHandlerRegistry<T> reg;
try {
messageListener.onRequestReceived(requestId, action);
if (message.isShortCircuit()) {
final TransportChannel transportChannel = new TcpTransportChannel(
reg = requestHandlers.getHandler(action);
assert reg != null;
transportChannel = new TcpTransportChannel(
outboundHandler,
channel,
action,
requestId,
version,
header.getCompressionScheme(),
reg,
header.isHandshake(),
message.takeBreakerReleaseControl()
);
} catch (Exception e) {
sendErrorResponse(
action,
new TcpTransportChannel(
outboundHandler,
channel,
action,
Expand All @@ -233,25 +249,21 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
ResponseStatsConsumer.NONE,
header.isHandshake(),
message.takeBreakerReleaseControl()
);
),
e
);
return;
}

try {
messageListener.onRequestReceived(requestId, action);
reg.addRequestStats(header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);

if (message.isShortCircuit()) {
sendErrorResponse(action, transportChannel, message.getException());
} else {
final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
assertRemoteVersion(stream, header.getVersion());
final RequestHandlerRegistry<T> reg = requestHandlers.getHandler(action);
assert reg != null;
reg.addRequestStats(header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE);
final TransportChannel transportChannel = new TcpTransportChannel(
outboundHandler,
channel,
action,
requestId,
version,
header.getCompressionScheme(),
reg,
header.isHandshake(),
message.takeBreakerReleaseControl()
);
final T request;
try {
request = reg.newRequest(stream);
Expand Down Expand Up @@ -324,17 +336,6 @@ public void onAfter() {
}
}
} catch (Exception e) {
final TransportChannel transportChannel = new TcpTransportChannel(
outboundHandler,
channel,
action,
requestId,
version,
header.getCompressionScheme(),
ResponseStatsConsumer.NONE,
header.isHandshake(),
message.takeBreakerReleaseControl()
);
sendErrorResponse(action, transportChannel, e);
}
}
Expand Down

0 comments on commit 905e5ad

Please sign in to comment.