From e7bdb6fbc3f638c47e66065c7594dfa03def72c1 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 15:37:56 -0700 Subject: [PATCH] feat(aggregateeventrecorder): send per sp protocol --- .../aggregateeventrecorder.go | 4 ++++ .../aggregateeventrecorder_test.go | 15 +++++++++------ pkg/retriever/bitswapretriever.go | 2 +- pkg/retriever/parallelpeerretriever.go | 2 +- 4 files changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/aggregateeventrecorder/aggregateeventrecorder.go b/pkg/aggregateeventrecorder/aggregateeventrecorder.go index d8f3a07e..b5a5286c 100644 --- a/pkg/aggregateeventrecorder/aggregateeventrecorder.go +++ b/pkg/aggregateeventrecorder/aggregateeventrecorder.go @@ -38,6 +38,7 @@ type tempData struct { type RetrievalAttempt struct { Error string `json:"error,omitempty"` TimeToFirstByte string `json:"timeToFirstByte,omitempty"` + Protocol string `json:"protocol,omitempty"` } type AggregateEvent struct { @@ -163,6 +164,9 @@ func (a *aggregateEventRecorder) ingestEvents() { case types.RetrievalPhase: // Create a retrieval attempt var attempt RetrievalAttempt + if len(event.Protocols()) > 0 { + attempt.Protocol = event.Protocols()[0].String() + } spid := types.Identifier(event) // Save the retrieval attempt diff --git a/pkg/aggregateeventrecorder/aggregateeventrecorder_test.go b/pkg/aggregateeventrecorder/aggregateeventrecorder_test.go index 0e6859f2..cff881a9 100644 --- a/pkg/aggregateeventrecorder/aggregateeventrecorder_test.go +++ b/pkg/aggregateeventrecorder/aggregateeventrecorder_test.go @@ -57,14 +57,14 @@ func TestAggregateEventRecorder(t *testing.T) { clock.Add(10 * time.Millisecond) subscriber(events.CandidatesFound(clock.Now(), id, indexerStartTime, testCid1, graphsyncCandidates)) subscriber(events.CandidatesFiltered(clock.Now(), id, indexerStartTime, testCid1, graphsyncCandidates[:2])) - subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, graphsyncCandidates[0])) - subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, graphsyncCandidates[1])) + subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, graphsyncCandidates[0], multicodec.TransportGraphsyncFilecoinv1)) + subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, graphsyncCandidates[1], multicodec.TransportGraphsyncFilecoinv1)) graphsyncCandidateStartTime := clock.Now() clock.Add(10 * time.Millisecond) subscriber(events.CandidatesFound(clock.Now(), id, indexerStartTime, testCid1, bitswapCandidates[:2])) subscriber(events.CandidatesFiltered(clock.Now(), id, indexerStartTime, testCid1, bitswapCandidates[:1])) bitswapPeer := types.NewRetrievalCandidate(peer.ID(""), nil, testCid1, &metadata.Bitswap{}) - subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, bitswapPeer)) + subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, bitswapPeer, multicodec.TransportBitswap)) bitswapCandidateStartTime := clock.Now() clock.Add(20 * time.Millisecond) subscriber(events.FirstByte(clock.Now(), id, bitswapCandidateStartTime, bitswapPeer)) @@ -107,16 +107,19 @@ func TestAggregateEventRecorder(t *testing.T) { require.Equal(t, int64(3), retrievalAttempts.Length()) sp1Attempt, err := retrievalAttempts.LookupByString(graphsyncCandidates[0].MinerPeer.ID.String()) require.NoError(t, err) - require.Equal(t, int64(1), sp1Attempt.Length()) + require.Equal(t, int64(2), sp1Attempt.Length()) + verifyStringNode(t, sp1Attempt, "protocol", multicodec.TransportGraphsyncFilecoinv1.String()) verifyStringNode(t, sp1Attempt, "error", "failed to dial") sp2Attempt, err := retrievalAttempts.LookupByString(graphsyncCandidates[1].MinerPeer.ID.String()) require.NoError(t, err) - require.Equal(t, int64(1), sp2Attempt.Length()) + require.Equal(t, int64(2), sp2Attempt.Length()) + verifyStringNode(t, sp2Attempt, "protocol", multicodec.TransportGraphsyncFilecoinv1.String()) verifyStringNode(t, sp2Attempt, "timeToFirstByte", "60ms") bitswapAttempt, err := retrievalAttempts.LookupByString(types.BitswapIndentifier) require.NoError(t, err) - require.Equal(t, int64(1), bitswapAttempt.Length()) + require.Equal(t, int64(2), bitswapAttempt.Length()) verifyStringNode(t, bitswapAttempt, "timeToFirstByte", "40ms") + verifyStringNode(t, bitswapAttempt, "protocol", multicodec.TransportBitswap.String()) }, }, diff --git a/pkg/retriever/bitswapretriever.go b/pkg/retriever/bitswapretriever.go index 664af3ef..3f63f504 100644 --- a/pkg/retriever/bitswapretriever.go +++ b/pkg/retriever/bitswapretriever.go @@ -166,7 +166,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb return nil, nil } - br.events(events.Started(br.clock.Now(), br.request.RetrievalID, phaseStartTime, types.RetrievalPhase, bitswapCandidate)) + br.events(events.Started(br.clock.Now(), br.request.RetrievalID, phaseStartTime, types.RetrievalPhase, bitswapCandidate, multicodec.TransportBitswap)) // set initial providers, then start a goroutine to add more as they come in br.routing.AddProviders(br.request.RetrievalID, nextCandidates) diff --git a/pkg/retriever/parallelpeerretriever.go b/pkg/retriever/parallelpeerretriever.go index 3754277d..f82b8cab 100644 --- a/pkg/retriever/parallelpeerretriever.go +++ b/pkg/retriever/parallelpeerretriever.go @@ -293,7 +293,7 @@ func (retrieval *retrieval) runRetrievalCandidate( var retrievalErr error var done func() - shared.sendEvent(events.Started(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate)) + shared.sendEvent(events.Started(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrieval.Protocol.Code())) connectCtx := ctx if timeout != 0 { var timeoutFunc func()