Skip to content

Commit

Permalink
feat(responsemanager): trace full messages via links to responses
Browse files Browse the repository at this point in the history
Fixes: #318
  • Loading branch information
rvagg committed Jan 6, 2022
1 parent 8e9f6cf commit 8cac33c
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 19 deletions.
20 changes: 20 additions & 0 deletions impl/graphsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func TestRejectRequestsByDefault(t *testing.T) {

tracing := collectTracing(t)
require.ElementsMatch(t, []string{
"message(0)",
"response(0)",
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
Expand Down Expand Up @@ -549,6 +550,7 @@ func TestGraphsyncRoundTripIgnoreCids(t *testing.T) {

tracing := collectTracing(t)
require.ElementsMatch(t, append(append([]string{
"message(0)",
"response(0)->executeTask(0)",
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
Expand Down Expand Up @@ -622,6 +624,7 @@ func TestGraphsyncRoundTripIgnoreNBlocks(t *testing.T) {

tracing := collectTracing(t)
require.ElementsMatch(t, append(append([]string{
"message(0)",
"response(0)->executeTask(0)",
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
Expand Down Expand Up @@ -872,6 +875,8 @@ func TestPauseResumeViaUpdate(t *testing.T) {

tracing := collectTracing(t)
require.ElementsMatch(t, append(append([]string{
"message(0)",
"message(1)",
"response(0)->executeTask(0)",
"response(0)->processUpdate(0)",
"response(0)->executeTask(1)",
Expand All @@ -887,6 +892,16 @@ func TestPauseResumeViaUpdate(t *testing.T) {
require.Equal(t, []string{string(td.extensionName)}, testutil.AttributeValueInTraceSpan(t, *processUpdateSpan, "extensions").AsStringSlice())
// pause recorded
tracing.SingleExceptionEvent(t, "response(0)->executeTask(0)", "github.com/ipfs/go-graphsync/responsemanager/hooks.ErrPaused", hooks.ErrPaused{}.Error(), false)

message0Span := tracing.FindSpanByTraceString("message(0)")
message1Span := tracing.FindSpanByTraceString("message(1)")
responseSpan := tracing.FindSpanByTraceString("response(0)")
// response(0) originates in message(0)
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
// response(0)->processUpdate(0) occurs thanks to message(1)
require.Len(t, processUpdateSpan.Links, 1)
require.Equal(t, processUpdateSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
}

func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {
Expand Down Expand Up @@ -963,6 +978,8 @@ func TestPauseResumeViaUpdateOnBlockHook(t *testing.T) {

tracing := collectTracing(t)
require.ElementsMatch(t, append(append([]string{
"message(0)",
"message(1)",
"response(0)->executeTask(0)",
"response(0)->processUpdate(0)",
"response(0)->executeTask(1)",
Expand Down Expand Up @@ -1057,6 +1074,7 @@ func TestNetworkDisconnect(t *testing.T) {
tracing := collectTracing(t)

traceStrings := tracing.TracesToStrings()
require.Contains(t, traceStrings, "message(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(0)")
require.Contains(t, traceStrings, "response(0)->abortRequest(0)")
require.Contains(t, traceStrings, "response(0)->executeTask(1)")
Expand Down Expand Up @@ -1344,6 +1362,7 @@ func TestRoundTripLargeBlocksSlowNetwork(t *testing.T) {

tracing := collectTracing(t)
require.ElementsMatch(t, append(append([]string{
"message(0)",
"response(0)->executeTask(0)",
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
Expand Down Expand Up @@ -1580,6 +1599,7 @@ func TestGraphsyncBlockListeners(t *testing.T) {
tracing := collectTracing(t)
require.ElementsMatch(t, append(append(
[]string{
"message(0)",
"response(0)->executeTask(0)",
"request(0)->newRequest(0)",
"request(0)->executeTask(0)",
Expand Down
2 changes: 1 addition & 1 deletion responsemanager/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type errorRequestMessage struct {
}

func (erm *errorRequestMessage) handle(rm *ResponseManager) {
err := rm.abortRequest(erm.p, erm.requestID, erm.err)
err := rm.abortRequest(rm.ctx, erm.p, erm.requestID, erm.err)
select {
case <-rm.ctx.Done():
case erm.response <- err:
Expand Down
23 changes: 23 additions & 0 deletions responsemanager/responsemanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,13 @@ func TestIncomingQuery(t *testing.T) {

tracing := td.collectTracing(t)
require.ElementsMatch(t, []string{
"message(0)",
"TestIncomingQuery(0)->response(0)->executeTask(0)",
}, tracing.TracesToStrings())
messageSpan := tracing.FindSpanByTraceString("message(0)")
responseSpan := tracing.FindSpanByTraceString("TestIncomingQuery(0)->response(0)")
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), messageSpan.SpanContext.SpanID())
}

func TestCancellationQueryInProgress(t *testing.T) {
Expand Down Expand Up @@ -128,6 +133,24 @@ func TestCancellationQueryInProgress(t *testing.T) {
td.connManager.RefuteProtected(t, td.p)

td.assertRequestCleared()

tracing := td.collectTracing(t)
require.ElementsMatch(t, []string{
"message(0)",
"response(0)->executeTask(0)",
"response(0)->abortRequest(0)",
"message(1)",
}, tracing.TracesToStrings())
message0Span := tracing.FindSpanByTraceString("message(0)")
message1Span := tracing.FindSpanByTraceString("message(1)")
responseSpan := tracing.FindSpanByTraceString("response(0)")
abortRequestSpan := tracing.FindSpanByTraceString("response(0)->abortRequest(0)")
// response(0) originates in message(0)
require.Len(t, responseSpan.Links, 1)
require.Equal(t, responseSpan.Links[0].SpanContext.SpanID(), message0Span.SpanContext.SpanID())
// response(0)->abortRequest(0) occurs thanks to message(1)
require.Len(t, abortRequestSpan.Links, 1)
require.Equal(t, abortRequestSpan.Links[0].SpanContext.SpanID(), message1Span.SpanContext.SpanID())
}

func TestCancellationViaCommand(t *testing.T) {
Expand Down
57 changes: 39 additions & 18 deletions responsemanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,22 @@ func (rm *ResponseManager) terminateRequest(key responseKey) {
ipr.span.End()
}

func (rm *ResponseManager) processUpdate(key responseKey, update gsmsg.GraphSyncRequest) {
func (rm *ResponseManager) processUpdate(ctx context.Context, key responseKey, update gsmsg.GraphSyncRequest) {
response, ok := rm.inProgressResponses[key]
if !ok || response.state == graphsync.CompletingSend {
log.Warnf("received update for non existent request, peer %s, request ID %d", key.p.Pretty(), key.requestID)
return
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "processUpdate", trace.WithAttributes(
attribute.Int("id", int(update.ID())),
attribute.StringSlice("extensions", update.ExtensionNames()),
))
_, span := otel.Tracer("graphsync").Start(
trace.ContextWithSpan(ctx, response.span),
"processUpdate",
trace.WithLinks(trace.LinkFromContext(ctx)),
trace.WithAttributes(
attribute.Int("id", int(update.ID())),
attribute.StringSlice("extensions", update.ExtensionNames()),
))

defer span.End()

if response.state != graphsync.Paused {
Expand Down Expand Up @@ -125,15 +130,18 @@ func (rm *ResponseManager) unpauseRequest(p peer.ID, requestID graphsync.Request
return nil
}

func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID, err error) error {
func (rm *ResponseManager) abortRequest(ctx context.Context, p peer.ID, requestID graphsync.RequestID, err error) error {
key := responseKey{p, requestID}
rm.responseQueue.Remove(key, key.p)
response, ok := rm.inProgressResponses[key]
if !ok || response.state == graphsync.CompletingSend {
return errors.New("could not find request")
}

_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(rm.ctx, response.span), "abortRequest")
_, span := otel.Tracer("graphsync").Start(trace.ContextWithSpan(ctx, response.span),
"abortRequest",
trace.WithLinks(trace.LinkFromContext(ctx)),
)
defer span.End()
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand Down Expand Up @@ -166,25 +174,38 @@ func (rm *ResponseManager) abortRequest(p peer.ID, requestID graphsync.RequestID
}

func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSyncRequest) {
ctx, messageSpan := otel.Tracer("graphsync").Start(
rm.ctx,
"requestMessage",
trace.WithAttributes(attribute.String("peerID", p.Pretty())),
)
defer messageSpan.End()

for _, request := range requests {
key := responseKey{p: p, requestID: request.ID()}
if request.IsCancel() {
_ = rm.abortRequest(p, request.ID(), ipldutil.ContextCancelError{})
_ = rm.abortRequest(ctx, p, request.ID(), ipldutil.ContextCancelError{})
continue
}
if request.IsUpdate() {
rm.processUpdate(key, request)
rm.processUpdate(ctx, key, request)
continue
}
rm.connManager.Protect(p, request.ID().Tag())
ctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
ctx, responseSpan := otel.Tracer("graphsync").Start(ctx, "response", trace.WithAttributes(
attribute.Int("id", int(request.ID())),
attribute.Int("priority", int(request.Priority())),
attribute.String("root", request.Root().String()),
attribute.StringSlice("extensions", request.ExtensionNames()),
))
ctx, cancelFn := context.WithCancel(ctx)
// don't use `ctx` which has the "message" trace, but rm.ctx for a fresh trace which allows
// for a request hook to join this particular response up to an existing external trace
rctx := rm.requestQueuedHooks.ProcessRequestQueuedHooks(p, request, rm.ctx)
rctx, responseSpan := otel.Tracer("graphsync").Start(
rctx,
"response",
trace.WithLinks(trace.LinkFromContext(ctx)),
trace.WithAttributes(
attribute.Int("id", int(request.ID())),
attribute.Int("priority", int(request.Priority())),
attribute.String("root", request.Root().String()),
attribute.StringSlice("extensions", request.ExtensionNames()),
))
rctx, cancelFn := context.WithCancel(rctx)
sub := &subscriber{
p: key.p,
request: request,
Expand All @@ -202,7 +223,7 @@ func (rm *ResponseManager) processRequests(p peer.ID, requests []gsmsg.GraphSync

rm.inProgressResponses[key] =
&inProgressResponseStatus{
ctx: ctx,
ctx: rctx,
span: responseSpan,
cancelFn: cancelFn,
request: request,
Expand Down

0 comments on commit 8cac33c

Please sign in to comment.