From f789485803b2aca0c7052bd2524a54e34a3bf573 Mon Sep 17 00:00:00 2001 From: trajan0x <83933037+trajan0x@users.noreply.github.com> Date: Wed, 21 Aug 2024 13:12:25 -0400 Subject: [PATCH] feat(rfq-relayer): add balances mutex (#3039) * Revert "revert(relayer): potential deadlock (#3036)" to reinstate #2994 Co-authored-by: Daniel Wasserman --- services/rfq/relayer/service/handlers.go | 53 +++++++++++++++---- services/rfq/relayer/service/relayer.go | 7 ++- services/rfq/relayer/service/statushandler.go | 7 +++ 3 files changed, 55 insertions(+), 12 deletions(-) diff --git a/services/rfq/relayer/service/handlers.go b/services/rfq/relayer/service/handlers.go index 46b00e9ecc..7b4670e20c 100644 --- a/services/rfq/relayer/service/handlers.go +++ b/services/rfq/relayer/service/handlers.go @@ -162,6 +162,36 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r return nil } + // check balance and mark it as CommitPending + err = q.commitPendingBalance(ctx, span, &request) + if err != nil { + return fmt.Errorf("could not commit pending balance: %w", err) + } + + // immediately forward the request to handleCommitPending + span.AddEvent("forwarding to handleCommitPending") + fwdErr := q.Forward(ctx, request) + if fwdErr != nil { + logger.Errorf("could not forward to handle commit pending: %w", fwdErr) + span.AddEvent("could not forward to handle commit pending") + } + + return nil +} + +// commitPendingBalance locks the balance and marks the request as CommitPending. +func (q *QuoteRequestHandler) commitPendingBalance(ctx context.Context, span trace.Span, request *reldb.QuoteRequest) (err error) { + // lock the consumed balance + key := getBalanceMtxKey(q.Dest.ChainID, request.Transaction.DestToken) + span.SetAttributes(attribute.String("balance_lock_key", key)) + unlocker, ok := q.balanceMtx.TryLock(key) + if !ok { + // balance is locked due to concurrent request, try again later + span.SetAttributes(attribute.Bool("locked", true)) + return nil + } + defer unlocker.Unlock() + // get destination committable balance committableBalance, err := q.Inventory.GetCommittableBalance(ctx, int(q.Dest.ChainID), request.Transaction.DestToken) if errors.Is(err, inventory.ErrUnsupportedChain) { @@ -211,14 +241,6 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r return fmt.Errorf("could not update request status: %w", err) } - // immediately forward the request to handleCommitPending - span.AddEvent("forwarding to handleCommitPending") - fwdErr := q.Forward(ctx, request) - if fwdErr != nil { - logger.Errorf("could not forward to handle commit pending: %w", fwdErr) - span.AddEvent("could not forward to handle commit pending") - } - return nil } @@ -487,12 +509,23 @@ func (q *QuoteRequestHandler) handleProofPosted(ctx context.Context, span trace. // Error Handlers Only from this point below. // // handleNotEnoughInventory handles the not enough inventory status. -func (q *QuoteRequestHandler) handleNotEnoughInventory(ctx context.Context, _ trace.Span, request reldb.QuoteRequest) (err error) { +func (q *QuoteRequestHandler) handleNotEnoughInventory(ctx context.Context, span trace.Span, request reldb.QuoteRequest) (err error) { + // acquire balance lock + key := getBalanceMtxKey(q.Dest.ChainID, request.Transaction.DestToken) + span.SetAttributes(attribute.String("balance_lock_key", key)) + unlocker, ok := q.balanceMtx.TryLock(key) + if !ok { + // balance is locked due to concurrent request, try again later + span.SetAttributes(attribute.Bool("locked", true)) + return nil + } + defer unlocker.Unlock() + + // commit destination balance committableBalance, err := q.Inventory.GetCommittableBalance(ctx, int(q.Dest.ChainID), request.Transaction.DestToken) if err != nil { return fmt.Errorf("could not get committable balance: %w", err) } - // if committableBalance > destAmount if committableBalance.Cmp(request.Transaction.DestAmount) > 0 { err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending, &request.Status) if err != nil { diff --git a/services/rfq/relayer/service/relayer.go b/services/rfq/relayer/service/relayer.go index 9726c68678..bf3d7a3b2b 100644 --- a/services/rfq/relayer/service/relayer.go +++ b/services/rfq/relayer/service/relayer.go @@ -61,8 +61,10 @@ type Relayer struct { decimalsCache *xsync.MapOf[string, *uint8] // semaphore is used to limit the number of concurrent requests semaphore *semaphore.Weighted - // handlerMtx is used to synchronize handling of relay requests - handlerMtx mapmutex.StringMapMutex + // handlerMtx is used to synchronize handling of relay requests, keyed on transaction ID + handlerMtx mapmutex.StringMapMutex + // balanceMtx is used to synchronize balance requests, keyed on a chainID and tokenAddress pair + balanceMtx mapmutex.StringMapMutex otelRecorder iOtelRecorder } @@ -165,6 +167,7 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi apiClient: apiClient, semaphore: semaphore.NewWeighted(maxConcurrentRequests), handlerMtx: mapmutex.NewStringMapMutex(), + balanceMtx: mapmutex.NewStringMapMutex(), otelRecorder: otelRecorder, } return &rel, nil diff --git a/services/rfq/relayer/service/statushandler.go b/services/rfq/relayer/service/statushandler.go index 6aedc1c331..8e6663fd2e 100644 --- a/services/rfq/relayer/service/statushandler.go +++ b/services/rfq/relayer/service/statushandler.go @@ -52,6 +52,12 @@ type QuoteRequestHandler struct { mutexMiddlewareFunc func(func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error // handlerMtx is the mutex for relaying. handlerMtx mapmutex.StringMapMutex + // balanceMtx is the mutex for balances. + balanceMtx mapmutex.StringMapMutex +} + +func getBalanceMtxKey(chainID uint32, token common.Address) string { + return fmt.Sprintf("%d-%s", chainID, token.Hex()) } // Handler is the handler for a quote request. @@ -81,6 +87,7 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest) apiClient: r.apiClient, mutexMiddlewareFunc: r.mutexMiddleware, handlerMtx: r.handlerMtx, + balanceMtx: r.balanceMtx, } // wrap in deadline middleware since the relay has not yet happened