From 0fa54f4abe0676333246d2fb5f14980ee4004db1 Mon Sep 17 00:00:00 2001 From: Jonathan Fung <121899091+jonfung-dydx@users.noreply.github.com> Date: Tue, 13 Aug 2024 13:51:07 -0700 Subject: [PATCH] Full Node Streaming Recurring snapshots (#2079) --- protocol/app/app.go | 6 ++- protocol/app/flags/flags.go | 20 ++++++++ protocol/app/flags/flags_test.go | 46 +++++++++++++++++-- .../streaming/full_node_streaming_manager.go | 28 +++++++++-- 4 files changed, 91 insertions(+), 9 deletions(-) diff --git a/protocol/app/app.go b/protocol/app/app.go index 46e9f3c976..a6f7bf3b43 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1917,12 +1917,16 @@ func getFullNodeStreamingManagerFromOptions( logger log.Logger, ) (manager streamingtypes.FullNodeStreamingManager) { if appFlags.GrpcStreamingEnabled { - logger.Info("GRPC streaming is enabled") + logger.Info("GRPC streaming is enabled", log.ModuleKey, "full-node-streaming") + if appFlags.FullNodeStreamingSnapshotInterval > 0 { + logger.Info("Interval snapshots enabled", log.ModuleKey, "full-node-streaming") + } return streaming.NewFullNodeStreamingManager( logger, appFlags.GrpcStreamingFlushIntervalMs, appFlags.GrpcStreamingMaxBatchSize, appFlags.GrpcStreamingMaxChannelBufferSize, + appFlags.FullNodeStreamingSnapshotInterval, ) } return streaming.NewNoopGrpcStreamingManager() diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index 5756b69162..e68a22743e 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -25,6 +25,7 @@ type Flags struct { GrpcStreamingFlushIntervalMs uint32 GrpcStreamingMaxBatchSize uint32 GrpcStreamingMaxChannelBufferSize uint32 + FullNodeStreamingSnapshotInterval uint32 VEOracleEnabled bool // Slinky Vote Extensions } @@ -45,6 +46,7 @@ const ( GrpcStreamingFlushIntervalMs = "grpc-streaming-flush-interval-ms" GrpcStreamingMaxBatchSize = "grpc-streaming-max-batch-size" GrpcStreamingMaxChannelBufferSize = "grpc-streaming-max-channel-buffer-size" + FullNodeStreamingSnapshotInterval = "fns-snapshot-interval" // Slinky VEs enabled VEOracleEnabled = "slinky-vote-extension-oracle-enabled" @@ -61,6 +63,7 @@ const ( DefaultGrpcStreamingFlushIntervalMs = 50 DefaultGrpcStreamingMaxBatchSize = 2000 DefaultGrpcStreamingMaxChannelBufferSize = 2000 + DefaultFullNodeStreamingSnapshotInterval = 0 DefaultVEOracleEnabled = true ) @@ -111,6 +114,12 @@ func AddFlagsToCmd(cmd *cobra.Command) { DefaultGrpcStreamingMaxChannelBufferSize, "Maximum per-subscription channel size before grpc streaming cancels a singular subscription", ) + cmd.Flags().Uint32( + FullNodeStreamingSnapshotInterval, + DefaultFullNodeStreamingSnapshotInterval, + "If set to positive number, number of blocks between each periodic snapshot will be sent out. "+ + "Defaults to zero for regular behavior of one initial snapshot.", + ) cmd.Flags().Bool( VEOracleEnabled, DefaultVEOracleEnabled, @@ -140,6 +149,10 @@ func (f *Flags) Validate() error { return fmt.Errorf("grpc streaming channel size must be positive number") } } + if f.FullNodeStreamingSnapshotInterval > 0 && f.FullNodeStreamingSnapshotInterval < 50 { + return fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero") + } + return nil } @@ -163,6 +176,7 @@ func GetFlagValuesFromOptions( GrpcStreamingFlushIntervalMs: DefaultGrpcStreamingFlushIntervalMs, GrpcStreamingMaxBatchSize: DefaultGrpcStreamingMaxBatchSize, GrpcStreamingMaxChannelBufferSize: DefaultGrpcStreamingMaxChannelBufferSize, + FullNodeStreamingSnapshotInterval: DefaultFullNodeStreamingSnapshotInterval, VEOracleEnabled: true, } @@ -228,6 +242,12 @@ func GetFlagValuesFromOptions( } } + if option := appOpts.Get(FullNodeStreamingSnapshotInterval); option != nil { + if v, err := cast.ToUint32E(option); err == nil { + result.FullNodeStreamingSnapshotInterval = v + } + } + if option := appOpts.Get(VEOracleEnabled); option != nil { if v, err := cast.ToBoolE(option); err == nil { result.VEOracleEnabled = v diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index 4b5da76819..99f4f2478b 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -38,6 +38,9 @@ func TestAddFlagsToCommand(t *testing.T) { fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxBatchSize): { flagName: flags.GrpcStreamingMaxBatchSize, }, + fmt.Sprintf("Has %s flag", flags.FullNodeStreamingSnapshotInterval): { + flagName: flags.FullNodeStreamingSnapshotInterval, + }, fmt.Sprintf("Has %s flag", flags.GrpcStreamingMaxChannelBufferSize): { flagName: flags.GrpcStreamingMaxChannelBufferSize, }, @@ -57,11 +60,12 @@ func TestValidate(t *testing.T) { }{ "success (default values)": { flags: flags.Flags{ - NonValidatingFullNode: flags.DefaultNonValidatingFullNode, - DdAgentHost: flags.DefaultDdAgentHost, - DdTraceAgentPort: flags.DefaultDdTraceAgentPort, - GrpcAddress: config.DefaultGRPCAddress, - GrpcEnable: true, + NonValidatingFullNode: flags.DefaultNonValidatingFullNode, + DdAgentHost: flags.DefaultDdAgentHost, + DdTraceAgentPort: flags.DefaultDdTraceAgentPort, + GrpcAddress: config.DefaultGRPCAddress, + GrpcEnable: true, + FullNodeStreamingSnapshotInterval: flags.DefaultFullNodeStreamingSnapshotInterval, }, }, "success - full node & gRPC disabled": { @@ -125,6 +129,29 @@ func TestValidate(t *testing.T) { }, expectedErr: fmt.Errorf("grpc streaming channel size must be positive number"), }, + "failure - full node streaming enabled with <= 49 snapshot interval": { + flags: flags.Flags{ + NonValidatingFullNode: true, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxBatchSize: 2000, + GrpcStreamingMaxChannelBufferSize: 2000, + FullNodeStreamingSnapshotInterval: 49, + }, + expectedErr: fmt.Errorf("full node streaming snapshot interval must be >= 50 blocks or zero"), + }, + "success - full node streaming enabled with 50 snapshot interval": { + flags: flags.Flags{ + NonValidatingFullNode: true, + GrpcEnable: true, + GrpcStreamingEnabled: true, + GrpcStreamingFlushIntervalMs: 100, + GrpcStreamingMaxBatchSize: 2000, + GrpcStreamingMaxChannelBufferSize: 2000, + FullNodeStreamingSnapshotInterval: 50, + }, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -153,6 +180,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcStreamingFlushMs uint32 expectedGrpcStreamingBatchSize uint32 expectedGrpcStreamingMaxChannelBufferSize uint32 + expectedFullNodeStreamingSnapshotInterval uint32 }{ "Sets to default if unset": { expectedNonValidatingFullNodeFlag: false, @@ -164,6 +192,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcStreamingFlushMs: 50, expectedGrpcStreamingBatchSize: 2000, expectedGrpcStreamingMaxChannelBufferSize: 2000, + expectedFullNodeStreamingSnapshotInterval: 0, }, "Sets values from options": { optsMap: map[string]any{ @@ -176,6 +205,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { flags.GrpcStreamingFlushIntervalMs: uint32(408), flags.GrpcStreamingMaxBatchSize: uint32(650), flags.GrpcStreamingMaxChannelBufferSize: uint32(972), + flags.FullNodeStreamingSnapshotInterval: uint32(123), }, expectedNonValidatingFullNodeFlag: true, expectedDdAgentHost: "agentHostTest", @@ -186,6 +216,7 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcStreamingFlushMs: 408, expectedGrpcStreamingBatchSize: 650, expectedGrpcStreamingMaxChannelBufferSize: 972, + expectedFullNodeStreamingSnapshotInterval: 123, }, } @@ -238,6 +269,11 @@ func TestGetFlagValuesFromOptions(t *testing.T) { tc.expectedGrpcStreamingBatchSize, flags.GrpcStreamingMaxBatchSize, ) + require.Equal( + t, + tc.expectedFullNodeStreamingSnapshotInterval, + flags.FullNodeStreamingSnapshotInterval, + ) require.Equal( t, tc.expectedGrpcStreamingMaxChannelBufferSize, diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index c53b76e2c0..12bc38cfed 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -2,10 +2,11 @@ package streaming import ( "fmt" - satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" "sync" "time" + satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" + "cosmossdk.io/log" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/dydxprotocol/v4-chain/protocol/lib/metrics" @@ -44,6 +45,10 @@ type FullNodeStreamingManagerImpl struct { maxUpdatesInCache uint32 maxSubscriptionChannelSize uint32 + + // Block interval in which snapshot info should be sent out in. + // Defaults to 0, which means only one snapshot will be sent out. + snapshotBlockInterval uint32 } // OrderbookSubscription represents a active subscription to the orderbook updates stream. @@ -51,7 +56,7 @@ type OrderbookSubscription struct { subscriptionId uint32 // Initialize the subscription with orderbook snapshots. - initialize sync.Once + initialize *sync.Once // Clob pair ids to subscribe to. clobPairIds []uint32 @@ -64,6 +69,10 @@ type OrderbookSubscription struct { // Channel to buffer writes before the stream updatesChannel chan []clobtypes.StreamUpdate + + // If interval snapshots are turned on, the next block height at which + // a snapshot should be sent out. + nextSnapshotBlock uint32 } func NewFullNodeStreamingManager( @@ -71,6 +80,7 @@ func NewFullNodeStreamingManager( flushIntervalMs uint32, maxUpdatesInCache uint32, maxSubscriptionChannelSize uint32, + snapshotBlockInterval uint32, ) *FullNodeStreamingManagerImpl { logger = logger.With(log.ModuleKey, "full-node-streaming") fullNodeStreamingManager := &FullNodeStreamingManagerImpl{ @@ -87,6 +97,7 @@ func NewFullNodeStreamingManager( maxUpdatesInCache: maxUpdatesInCache, maxSubscriptionChannelSize: maxSubscriptionChannelSize, + snapshotBlockInterval: snapshotBlockInterval, } // Start the goroutine for pushing order updates through. @@ -149,6 +160,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( } subscription := &OrderbookSubscription{ subscriptionId: sm.nextSubscriptionId, + initialize: &sync.Once{}, clobPairIds: clobPairIds, subaccountIds: sIds, messageSender: messageSender, @@ -674,7 +686,15 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( sm.FlushStreamUpdatesWithLock() updatesByClobPairId := make(map[uint32]*clobtypes.OffchainUpdates) + for subscriptionId, subscription := range sm.orderbookSubscriptions { + // If the snapshot block interval is enabled, reset the sync.Once in order to + // re-send snapshots out. + if sm.snapshotBlockInterval > 0 && + blockHeight == subscription.nextSnapshotBlock { + subscription.initialize = &sync.Once{} + } + subscription.initialize.Do( func() { allUpdates := clobtypes.NewOffchainUpdates() @@ -688,8 +708,10 @@ func (sm *FullNodeStreamingManagerImpl) InitializeNewStreams( for _, subaccountId := range subscription.subaccountIds { saUpdates = append(saUpdates, getSubaccountSnapshot(subaccountId)) } - sm.SendCombinedSnapshot(allUpdates, saUpdates, subscriptionId, blockHeight, execMode) + if sm.snapshotBlockInterval != 0 { + subscription.nextSnapshotBlock = blockHeight + sm.snapshotBlockInterval + } }, ) }