Skip to content

Commit

Permalink
Full Node Streaming Recurring snapshots (#2079)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed Aug 15, 2024
1 parent 5297fbd commit 0fa54f4
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 9 deletions.
6 changes: 5 additions & 1 deletion protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1917,12 +1917,16 @@ func getFullNodeStreamingManagerFromOptions(
logger log.Logger,
) (manager streamingtypes.FullNodeStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
logger.Info("GRPC streaming is enabled", log.ModuleKey, "full-node-streaming")
if appFlags.FullNodeStreamingSnapshotInterval > 0 {
logger.Info("Interval snapshots enabled", log.ModuleKey, "full-node-streaming")
}
return streaming.NewFullNodeStreamingManager(
logger,
appFlags.GrpcStreamingFlushIntervalMs,
appFlags.GrpcStreamingMaxBatchSize,
appFlags.GrpcStreamingMaxChannelBufferSize,
appFlags.FullNodeStreamingSnapshotInterval,
)
}
return streaming.NewNoopGrpcStreamingManager()
Expand Down
20 changes: 20 additions & 0 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Flags struct {
GrpcStreamingFlushIntervalMs uint32
GrpcStreamingMaxBatchSize uint32
GrpcStreamingMaxChannelBufferSize uint32
FullNodeStreamingSnapshotInterval uint32

VEOracleEnabled bool // Slinky Vote Extensions
}
Expand All @@ -45,6 +46,7 @@ const (
GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms"
GrpcStreamingMaxBatchSize = "grpc-streaming-max-batch-size"
GrpcStreamingMaxChannelBufferSize = "grpc-streaming-max-channel-buffer-size"
FullNodeStreamingSnapshotInterval = "fns-snapshot-interval"

// Slinky VEs enabled
VEOracleEnabled = "slinky-vote-extension-oracle-enabled"
Expand All @@ -61,6 +63,7 @@ const (
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBatchSize = 2000
DefaultGrpcStreamingMaxChannelBufferSize = 2000
DefaultFullNodeStreamingSnapshotInterval = 0

DefaultVEOracleEnabled = true
)
Expand Down Expand Up @@ -111,6 +114,12 @@ func AddFlagsToCmd(cmd *cobra.Command) {
DefaultGrpcStreamingMaxChannelBufferSize,
"Maximum per-subscription channel size before grpc streaming cancels a singular subscription",
)
cmd.Flags().Uint32(
FullNodeStreamingSnapshotInterval,
DefaultFullNodeStreamingSnapshotInterval,
"If set to positive number, number of blocks between each periodic snapshot will be sent out. "+
"Defaults to zero for regular behavior of one initial snapshot.",
)
cmd.Flags().Bool(
VEOracleEnabled,
DefaultVEOracleEnabled,
Expand Down Expand Up @@ -140,6 +149,10 @@ func (f *Flags) Validate() error {
return fmt.Errorf("grpc streaming channel size must be positive number")
}
}
if f.FullNodeStreamingSnapshotInterval > 0 && f.FullNodeStreamingSnapshotInterval < 50 {
return fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero")
}

return nil
}

Expand All @@ -163,6 +176,7 @@ func GetFlagValuesFromOptions(
GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs,
GrpcStreamingMaxBatchSize: DefaultGrpcStreamingMaxBatchSize,
GrpcStreamingMaxChannelBufferSize: DefaultGrpcStreamingMaxChannelBufferSize,
FullNodeStreamingSnapshotInterval: DefaultFullNodeStreamingSnapshotInterval,

VEOracleEnabled: true,
}
Expand Down Expand Up @@ -228,6 +242,12 @@ func GetFlagValuesFromOptions(
}
}

if option := appOpts.Get(FullNodeStreamingSnapshotInterval); option != nil {
if v, err := cast.ToUint32E(option); err == nil {
result.FullNodeStreamingSnapshotInterval = v
}
}

if option := appOpts.Get(VEOracleEnabled); option != nil {
if v, err := cast.ToBoolE(option); err == nil {
result.VEOracleEnabled = v
Expand Down
46 changes: 41 additions & 5 deletions protocol/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ func TestAddFlagsToCommand(t *testing.T) {
fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxBatchSize): {
flagName: flags.GrpcStreamingMaxBatchSize,
},
fmt.Sprintf("Has %s flag", flags.FullNodeStreamingSnapshotInterval): {
flagName: flags.FullNodeStreamingSnapshotInterval,
},
fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxChannelBufferSize): {
flagName: flags.GrpcStreamingMaxChannelBufferSize,
},
Expand All @@ -57,11 +60,12 @@ func TestValidate(t *testing.T) {
}{
"success (default values)": {
flags: flags.Flags{
NonValidatingFullNode: flags.DefaultNonValidatingFullNode,
DdAgentHost: flags.DefaultDdAgentHost,
DdTraceAgentPort: flags.DefaultDdTraceAgentPort,
GrpcAddress: config.DefaultGRPCAddress,
GrpcEnable: true,
NonValidatingFullNode: flags.DefaultNonValidatingFullNode,
DdAgentHost: flags.DefaultDdAgentHost,
DdTraceAgentPort: flags.DefaultDdTraceAgentPort,
GrpcAddress: config.DefaultGRPCAddress,
GrpcEnable: true,
FullNodeStreamingSnapshotInterval: flags.DefaultFullNodeStreamingSnapshotInterval,
},
},
"success - full node & gRPC disabled": {
Expand Down Expand Up @@ -125,6 +129,29 @@ func TestValidate(t *testing.T) {
},
expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"),
},
"failure - full node streaming enabled with <= 49 snapshot interval": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
FullNodeStreamingSnapshotInterval: 49,
},
expectedErr: fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero"),
},
"success - full node streaming enabled with 50 snapshot interval": {
flags: flags.Flags{
NonValidatingFullNode: true,
GrpcEnable: true,
GrpcStreamingEnabled: true,
GrpcStreamingFlushIntervalMs: 100,
GrpcStreamingMaxBatchSize: 2000,
GrpcStreamingMaxChannelBufferSize: 2000,
FullNodeStreamingSnapshotInterval: 50,
},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -153,6 +180,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcStreamingFlushMs uint32
expectedGrpcStreamingBatchSize uint32
expectedGrpcStreamingMaxChannelBufferSize uint32
expectedFullNodeStreamingSnapshotInterval uint32
}{
"Sets to default if unset": {
expectedNonValidatingFullNodeFlag: false,
Expand All @@ -164,6 +192,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcStreamingFlushMs: 50,
expectedGrpcStreamingBatchSize: 2000,
expectedGrpcStreamingMaxChannelBufferSize: 2000,
expectedFullNodeStreamingSnapshotInterval: 0,
},
"Sets values from options": {
optsMap: map[string]any{
Expand All @@ -176,6 +205,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
flags.GrpcStreamingFlushIntervalMs: uint32(408),
flags.GrpcStreamingMaxBatchSize: uint32(650),
flags.GrpcStreamingMaxChannelBufferSize: uint32(972),
flags.FullNodeStreamingSnapshotInterval: uint32(123),
},
expectedNonValidatingFullNodeFlag: true,
expectedDdAgentHost: "agentHostTest",
Expand All @@ -186,6 +216,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
expectedGrpcStreamingFlushMs: 408,
expectedGrpcStreamingBatchSize: 650,
expectedGrpcStreamingMaxChannelBufferSize: 972,
expectedFullNodeStreamingSnapshotInterval: 123,
},
}

Expand Down Expand Up @@ -238,6 +269,11 @@ func TestGetFlagValuesFromOptions(t *testing.T) {
tc.expectedGrpcStreamingBatchSize,
flags.GrpcStreamingMaxBatchSize,
)
require.Equal(
t,
tc.expectedFullNodeStreamingSnapshotInterval,
flags.FullNodeStreamingSnapshotInterval,
)
require.Equal(
t,
tc.expectedGrpcStreamingMaxChannelBufferSize,
Expand Down
28 changes: 25 additions & 3 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package streaming

import (
"fmt"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
"sync"
"time"

satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
Expand Down Expand Up @@ -44,14 +45,18 @@ type FullNodeStreamingManagerImpl struct {

maxUpdatesInCache uint32
maxSubscriptionChannelSize uint32

// Block interval in which snapshot info should be sent out in.
// Defaults to 0, which means only one snapshot will be sent out.
snapshotBlockInterval uint32
}

// OrderbookSubscription represents a active subscription to the orderbook updates stream.
type OrderbookSubscription struct {
subscriptionId uint32

// Initialize the subscription with orderbook snapshots.
initialize sync.Once
initialize *sync.Once

// Clob pair ids to subscribe to.
clobPairIds []uint32
Expand All @@ -64,13 +69,18 @@ type OrderbookSubscription struct {

// Channel to buffer writes before the stream
updatesChannel chan []clobtypes.StreamUpdate

// If interval snapshots are turned on, the next block height at which
// a snapshot should be sent out.
nextSnapshotBlock uint32
}

func NewFullNodeStreamingManager(
logger log.Logger,
flushIntervalMs uint32,
maxUpdatesInCache uint32,
maxSubscriptionChannelSize uint32,
snapshotBlockInterval uint32,
) *FullNodeStreamingManagerImpl {
logger = logger.With(log.ModuleKey, "full-node-streaming")
fullNodeStreamingManager := &FullNodeStreamingManagerImpl{
Expand All @@ -87,6 +97,7 @@ func NewFullNodeStreamingManager(

maxUpdatesInCache: maxUpdatesInCache,
maxSubscriptionChannelSize: maxSubscriptionChannelSize,
snapshotBlockInterval: snapshotBlockInterval,
}

// Start the goroutine for pushing order updates through.
Expand Down Expand Up @@ -149,6 +160,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
}
subscription := &OrderbookSubscription{
subscriptionId: sm.nextSubscriptionId,
initialize: &sync.Once{},
clobPairIds: clobPairIds,
subaccountIds: sIds,
messageSender: messageSender,
Expand Down Expand Up @@ -674,7 +686,15 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
sm.FlushStreamUpdatesWithLock()

updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates)

for subscriptionId, subscription := range sm.orderbookSubscriptions {
// If the snapshot block interval is enabled, reset the sync.Once in order to
// re-send snapshots out.
if sm.snapshotBlockInterval > 0 &&
blockHeight == subscription.nextSnapshotBlock {
subscription.initialize = &sync.Once{}
}

subscription.initialize.Do(
func() {
allUpdates := clobtypes.NewOffchainUpdates()
Expand All @@ -688,8 +708,10 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams(
for _, subaccountId := range subscription.subaccountIds {
saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId))
}

sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode)
if sm.snapshotBlockInterval != 0 {
subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval
}
},
)
}
Expand Down

0 comments on commit 0fa54f4

Please sign in to comment.