From bc570d663be2b7f4c777172741e350ed6295c6fc Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 27 Aug 2024 02:30:00 -0700 Subject: [PATCH] active order fault tolerance via "best-effort" boolean return (#471) * fix: active orders overflow bug * chore: active orders observability TODOs * updates * lint * add concurrency to the order book processing in active orders * active order fault tolerance --------- Co-authored-by: Deividas Petraitis --- domain/mvc/orderbook.go | 2 +- orderbook/types/get_orders_request.go | 8 +- orderbook/usecase/orderbook_usecase.go | 82 +++++++++++-------- .../delivery/http/passthrough_handler.go | 4 +- 4 files changed, 54 insertions(+), 42 deletions(-) diff --git a/domain/mvc/orderbook.go b/domain/mvc/orderbook.go index 14bc39c19..51b93f1f2 100644 --- a/domain/mvc/orderbook.go +++ b/domain/mvc/orderbook.go @@ -15,5 +15,5 @@ type OrderBookUsecase interface { GetAllTicks(poolID uint64) (map[int64]orderbookdomain.OrderbookTick, bool) // GetOrder returns all active orderbook orders for a given address. - GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, error) + GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) } diff --git a/orderbook/types/get_orders_request.go b/orderbook/types/get_orders_request.go index e725900f0..fe9bf097a 100644 --- a/orderbook/types/get_orders_request.go +++ b/orderbook/types/get_orders_request.go @@ -84,11 +84,12 @@ func defaultSortOrder(orderA, orderB orderbookdomain.LimitOrder) int { // GetActiveOrdersResponse represents the response for the /pools/all-orders endpoint. type GetActiveOrdersResponse struct { - Orders []orderbookdomain.LimitOrder `json:"orders"` + Orders []orderbookdomain.LimitOrder `json:"orders"` + IsBestEffort bool `json:"is_best_effort"` } // NewGetAllOrderResponse creates a new GetActiveOrdersResponse. -func NewGetAllOrderResponse(orders []orderbookdomain.LimitOrder) *GetActiveOrdersResponse { +func NewGetAllOrderResponse(orders []orderbookdomain.LimitOrder, isBestEffort bool) *GetActiveOrdersResponse { sort.Slice(orders, func(i, j int) bool { return defaultSortOrder(orders[i], orders[j]) < 0 }) @@ -100,6 +101,7 @@ func NewGetAllOrderResponse(orders []orderbookdomain.LimitOrder) *GetActiveOrder } return &GetActiveOrdersResponse{ - Orders: orders, + Orders: orders, + IsBestEffort: isBestEffort, } } diff --git a/orderbook/usecase/orderbook_usecase.go b/orderbook/usecase/orderbook_usecase.go index 091315683..73eb4be79 100644 --- a/orderbook/usecase/orderbook_usecase.go +++ b/orderbook/usecase/orderbook_usecase.go @@ -123,16 +123,17 @@ func (o *orderbookUseCaseImpl) ProcessPool(ctx context.Context, pool sqsdomain.P } // GetActiveOrders implements mvc.OrderBookUsecase. -func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, error) { +func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address string) ([]orderbookdomain.LimitOrder, bool, error) { orderbooks, err := o.poolsUsecease.GetAllCanonicalOrderbookPoolIDs() if err != nil { - return nil, fmt.Errorf("failed to get all canonical orderbook pool IDs: %w", err) + return nil, false, fmt.Errorf("failed to get all canonical orderbook pool IDs: %w", err) } type orderbookResult struct { - orderbookID uint64 - limitOrders []orderbookdomain.LimitOrder - err error + isBestEffort bool + orderbookID uint64 + limitOrders []orderbookdomain.LimitOrder + err error } results := make(chan orderbookResult, len(orderbooks)) @@ -141,33 +142,39 @@ func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri // Process orderbooks concurrently for _, orderbook := range orderbooks { go func(orderbook domain.CanonicalOrderBooksResult) { - limitOrders, err := o.processOrderBookActiveOrders(ctx, orderbook, address) + limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address) results <- orderbookResult{ - orderbookID: orderbook.PoolID, - limitOrders: limitOrders, - err: err, + isBestEffort: isBestEffort, + orderbookID: orderbook.PoolID, + limitOrders: limitOrders, + err: err, } }(orderbook) } // Collect results finalResults := []orderbookdomain.LimitOrder{} + isBestEffort := false + for i := 0; i < len(orderbooks); i++ { select { case result := <-results: if result.err != nil { telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc() o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("orderbook_id", result.orderbookID), zap.Any("err", result.err)) - return nil, result.err + return nil, false, result.err } + + isBestEffort = isBestEffort || result.isBestEffort + finalResults = append(finalResults, result.limitOrders...) case <-ctx.Done(): - return nil, ctx.Err() + return nil, false, ctx.Err() } } - return finalResults, nil + return finalResults, isBestEffort, nil } // processOrderBookActiveOrders fetches and processes the active orders for a given orderbook. @@ -179,46 +186,39 @@ func (o *orderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri // // For every order, if an error occurs processing the order, it is skipped rather than failing the entire process. // This is a best-effort process. -func (o *orderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, orderBook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, error) { +func (o *orderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, orderBook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, bool, error) { orders, count, err := o.orderBookClient.GetActiveOrders(ctx, orderBook.ContractAddress, ownerAddress) if err != nil { - return nil, err + return nil, false, err } // There are orders to process for given orderbook if count == 0 { - return nil, nil + return nil, false, nil } quoteToken, err := o.tokensUsecease.GetMetadataByChainDenom(orderBook.Quote) if err != nil { - return nil, err + return nil, false, err } baseToken, err := o.tokensUsecease.GetMetadataByChainDenom(orderBook.Base) if err != nil { - return nil, err + return nil, false, err } // Create a slice to store the results results := make([]orderbookdomain.LimitOrder, 0, len(orders)) - for _, order := range orders { - tickForOrder, ok := o.orderbookRepository.GetTickByID(orderBook.PoolID, order.TickId) - if !ok { - // Do not return error, just log and continue for fault tolerance - telemetry.GetTickByIDNotFoundCounter.Inc() - o.logger.Info(telemetry.GetTickByIDNotFoundMetricName, zap.Any("contract", orderBook.ContractAddress), zap.Any("ticks", order.TickId), zap.Any("ok", ok)) - - // Note: initialize empty tick for fault- - tickForOrder = orderbookdomain.OrderbookTick{} - } + // If we encounter + isBestEffort := false + // For each order, create a formatted limit order + for _, order := range orders { // create limit order - result, err := o.createLimitOrder( + result, err := o.createFormattedLimitOrder( + orderBook.PoolID, order, - tickForOrder.TickState, - tickForOrder.UnrealizedCancels, orderbookdomain.Asset{ Symbol: quoteToken.CoinMinimalDenom, Decimals: quoteToken.Precision, @@ -230,27 +230,37 @@ func (o *orderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, orderBook.ContractAddress, ) if err != nil { - o.logger.Error("failed to create limit order", zap.Any("order", order), zap.Any("err", err)) telemetry.CreateLimitOrderErrorCounter.Inc() o.logger.Error(telemetry.CreateLimitOrderErrorMetricName, zap.Any("order", order), zap.Any("err", err)) + + isBestEffort = true + continue } results = append(results, result) } - return results, nil + return results, isBestEffort, nil } -// createLimitOrder creates a limit order from the orderbook order. -func (o *orderbookUseCaseImpl) createLimitOrder( +// createFormattedLimitOrder creates a limit order from the orderbook order. +func (o *orderbookUseCaseImpl) createFormattedLimitOrder( + poolID uint64, order orderbookdomain.Order, - tickState orderbookdomain.TickState, - unrealizedCancels orderbookdomain.UnrealizedCancels, quoteAsset orderbookdomain.Asset, baseAsset orderbookdomain.Asset, orderbookAddress string, ) (orderbookdomain.LimitOrder, error) { + tickForOrder, ok := o.orderbookRepository.GetTickByID(poolID, order.TickId) + if !ok { + telemetry.GetTickByIDNotFoundCounter.Inc() + return orderbookdomain.LimitOrder{}, fmt.Errorf("tick not found %s, %d", orderbookAddress, order.TickId) + } + + tickState := tickForOrder.TickState + unrealizedCancels := tickForOrder.UnrealizedCancels + // Parse quantity as int64 quantity, err := strconv.ParseInt(order.Quantity, 10, 64) if err != nil { diff --git a/passthrough/delivery/http/passthrough_handler.go b/passthrough/delivery/http/passthrough_handler.go index 29adb428f..b4b96103f 100644 --- a/passthrough/delivery/http/passthrough_handler.go +++ b/passthrough/delivery/http/passthrough_handler.go @@ -89,12 +89,12 @@ func (a *PassthroughHandler) GetActiveOrders(c echo.Context) (err error) { return c.JSON(http.StatusBadRequest, domain.ResponseError{Message: err.Error()}) } - orders, err := a.OUsecase.GetActiveOrders(ctx, req.UserOsmoAddress) + orders, isBestEffort, err := a.OUsecase.GetActiveOrders(ctx, req.UserOsmoAddress) if err != nil { return c.JSON(http.StatusInternalServerError, domain.ResponseError{Message: err.Error()}) } - resp := types.NewGetAllOrderResponse(orders) + resp := types.NewGetAllOrderResponse(orders, isBestEffort) return c.JSON(http.StatusOK, resp) }