Skip to content

Commit

Permalink
paginate liquidation daemon response (#2118)
Browse files Browse the repository at this point in the history
  • Loading branch information
jayy04 authored Aug 21, 2024
1 parent b441185 commit 909fc3a
Show file tree
Hide file tree
Showing 13 changed files with 389 additions and 107 deletions.
29 changes: 22 additions & 7 deletions protocol/daemons/flags/flags.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package flags

import (
"time"

servertypes "github.com/cosmos/cosmos-sdk/server/types"
oracleconfig "github.com/skip-mev/slinky/oracle/config"
"github.com/spf13/cast"
"github.com/spf13/cobra"
"time"
)

// List of CLI flags for Server and Client.
Expand All @@ -22,9 +23,10 @@ const (
FlagBridgeDaemonLoopDelayMs = "bridge-daemon-loop-delay-ms"
FlagBridgeDaemonEthRpcEndpoint = "bridge-daemon-eth-rpc-endpoint"

FlagLiquidationDaemonEnabled = "liquidation-daemon-enabled"
FlagLiquidationDaemonLoopDelayMs = "liquidation-daemon-loop-delay-ms"
FlagLiquidationDaemonQueryPageLimit = "liquidation-daemon-query-page-limit"
FlagLiquidationDaemonEnabled = "liquidation-daemon-enabled"
FlagLiquidationDaemonLoopDelayMs = "liquidation-daemon-loop-delay-ms"
FlagLiquidationDaemonQueryPageLimit = "liquidation-daemon-query-page-limit"
FlagLiquidationDaemonResponsePageLimit = "liquidation-daemon-response-page-limit"

// Oracle flags
FlagOracleEnabled = "oracle.enabled"
Expand Down Expand Up @@ -62,6 +64,8 @@ type LiquidationFlags struct {
LoopDelayMs uint32
// QueryPageLimit configures the pagination limit for fetching subaccounts.
QueryPageLimit uint64
// ResponsePageLimit configures the pagination limit for the response to application.
ResponsePageLimit uint64
}

// PriceFlags contains configuration flags for the Price Daemon.
Expand Down Expand Up @@ -102,9 +106,10 @@ func GetDefaultDaemonFlags() DaemonFlags {
EthRpcEndpoint: "",
},
Liquidation: LiquidationFlags{
Enabled: true,
LoopDelayMs: 1_600,
QueryPageLimit: 1_000,
Enabled: true,
LoopDelayMs: 1_600,
QueryPageLimit: 1_000,
ResponsePageLimit: 2_000,
},
Price: PriceFlags{
Enabled: false,
Expand Down Expand Up @@ -183,6 +188,11 @@ func AddDaemonFlagsToCmd(
df.Liquidation.QueryPageLimit,
"Limit on the number of items to fetch per query in the Liquidation Daemon task loop.",
)
cmd.Flags().Uint64(
FlagLiquidationDaemonResponsePageLimit,
df.Liquidation.ResponsePageLimit,
"Limit on the number of items to send to the main application in the Liquidation Daemon task loop.",
)

// Price Daemon.
cmd.Flags().Bool(
Expand Down Expand Up @@ -276,6 +286,11 @@ func GetDaemonFlagValuesFromOptions(
result.Liquidation.QueryPageLimit = v
}
}
if option := appOpts.Get(FlagLiquidationDaemonResponsePageLimit); option != nil {
if v, err := cast.ToUint64E(option); err == nil {
result.Liquidation.ResponsePageLimit = v
}
}

// Price Daemon.
if option := appOpts.Get(FlagPriceDaemonEnabled); option != nil {
Expand Down
3 changes: 3 additions & 0 deletions protocol/daemons/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func TestAddDaemonFlagsToCmd(t *testing.T) {
flags.FlagLiquidationDaemonEnabled,
flags.FlagLiquidationDaemonLoopDelayMs,
flags.FlagLiquidationDaemonQueryPageLimit,
flags.FlagLiquidationDaemonResponsePageLimit,

flags.FlagPriceDaemonEnabled,
flags.FlagPriceDaemonLoopDelayMs,
Expand Down Expand Up @@ -53,6 +54,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
optsMap[flags.FlagLiquidationDaemonEnabled] = true
optsMap[flags.FlagLiquidationDaemonLoopDelayMs] = uint32(2222)
optsMap[flags.FlagLiquidationDaemonQueryPageLimit] = uint64(3333)
optsMap[flags.FlagLiquidationDaemonResponsePageLimit] = uint64(4444)

optsMap[flags.FlagPriceDaemonEnabled] = true
optsMap[flags.FlagPriceDaemonLoopDelayMs] = uint32(4444)
Expand Down Expand Up @@ -83,6 +85,7 @@ func TestGetDaemonFlagValuesFromOptions_Custom(t *testing.T) {
require.Equal(t, optsMap[flags.FlagLiquidationDaemonEnabled], r.Liquidation.Enabled)
require.Equal(t, optsMap[flags.FlagLiquidationDaemonLoopDelayMs], r.Liquidation.LoopDelayMs)
require.Equal(t, optsMap[flags.FlagLiquidationDaemonQueryPageLimit], r.Liquidation.QueryPageLimit)
require.Equal(t, optsMap[flags.FlagLiquidationDaemonResponsePageLimit], r.Liquidation.ResponsePageLimit)

// Price Daemon.
require.Equal(t, optsMap[flags.FlagPriceDaemonEnabled], r.Price.Enabled)
Expand Down
143 changes: 135 additions & 8 deletions protocol/daemons/liquidation/client/grpc_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ func (c *Client) SendLiquidatableSubaccountIds(
liquidatableSubaccountIds []satypes.SubaccountId,
negativeTncSubaccountIds []satypes.SubaccountId,
openPositionInfoMap map[uint32]*clobtypes.SubaccountOpenPositionInfo,
pageLimit uint64,
) error {
defer telemetry.ModuleMeasureSince(
metrics.LiquidationDaemon,
Expand Down Expand Up @@ -241,19 +242,145 @@ func (c *Client) SendLiquidatableSubaccountIds(
subaccountOpenPositionInfo = append(subaccountOpenPositionInfo, *openPositionInfoMap[perpetualId])
}

request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
LiquidatableSubaccountIds: liquidatableSubaccountIds,
NegativeTncSubaccountIds: negativeTncSubaccountIds,
SubaccountOpenPositionInfo: subaccountOpenPositionInfo,
}
// Break this down to multiple requests if the number of subaccounts is too large.

// Liquidatable subaccount ids.
requests := GenerateLiquidateSubaccountsPaginatedRequests(
liquidatableSubaccountIds,
blockHeight,
pageLimit,
)

if _, err := c.LiquidationServiceClient.LiquidateSubaccounts(ctx, request); err != nil {
return err
// Negative TNC subaccount ids.
requests = append(
requests,
GenerateNegativeTNCSubaccountsPaginatedRequests(
negativeTncSubaccountIds,
blockHeight,
pageLimit,
)...,
)

// Subaccount open position info.
requests = append(
requests,
GenerateSubaccountOpenPositionPaginatedRequests(
subaccountOpenPositionInfo,
blockHeight,
pageLimit,
)...,
)

for _, req := range requests {
if _, err := c.LiquidationServiceClient.LiquidateSubaccounts(ctx, req); err != nil {
return err
}
}

return nil
}

func GenerateLiquidateSubaccountsPaginatedRequests(
ids []satypes.SubaccountId,
blockHeight uint32,
pageLimit uint64,
) []*api.LiquidateSubaccountsRequest {
if len(ids) == 0 {
return []*api.LiquidateSubaccountsRequest{
{
BlockHeight: blockHeight,
LiquidatableSubaccountIds: []satypes.SubaccountId{},
},
}
}

requests := make([]*api.LiquidateSubaccountsRequest, 0)
for start := 0; start < len(ids); start += int(pageLimit) {
end := lib.Min(start+int(pageLimit), len(ids))
request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
LiquidatableSubaccountIds: ids[start:end],
}
requests = append(requests, request)
}
return requests
}

func GenerateNegativeTNCSubaccountsPaginatedRequests(
ids []satypes.SubaccountId,
blockHeight uint32,
pageLimit uint64,
) []*api.LiquidateSubaccountsRequest {
if len(ids) == 0 {
return []*api.LiquidateSubaccountsRequest{
{
BlockHeight: blockHeight,
NegativeTncSubaccountIds: []satypes.SubaccountId{},
},
}
}

requests := make([]*api.LiquidateSubaccountsRequest, 0)
for start := 0; start < len(ids); start += int(pageLimit) {
end := lib.Min(start+int(pageLimit), len(ids))
request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
NegativeTncSubaccountIds: ids[start:end],
}
requests = append(requests, request)
}
return requests
}

func GenerateSubaccountOpenPositionPaginatedRequests(
subaccountOpenPositionInfo []clobtypes.SubaccountOpenPositionInfo,
blockHeight uint32,
pageLimit uint64,
) []*api.LiquidateSubaccountsRequest {
if len(subaccountOpenPositionInfo) == 0 {
return []*api.LiquidateSubaccountsRequest{
{
BlockHeight: blockHeight,
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
},
}
}

requests := make([]*api.LiquidateSubaccountsRequest, 0)
for _, info := range subaccountOpenPositionInfo {
// Long positions.
for start := 0; start < len(info.SubaccountsWithLongPosition); start += int(pageLimit) {
end := lib.Min(start+int(pageLimit), len(info.SubaccountsWithLongPosition))
request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
{
PerpetualId: info.PerpetualId,
SubaccountsWithLongPosition: info.SubaccountsWithLongPosition[start:end],
},
},
}
requests = append(requests, request)
}

// Short positions.
for start := 0; start < len(info.SubaccountsWithShortPosition); start += int(pageLimit) {
end := lib.Min(start+int(pageLimit), len(info.SubaccountsWithShortPosition))
request := &api.LiquidateSubaccountsRequest{
BlockHeight: blockHeight,
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
{
PerpetualId: info.PerpetualId,
SubaccountsWithShortPosition: info.SubaccountsWithShortPosition[start:end],
},
},
}
requests = append(requests, request)
}
}
return requests
}

func newContextWithQueryBlockHeight(
ctx context.Context,
blockHeight uint32,
Expand Down
52 changes: 43 additions & 9 deletions protocol/daemons/liquidation/client/grpc_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,22 +469,45 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
req := &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{constants.Alice_Num0, constants.Bob_Num0},
NegativeTncSubaccountIds: []satypes.SubaccountId{constants.Carl_Num0, constants.Dave_Num0},
}
response := &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
NegativeTncSubaccountIds: []satypes.SubaccountId{constants.Carl_Num0, constants.Dave_Num0},
}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
{
PerpetualId: 0,
SubaccountsWithLongPosition: []satypes.SubaccountId{
constants.Alice_Num0,
constants.Carl_Num0,
},
},
},
}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{
{
PerpetualId: 0,
SubaccountsWithShortPosition: []satypes.SubaccountId{
constants.Bob_Num0,
constants.Dave_Num0,
},
},
},
}
response := &api.LiquidateSubaccountsResponse{}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
},
liquidatableSubaccountIds: []satypes.SubaccountId{
Expand Down Expand Up @@ -512,12 +535,24 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
"Success Empty": {
setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
req := &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{},
}
response := &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
NegativeTncSubaccountIds: []satypes.SubaccountId{},
}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)

req = &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{},
NegativeTncSubaccountIds: []satypes.SubaccountId{},
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
}
response := &api.LiquidateSubaccountsResponse{}
response = &api.LiquidateSubaccountsResponse{}
mck.On("LiquidateSubaccounts", ctx, req).Return(response, nil)
},
liquidatableSubaccountIds: []satypes.SubaccountId{},
Expand All @@ -527,10 +562,8 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
"Errors are propagated": {
setupMocks: func(ctx context.Context, mck *mocks.QueryClient) {
req := &api.LiquidateSubaccountsRequest{
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{},
NegativeTncSubaccountIds: []satypes.SubaccountId{},
SubaccountOpenPositionInfo: []clobtypes.SubaccountOpenPositionInfo{},
BlockHeight: uint32(50),
LiquidatableSubaccountIds: []satypes.SubaccountId{},
}
mck.On("LiquidateSubaccounts", ctx, req).Return(nil, errors.New("test error"))
},
Expand All @@ -555,6 +588,7 @@ func TestSendLiquidatableSubaccountIds(t *testing.T) {
tc.liquidatableSubaccountIds,
tc.negativeTncSubaccountIds,
tc.subaccountOpenPositionInfo,
1000,
)
require.Equal(t, tc.expectedError, err)
})
Expand Down
1 change: 1 addition & 0 deletions protocol/daemons/liquidation/client/sub_task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func (s *SubTaskRunnerImpl) RunLiquidationDaemonTaskLoop(
liquidatableSubaccountIds,
negativeTncSubaccountIds,
subaccountOpenPositionInfo,
liqFlags.ResponsePageLimit,
)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 909fc3a

Please sign in to comment.