Trace transport request/response sizes in APM #94845

Open
DaveCTurner opened this issue Mar 28, 2023 · 1 comment
Labels
:Distributed Coordination/Network Http and internode communication implementations >enhancement Supportability Improve our (devs, SREs, support eng, users) ability to troubleshoot/self-service product better. Team:Distributed Meta label for distributed team (obsolete)

Comments

@DaveCTurner
Contributor

DaveCTurner commented Mar 28, 2023

With #94543 we expose some stats about transport request/response sizes. We also expose information about tasks to the APM tracer, and every transport request/response pair begets a task, so we should be able to expose transport request/response sizes to APM too. This would be very useful when investigating network traffic in detail.

It looks like we can add arbitrary attributes to a span via `Tracer#setAttribute`. Getting the request size there isn't too bad (see the patch below), but the response size looks harder because we close the task before serializing the response (edit: see the note after the patch). Also, I don't know whether there are any standard attribute names we should use here, or whether it works in APM if we invent attribute names like `es.transport.request_bytes`.

Patch that adds a request size attribute
diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
index 4d43bfdfa86..90b5096ef9b 100644
--- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
+++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java
@@ -492,6 +492,10 @@ public class TaskManager implements ClusterStateApplier {
         return cancellableTasks.assertConsistent();
     }
 
+    public void traceRequestSize(Task task, int requestSizeInBytes) {
+        tracer.setAttribute(getSpanId(task), Tracer.AttributeKeys.TRANSPORT_REQUEST_BYTES, requestSizeInBytes);
+    }
+
     private class Ban {
         final String reason;
         final Set<ChannelPendingTaskTracker> channels;
diff --git a/server/src/main/java/org/elasticsearch/tracing/Tracer.java b/server/src/main/java/org/elasticsearch/tracing/Tracer.java
index 5c61e897222..4e61161d267 100644
--- a/server/src/main/java/org/elasticsearch/tracing/Tracer.java
+++ b/server/src/main/java/org/elasticsearch/tracing/Tracer.java
@@ -190,5 +190,6 @@ public interface Tracer {
         String PARENT_TASK_ID = "es.task.parent.id";
         String CLUSTER_NAME = "es.cluster.name";
         String NODE_NAME = "es.node.name";
+        String TRANSPORT_REQUEST_BYTES = "es.transport.request_bytes";
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
index 503d2c3be81..f8e67c10469 100644
--- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
+++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java
@@ -222,7 +222,9 @@ public class InboundHandler {
         } else {
             final TransportChannel transportChannel;
             final RequestHandlerRegistry<T> reg;
+            final int requestSizeInBytes;
             try {
+                requestSizeInBytes = header.getNetworkMessageSize() + TcpHeader.BYTES_REQUIRED_FOR_MESSAGE_SIZE;
                 reg = requestHandlers.getHandler(action);
                 assert message.isShortCircuit() || reg != null : action;
                 transportChannel = new TcpTransportChannel(
@@ -297,7 +299,7 @@ public class InboundHandler {
                         if (ThreadPool.Names.SAME.equals(executor)) {
                             try (var ignored = threadPool.getThreadContext().newTraceContext()) {
                                 try {
-                                    reg.processMessageReceived(request, transportChannel);
+                                    reg.processMessageReceived(request, transportChannel, requestSizeInBytes);
                                 } catch (Exception e) {
                                     sendErrorResponse(reg.getAction(), transportChannel, e);
                                 }
@@ -310,7 +312,7 @@ public class InboundHandler {
                                     .execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
                                         @Override
                                         protected void doRun() throws Exception {
-                                            reg.processMessageReceived(request, transportChannel);
+                                            reg.processMessageReceived(request, transportChannel, requestSizeInBytes);
                                         }
 
                                         @Override
diff --git a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
index 8dc67fb3896..1234b53fae3 100644
--- a/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
+++ b/server/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java
@@ -61,8 +61,9 @@ public class RequestHandlerRegistry<Request extends TransportRequest> implements
         return requestReader.read(in);
     }
 
-    public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
+    public void processMessageReceived(Request request, TransportChannel channel, int requestSizeInBytes) throws Exception {
         final Task task = taskManager.register(channel.getChannelType(), action, request);
+        taskManager.traceRequestSize(task, requestSizeInBytes);
         Releasable unregisterTask = () -> taskManager.unregister(task);
         try {
             if (channel instanceof TcpTransportChannel tcpTransportChannel && task instanceof CancellableTask cancellableTask) {
diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java
index b22f4436e93..13928c10aee 100644
--- a/server/src/main/java/org/elasticsearch/transport/TransportService.java
+++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java
@@ -976,7 +976,7 @@ public class TransportService extends AbstractLifecycleComponent
             if (ThreadPool.Names.SAME.equals(executor)) {
                 try (var ignored = threadPool.getThreadContext().newTraceContext()) {
                     try {
-                        reg.processMessageReceived(request, channel);
+                        reg.processMessageReceived(request, channel, 0);
                     } catch (Exception e) {
                         handleSendToLocalException(channel, e, action);
                     }
@@ -988,7 +988,7 @@ public class TransportService extends AbstractLifecycleComponent
                     threadPool.executor(executor).execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() {
                         @Override
                         protected void doRun() throws Exception {
-                            reg.processMessageReceived(request, channel);
+                            reg.processMessageReceived(request, channel, 0);
                         }
 
                         @Override

Edit: #94865 keeps the task alive until after serializing the response, so maybe this is easier to achieve now.
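
If the task stays alive until the response has been serialized, the response side could plausibly mirror the request-side patch above: measure the serialized response and set a second attribute on the task's span before the span is closed. The following is a minimal, self-contained sketch of that idea under stated assumptions, not Elasticsearch code. `SpanAttributes`, `handle`, and the attribute name `es.transport.response_bytes` are hypothetical stand-ins, and the response size is taken from a plain `byte[]` rather than from the real transport serialization.

```java
import java.util.HashMap;
import java.util.Map;

public class ResponseSizeTracingSketch {

    // The request key matches the patch above; the response key is a hypothetical name.
    static final String TRANSPORT_REQUEST_BYTES = "es.transport.request_bytes";
    static final String TRANSPORT_RESPONSE_BYTES = "es.transport.response_bytes";

    /** Hypothetical stand-in for a tracer: keeps attributes per span id while the span is open. */
    static final class SpanAttributes {
        private final Map<String, Map<String, Object>> openSpans = new HashMap<>();

        void startSpan(String spanId) {
            openSpans.put(spanId, new HashMap<>());
        }

        /** Returns false if the span is no longer open, in which case the attribute is dropped. */
        boolean setAttribute(String spanId, String key, long value) {
            Map<String, Object> attributes = openSpans.get(spanId);
            if (attributes == null) {
                return false;
            }
            attributes.put(key, value);
            return true;
        }

        /** Closes the span and returns the attributes it ended up with (null if it was not open). */
        Map<String, Object> stopSpan(String spanId) {
            return openSpans.remove(spanId);
        }
    }

    /** Handles one request/response pair, recording both sizes before the span is closed. */
    static Map<String, Object> handle(SpanAttributes tracer, String spanId, int requestSizeInBytes, byte[] serializedResponse) {
        tracer.startSpan(spanId);
        tracer.setAttribute(spanId, TRANSPORT_REQUEST_BYTES, requestSizeInBytes);
        // The span is still open here because it is only stopped after the response has been serialized.
        tracer.setAttribute(spanId, TRANSPORT_RESPONSE_BYTES, serializedResponse.length);
        return tracer.stopSpan(spanId);
    }

    public static void main(String[] args) {
        SpanAttributes tracer = new SpanAttributes();
        System.out.println(handle(tracer, "span-1", 128, new byte[42]));
        // Setting an attribute after the span has been stopped has no effect in this sketch.
        System.out.println(tracer.setAttribute("span-1", TRANSPORT_RESPONSE_BYTES, 42));
    }
}
```

The final call in `main` shows the original difficulty: once the span is closed, a late `setAttribute` has nowhere to land, which is why the ordering of task unregistration and response serialization matters.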

@DaveCTurner DaveCTurner added >enhancement :Distributed Coordination/Network Http and internode communication implementations labels Mar 28, 2023
@elasticsearchmachine elasticsearchmachine added the Team:Distributed Meta label for distributed team (obsolete) label Mar 28, 2023
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@DaveCTurner DaveCTurner added the Supportability Improve our (devs, SREs, support eng, users) ability to troubleshoot/self-service product better. label Mar 28, 2023
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this issue Mar 29, 2023
Today we unregister transport tasks before calling
`TransportChannel#sendResponse` in order to send back the response. Yet
the serialization and transmission work is still really part of the
task's work, so we should keep the task around until these parts are
done too.

This commit delays the task unregistration until after `sendResponse`
returns, which is simple to achieve. Delaying it until the end of
transmission is a little harder and will be the subject of future work.

Relates elastic#94845
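
The reordering this commit message describes can be illustrated with a small self-contained sketch. It is not the actual change: `Channel`, `handleRequest`, and the event list are hypothetical stand-ins that only show the task being unregistered after `sendResponse` returns, whether it succeeds or throws.

```java
import java.util.ArrayList;
import java.util.List;

public class DelayedUnregisterSketch {

    /** Hypothetical stand-in for a transport channel. */
    interface Channel {
        void sendResponse(String response) throws Exception;
    }

    /** Registers a task, sends the response, and only then unregisters the task. */
    static List<String> handleRequest(String response, Channel channel) {
        List<String> events = new ArrayList<>();
        events.add("register");
        try {
            channel.sendResponse(response);
            events.add("sendResponse returned");
        } catch (Exception e) {
            events.add("sendResponse failed");
        } finally {
            // Unregistration runs last, so the task covers the work done inside sendResponse.
            events.add("unregister");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(handleRequest("ok", response -> {}));
    }
}
```

Like the commit, this sketch only waits for `sendResponse` to return; it does not model waiting for the end of transmission.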
elasticsearchmachine pushed a commit that referenced this issue Mar 29, 2023