Skip to content

Commit

Permalink
fix merge conflict and metric emissions
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Aug 8, 2024
1 parent 2d5dfa5 commit d340113
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(updates)))
sm.AddUpdatesToCache(streamUpdates, clobPairIds)
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
Expand Down Expand Up @@ -393,7 +393,7 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddUpdatesToCache(streamUpdates, clobPairIds, uint32(len(orderbookFills)))
sm.AddUpdatesToCache(streamUpdates, clobPairIds)
}

// SendTakerOrderStatus sends out a taker order and its status to the full node streaming service.
Expand All @@ -411,30 +411,31 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
}

sm.AddUpdatesToCache(
map[uint32][]clobtypes.StreamUpdate{
clobPairId: {
{
UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{
TakerOrder: &streamTakerOrder,
},
[]clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_TakerOrder{
TakerOrder: &streamTakerOrder,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
},
1,
[]uint32{clobPairId},
)
}

// AddUpdatesToCache adds a series of updates to the full node streaming cache.
// Clob pair ids are the clob pair id each update is relevant to.
func (sm *FullNodeStreamingManagerImpl) AddUpdatesToCache(
updates []clobtypes.StreamUpdate,
clobPairIds []uint32,
numUpdatesToAdd uint32,
) {
sm.Lock()
defer sm.Unlock()

metrics.IncrCounter(
metrics.GrpcAddUpdateToBufferCount,
float32(numUpdatesToAdd),
float32(len(updates)),
)

sm.streamUpdateCache = append(sm.streamUpdateCache, updates...)
Expand Down

0 comments on commit d340113

Please sign in to comment.