Skip to content

Commit

Permalink
feat(rfq-relayer): add balances mutex (#3039)
Browse files Browse the repository at this point in the history
* Revert "revert(relayer): potential deadlock (#3036)" to reinstate #2994 

Co-authored-by: Daniel Wasserman <[email protected]>
  • Loading branch information
trajan0x and dwasse authored Aug 21, 2024
1 parent 4fd9ef1 commit f789485
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 12 deletions.
53 changes: 43 additions & 10 deletions services/rfq/relayer/service/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,36 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r
return nil
}

// check balance and mark it as CommitPending
err = q.commitPendingBalance(ctx, span, &request)
if err != nil {
return fmt.Errorf("could not commit pending balance: %w", err)
}

// immediately forward the request to handleCommitPending
span.AddEvent("forwarding to handleCommitPending")
fwdErr := q.Forward(ctx, request)
if fwdErr != nil {
logger.Errorf("could not forward to handle commit pending: %w", fwdErr)
span.AddEvent("could not forward to handle commit pending")
}

return nil
}

// commitPendingBalance locks the balance and marks the request as CommitPending.
func (q *QuoteRequestHandler) commitPendingBalance(ctx context.Context, span trace.Span, request *reldb.QuoteRequest) (err error) {
// lock the consumed balance
key := getBalanceMtxKey(q.Dest.ChainID, request.Transaction.DestToken)
span.SetAttributes(attribute.String("balance_lock_key", key))
unlocker, ok := q.balanceMtx.TryLock(key)
if !ok {
// balance is locked due to concurrent request, try again later
span.SetAttributes(attribute.Bool("locked", true))
return nil
}
defer unlocker.Unlock()

// get destination committable balance
committableBalance, err := q.Inventory.GetCommittableBalance(ctx, int(q.Dest.ChainID), request.Transaction.DestToken)
if errors.Is(err, inventory.ErrUnsupportedChain) {
Expand Down Expand Up @@ -211,14 +241,6 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r
return fmt.Errorf("could not update request status: %w", err)
}

// immediately forward the request to handleCommitPending
span.AddEvent("forwarding to handleCommitPending")
fwdErr := q.Forward(ctx, request)
if fwdErr != nil {
logger.Errorf("could not forward to handle commit pending: %w", fwdErr)
span.AddEvent("could not forward to handle commit pending")
}

return nil
}

Expand Down Expand Up @@ -487,12 +509,23 @@ func (q *QuoteRequestHandler) handleProofPosted(ctx context.Context, span trace.
// Error Handlers Only from this point below.
//
// handleNotEnoughInventory handles the not enough inventory status.
func (q *QuoteRequestHandler) handleNotEnoughInventory(ctx context.Context, _ trace.Span, request reldb.QuoteRequest) (err error) {
func (q *QuoteRequestHandler) handleNotEnoughInventory(ctx context.Context, span trace.Span, request reldb.QuoteRequest) (err error) {
// acquire balance lock
key := getBalanceMtxKey(q.Dest.ChainID, request.Transaction.DestToken)
span.SetAttributes(attribute.String("balance_lock_key", key))
unlocker, ok := q.balanceMtx.TryLock(key)
if !ok {
// balance is locked due to concurrent request, try again later
span.SetAttributes(attribute.Bool("locked", true))
return nil
}
defer unlocker.Unlock()

// commit destination balance
committableBalance, err := q.Inventory.GetCommittableBalance(ctx, int(q.Dest.ChainID), request.Transaction.DestToken)
if err != nil {
return fmt.Errorf("could not get committable balance: %w", err)
}
// if committableBalance > destAmount
if committableBalance.Cmp(request.Transaction.DestAmount) > 0 {
err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending, &request.Status)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions services/rfq/relayer/service/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ type Relayer struct {
decimalsCache *xsync.MapOf[string, *uint8]
// semaphore is used to limit the number of concurrent requests
semaphore *semaphore.Weighted
// handlerMtx is used to synchronize handling of relay requests
handlerMtx mapmutex.StringMapMutex
// handlerMtx is used to synchronize handling of relay requests, keyed on transaction ID
handlerMtx mapmutex.StringMapMutex
// balanceMtx is used to synchronize balance requests, keyed on a chainID and tokenAddress pair
balanceMtx mapmutex.StringMapMutex
otelRecorder iOtelRecorder
}

Expand Down Expand Up @@ -165,6 +167,7 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi
apiClient: apiClient,
semaphore: semaphore.NewWeighted(maxConcurrentRequests),
handlerMtx: mapmutex.NewStringMapMutex(),
balanceMtx: mapmutex.NewStringMapMutex(),
otelRecorder: otelRecorder,
}
return &rel, nil
Expand Down
7 changes: 7 additions & 0 deletions services/rfq/relayer/service/statushandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ type QuoteRequestHandler struct {
mutexMiddlewareFunc func(func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error
// handlerMtx is the mutex for relaying.
handlerMtx mapmutex.StringMapMutex
// balanceMtx is the mutex for balances.
balanceMtx mapmutex.StringMapMutex
}

func getBalanceMtxKey(chainID uint32, token common.Address) string {
return fmt.Sprintf("%d-%s", chainID, token.Hex())
}

// Handler is the handler for a quote request.
Expand Down Expand Up @@ -81,6 +87,7 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest)
apiClient: r.apiClient,
mutexMiddlewareFunc: r.mutexMiddleware,
handlerMtx: r.handlerMtx,
balanceMtx: r.balanceMtx,
}

// wrap in deadline middleware since the relay has not yet happened
Expand Down

0 comments on commit f789485

Please sign in to comment.