Skip to content

Commit

Permalink
feat(aggregateeventrecorder): send per sp protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
hannahhoward authored and kylehuntsman committed May 25, 2023
1 parent 08d25b3 commit e7bdb6f
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 8 deletions.
4 changes: 4 additions & 0 deletions pkg/aggregateeventrecorder/aggregateeventrecorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type tempData struct {
type RetrievalAttempt struct {
Error string `json:"error,omitempty"`
TimeToFirstByte string `json:"timeToFirstByte,omitempty"`
Protocol string `json:"protocol,omitempty"`
}

type AggregateEvent struct {
Expand Down Expand Up @@ -163,6 +164,9 @@ func (a *aggregateEventRecorder) ingestEvents() {
case types.RetrievalPhase:
// Create a retrieval attempt
var attempt RetrievalAttempt
if len(event.Protocols()) > 0 {
attempt.Protocol = event.Protocols()[0].String()
}
spid := types.Identifier(event)

// Save the retrieval attempt
Expand Down
15 changes: 9 additions & 6 deletions pkg/aggregateeventrecorder/aggregateeventrecorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func TestAggregateEventRecorder(t *testing.T) {
clock.Add(10 * time.Millisecond)
subscriber(events.CandidatesFound(clock.Now(), id, indexerStartTime, testCid1, graphsyncCandidates))
subscriber(events.CandidatesFiltered(clock.Now(), id, indexerStartTime, testCid1, graphsyncCandidates[:2]))
subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, graphsyncCandidates[0]))
subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, graphsyncCandidates[1]))
subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, graphsyncCandidates[0], multicodec.TransportGraphsyncFilecoinv1))
subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, graphsyncCandidates[1], multicodec.TransportGraphsyncFilecoinv1))
graphsyncCandidateStartTime := clock.Now()
clock.Add(10 * time.Millisecond)
subscriber(events.CandidatesFound(clock.Now(), id, indexerStartTime, testCid1, bitswapCandidates[:2]))
subscriber(events.CandidatesFiltered(clock.Now(), id, indexerStartTime, testCid1, bitswapCandidates[:1]))
bitswapPeer := types.NewRetrievalCandidate(peer.ID(""), nil, testCid1, &metadata.Bitswap{})
subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, bitswapPeer))
subscriber(events.Started(clock.Now(), id, clock.Now(), types.RetrievalPhase, bitswapPeer, multicodec.TransportBitswap))
bitswapCandidateStartTime := clock.Now()
clock.Add(20 * time.Millisecond)
subscriber(events.FirstByte(clock.Now(), id, bitswapCandidateStartTime, bitswapPeer))
Expand Down Expand Up @@ -107,16 +107,19 @@ func TestAggregateEventRecorder(t *testing.T) {
require.Equal(t, int64(3), retrievalAttempts.Length())
sp1Attempt, err := retrievalAttempts.LookupByString(graphsyncCandidates[0].MinerPeer.ID.String())
require.NoError(t, err)
require.Equal(t, int64(1), sp1Attempt.Length())
require.Equal(t, int64(2), sp1Attempt.Length())
verifyStringNode(t, sp1Attempt, "protocol", multicodec.TransportGraphsyncFilecoinv1.String())
verifyStringNode(t, sp1Attempt, "error", "failed to dial")
sp2Attempt, err := retrievalAttempts.LookupByString(graphsyncCandidates[1].MinerPeer.ID.String())
require.NoError(t, err)
require.Equal(t, int64(1), sp2Attempt.Length())
require.Equal(t, int64(2), sp2Attempt.Length())
verifyStringNode(t, sp2Attempt, "protocol", multicodec.TransportGraphsyncFilecoinv1.String())
verifyStringNode(t, sp2Attempt, "timeToFirstByte", "60ms")
bitswapAttempt, err := retrievalAttempts.LookupByString(types.BitswapIndentifier)
require.NoError(t, err)
require.Equal(t, int64(1), bitswapAttempt.Length())
require.Equal(t, int64(2), bitswapAttempt.Length())
verifyStringNode(t, bitswapAttempt, "timeToFirstByte", "40ms")
verifyStringNode(t, bitswapAttempt, "protocol", multicodec.TransportBitswap.String())

},
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
return nil, nil
}

br.events(events.Started(br.clock.Now(), br.request.RetrievalID, phaseStartTime, types.RetrievalPhase, bitswapCandidate))
br.events(events.Started(br.clock.Now(), br.request.RetrievalID, phaseStartTime, types.RetrievalPhase, bitswapCandidate, multicodec.TransportBitswap))

// set initial providers, then start a goroutine to add more as they come in
br.routing.AddProviders(br.request.RetrievalID, nextCandidates)
Expand Down
2 changes: 1 addition & 1 deletion pkg/retriever/parallelpeerretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (retrieval *retrieval) runRetrievalCandidate(
var retrievalErr error
var done func()

shared.sendEvent(events.Started(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate))
shared.sendEvent(events.Started(retrieval.parallelPeerRetriever.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, types.RetrievalPhase, candidate, retrieval.Protocol.Code()))
connectCtx := ctx
if timeout != 0 {
var timeoutFunc func()
Expand Down

0 comments on commit e7bdb6f

Please sign in to comment.