Skip to content

Commit

Permalink
Add proper handling of content-read and content-write logs within…
Browse files Browse the repository at this point in the history
… `HTTP Request` tracing span (#8105)

* Add proper handling of content-read and content-write logs within HTTP Request tracing span

* Improve table column sizing
  • Loading branch information
tjquinno authored Dec 7, 2023
1 parent df64dfa commit 7092015
Show file tree
Hide file tree
Showing 2 changed files with 225 additions and 36 deletions.
2 changes: 1 addition & 1 deletion docs/includes/tracing/common-spans.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ is used.
Some of these spans `log` to the span. These log events can be (in most cases) configured:
[cols="2,2,2,4", role="flex, sm10"]
[cols="2,2,1,1,8", role="flex, sm10"]
|===
|span name |log name |configurable |enabled by default |description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.helidon.webserver.observe.tracing;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -62,6 +63,8 @@
*/
@RuntimeType.PrototypedBy(TracingObserverConfig.class)
public class TracingObserver implements Observer, RuntimeType.Api<TracingObserverConfig> {
private static final String CONTENT_READ_SPAN_NAME = "content-read";
private static final String CONTENT_WRITE_SPAN_NAME = "content-write";
private final TracingObserverConfig config;

private TracingObserver(TracingObserverConfig config) {
Expand Down Expand Up @@ -215,13 +218,24 @@ Find configuration of the web server span (can customize name, disable etc.)
context.register(span.context());
context.register(TracingConfig.class, span.context());

/*
Register an input stream filter to handle content read span.
*/
if (spanConfig.logEnabled(CONTENT_READ_SPAN_NAME, true)) {
// Invoked when the input stream is read. Our implementation does tracing, then delegates to the real stream.
req.streamFilter(is -> new TracingStreamInputDelegate(tracer, span, is));

}

/*
Register an output stream filter to correctly handle content write span
*/
res.streamFilter(os -> {
// this is invoked when the user requests output stream, we just replace it with our own delegate
return new TracingStreamDelegate(tracer, span, os);
});
if (spanConfig.logEnabled(CONTENT_WRITE_SPAN_NAME, true)) {
res.streamFilter(os -> {
// this is invoked when the user requests output stream, we just replace it with our own delegate
return new TracingStreamOutputDelegate(tracer, span, os);
});
}

try (Scope ignored = span.activate()) {
span.tag(Tag.COMPONENT.create("helidon-webserver"));
Expand Down Expand Up @@ -273,51 +287,244 @@ private TracingConfig configureTracingConfig(RoutingRequest req, Context context
}
}

private static final class TracingStreamDelegate extends OutputStream {
private static final class TracingStreamDelegate {

private final Tracer tracer;
private final Span requestSpan;
private final OutputStream delegate;

private final String logName;
private boolean started;
private boolean stopped;
private Span span;
private Throwable thrown;

private TracingStreamDelegate(Tracer tracer, Span requestSpan, OutputStream delegate) {


private TracingStreamDelegate(Tracer tracer, Span requestSpan, String logName) {
this.tracer = tracer;
this.requestSpan = requestSpan;
this.logName = logName;
}

void thrown(Throwable t) {
thrown = t;
}

void start() {
if (!started) {
started = true;
span(tracer.spanBuilder(logName)
.parent(requestSpan.context())
.start());
}
}

void stop() {
if (started && !stopped) {
stopped = true;
if (thrown == null) {
span.end();
} else {
span.end(thrown);
}
}
}
void span(Span span) {
this.span = span;
}
}

private static final class TracingStreamInputDelegate extends InputStream {
private final TracingStreamDelegate tracingStream;
private final InputStream delegate;

private TracingStreamInputDelegate(Tracer tracer, Span requestSpan, InputStream delegate) {
this.tracingStream = new TracingStreamDelegate(tracer, requestSpan, CONTENT_READ_SPAN_NAME);
this.delegate = delegate;
}

@Override
public int available() throws IOException {
try {
return delegate.available();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
} finally {
tracingStream.stop();
}
}

@Override
public void close() throws IOException {
try {
delegate.close();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
} finally {
tracingStream.stop();
}
}

@Override
public void mark(int readlimit) {
delegate.mark(readlimit);
}

@Override
public boolean markSupported() {
return delegate.markSupported();
}

@Override
public int read() throws IOException {
tracingStream.start();
try {
return delegate.read();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public int read(byte[] b) throws IOException {
tracingStream.start();
try {
return delegate.read(b);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
tracingStream.start();
try {
return delegate.read(b, off, len);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public byte[] readAllBytes() throws IOException {
tracingStream.start();
try {
return delegate.readAllBytes();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public byte[] readNBytes(int len) throws IOException {
tracingStream.start();
try {
return delegate.readNBytes(len);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public int readNBytes(byte[] b, int off, int len) throws IOException {
tracingStream.start();
try {
return delegate.readNBytes(b, off, len);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public void reset() throws IOException {
tracingStream.start();
try {
delegate.reset();
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public long skip(long n) throws IOException {
tracingStream.start();
try {
return delegate.skip(n);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public void skipNBytes(long n) throws IOException {
tracingStream.start();
try {
delegate.skipNBytes(n);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}

@Override
public long transferTo(OutputStream out) throws IOException {
tracingStream.start();
try {
return delegate.transferTo(out);
} catch (IOException e) {
tracingStream.thrown(e);
throw e;
}
}
}

private static final class TracingStreamOutputDelegate extends OutputStream {
private final TracingStreamDelegate tracingStream;
private final OutputStream delegate;

private TracingStreamOutputDelegate(Tracer tracer, Span requestSpan, OutputStream delegate) {
this.tracingStream = new TracingStreamDelegate(tracer, requestSpan, CONTENT_WRITE_SPAN_NAME);
this.delegate = delegate;
}

@Override
public void write(int b) throws IOException {
start();
tracingStream.start();
try {
this.delegate.write(b);
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
}
}

@Override
public void write(byte[] b) throws IOException {
start();
tracingStream.start();
try {
this.delegate.write(b);
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
}
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
start();
tracingStream.start();
try {
this.delegate.write(b, off, len);
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
}
}
Expand All @@ -327,7 +534,7 @@ public void flush() throws IOException {
try {
this.delegate.flush();
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
}
}
Expand All @@ -337,32 +544,14 @@ public void close() throws IOException {
try {
this.delegate.close();
} catch (IOException e) {
thrown = e;
tracingStream.thrown(e);
throw e;
} finally {
stop();
tracingStream.stop();
}
}

private void start() {
if (!started) {
started = true;
span = tracer.spanBuilder("content-write")
.parent(requestSpan.context())
.start();
}
}

private void stop() {
if (started && !stopped) {
stopped = true;
if (thrown == null) {
span.end();
} else {
span.end(thrown);
}
}
}
}

private static class HeaderProviderImpl implements HeaderProvider {
Expand Down

0 comments on commit 7092015

Please sign in to comment.