diff --git a/core/mapmutex/mapmutex.go b/core/mapmutex/mapmutex.go index d76f8bc495..fb19ea3270 100644 --- a/core/mapmutex/mapmutex.go +++ b/core/mapmutex/mapmutex.go @@ -12,6 +12,7 @@ import ( type untypedMapMutex interface { Lock(key interface{}) Unlocker TryLock(key interface{}) (Unlocker, bool) + Keys() []interface{} } type untypedMapMutexImpl struct { @@ -81,6 +82,18 @@ func (m *untypedMapMutexImpl) TryLock(key interface{}) (Unlocker, bool) { return nil, false } +// Keys returns all keys in the map. +func (m *untypedMapMutexImpl) Keys() []interface{} { + m.ml.Lock() + defer m.ml.Unlock() + + keys := make([]interface{}, 0, len(m.ma)) + for k := range m.ma { + keys = append(keys, k) + } + return keys +} + // Unlock releases the lock for this entry. func (me *mentry) Unlock() { m := me.m diff --git a/core/mapmutex/mapmutex_test.go b/core/mapmutex/mapmutex_test.go index 2555e4b58a..4dc7df2a52 100644 --- a/core/mapmutex/mapmutex_test.go +++ b/core/mapmutex/mapmutex_test.go @@ -52,6 +52,28 @@ func (s MapMutexSuite) TestExampleMapMutex() { NotPanics(s.T(), ExampleStringMapMutex) } +func (s MapMutexSuite) TestKeys() { + s.T().Run("StringMapMutexKeys", func(t *testing.T) { + mapMutex := mapmutex.NewStringMapMutex() + mapMutex.Lock("lock1") + Equal(t, "lock1", mapMutex.Keys()[0]) + Equal(t, 1, len(mapMutex.Keys())) + }) + s.T().Run("StringerMapMutexKeys", func(t *testing.T) { + mapMutex := mapmutex.NewStringerMapMutex() + vitalik := common.HexToAddress("0xab5801a7d398351b8be11c439e05c5b3259aec9b") + mapMutex.Lock(vitalik) + Equal(t, vitalik.String(), mapMutex.Keys()[0]) + Equal(t, 1, len(mapMutex.Keys())) + }) + s.T().Run("IntMapMutexKeys", func(t *testing.T) { + mapMutex := mapmutex.NewIntMapMutex() + mapMutex.Lock(1) + Equal(t, 1, mapMutex.Keys()[0]) + Equal(t, 1, len(mapMutex.Keys())) + }) +} + func (s MapMutexSuite) TestMapMutex() { //nolint:gosec r := rand.New(rand.NewSource(42)) diff --git a/core/mapmutex/stringer.go b/core/mapmutex/stringer.go index 02afaffc4d..e415ade7ef 100644 --- a/core/mapmutex/stringer.go +++ b/core/mapmutex/stringer.go @@ -1,11 +1,14 @@ package mapmutex -import "fmt" +import ( + "fmt" +) // StringerMapMutex is an implementation of mapMutex for the fmt.Stringer conforming types. type StringerMapMutex interface { Lock(key fmt.Stringer) Unlocker TryLock(key fmt.Stringer) (Unlocker, bool) + Keys() []string } // stringerLockerImpl is the implementation of StringerMapMutex. @@ -22,6 +25,16 @@ func (s stringerLockerImpl) Lock(key fmt.Stringer) Unlocker { return s.mapMux.Lock(key.String()) } +// Keys returns the keys of the map. +func (s stringerLockerImpl) Keys() []string { + var keys []string + for _, key := range s.mapMux.Keys() { + // nolint: forcetypeassert + keys = append(keys, key.(string)) + } + return keys +} + // NewStringerMapMutex creates an initialized locker that locks on fmt.String. func NewStringerMapMutex() StringerMapMutex { return &stringerLockerImpl{ @@ -33,6 +46,7 @@ func NewStringerMapMutex() StringerMapMutex { type StringMapMutex interface { Lock(key string) Unlocker TryLock(key string) (Unlocker, bool) + Keys() []string } // stringMutexImpl locks on a string type. @@ -57,10 +71,21 @@ func (s stringMutexImpl) TryLock(key string) (Unlocker, bool) { return s.mapMux.TryLock(key) } +// Keys returns the keys of the map. +func (s stringMutexImpl) Keys() []string { + keys := []string{} + for _, key := range s.mapMux.Keys() { + // nolint: forcetypeassert + keys = append(keys, key.(string)) + } + return keys +} + // IntMapMutex is a map mutex that allows locking on an int. type IntMapMutex interface { Lock(key int) Unlocker TryLock(key int) (Unlocker, bool) + Keys() []int } // intMapMux locks on an int. @@ -77,6 +102,16 @@ func (i intMapMux) Lock(key int) Unlocker { return i.mapMux.Lock(key) } +// Keys returns the keys of the map. +func (i intMapMux) Keys() []int { + var keys []int + for _, key := range i.mapMux.Keys() { + // nolint: forcetypeassert + keys = append(keys, key.(int)) + } + return keys +} + // NewIntMapMutex creates a map mutex for locking on an integer. func NewIntMapMutex() IntMapMutex { return &intMapMux{ diff --git a/services/rfq/relayer/reldb/base/quote.go b/services/rfq/relayer/reldb/base/quote.go index fa96d1dae1..c01556a939 100644 --- a/services/rfq/relayer/reldb/base/quote.go +++ b/services/rfq/relayer/reldb/base/quote.go @@ -89,7 +89,18 @@ func (s Store) GetQuoteResultsByStatus(ctx context.Context, matchStatuses ...rel } // UpdateQuoteRequestStatus todo: db test. -func (s Store) UpdateQuoteRequestStatus(ctx context.Context, id [32]byte, status reldb.QuoteRequestStatus) error { +func (s Store) UpdateQuoteRequestStatus(ctx context.Context, id [32]byte, status reldb.QuoteRequestStatus, prevStatus *reldb.QuoteRequestStatus) error { + if prevStatus == nil { + req, err := s.GetQuoteRequestByID(ctx, id) + if err != nil { + return fmt.Errorf("could not get quote: %w", err) + } + prevStatus = &req.Status + } + if !isValidStateTransition(*prevStatus, status) { + return nil + } + tx := s.DB().WithContext(ctx).Model(&RequestForQuote{}). Where(fmt.Sprintf("%s = ?", transactionIDFieldName), hexutil.Encode(id[:])). Update(statusFieldName, status) @@ -120,3 +131,10 @@ func (s Store) UpdateRelayNonce(ctx context.Context, id [32]byte, nonce uint64) } return nil } + +func isValidStateTransition(prevStatus, status reldb.QuoteRequestStatus) bool { + if status == reldb.DeadlineExceeded || status == reldb.WillNotProcess { + return true + } + return status >= prevStatus +} diff --git a/services/rfq/relayer/reldb/db.go b/services/rfq/relayer/reldb/db.go index 183d83d117..c63ad75527 100644 --- a/services/rfq/relayer/reldb/db.go +++ b/services/rfq/relayer/reldb/db.go @@ -24,7 +24,7 @@ type Writer interface { // StoreRebalance stores a rebalance. StoreRebalance(ctx context.Context, rebalance Rebalance) error // UpdateQuoteRequestStatus updates the status of a quote request - UpdateQuoteRequestStatus(ctx context.Context, id [32]byte, status QuoteRequestStatus) error + UpdateQuoteRequestStatus(ctx context.Context, id [32]byte, status QuoteRequestStatus, prevStatus *QuoteRequestStatus) error // UpdateRebalance updates the status of a rebalance action. // If the origin is supplied, it will be used to update the ID for the corresponding rebalance model. UpdateRebalance(ctx context.Context, rebalance Rebalance, updateID bool) error diff --git a/services/rfq/relayer/service/chainindexer.go b/services/rfq/relayer/service/chainindexer.go index 935167c8f0..b0aee18557 100644 --- a/services/rfq/relayer/service/chainindexer.go +++ b/services/rfq/relayer/service/chainindexer.go @@ -79,13 +79,13 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) } case *fastbridge.FastBridgeBridgeRelayed: // blocking lock on the txid mutex to ensure state transitions are not overrwitten - unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:])) + unlocker := r.handlerMtx.Lock(hexutil.Encode(event.TransactionId[:])) defer unlocker.Unlock() // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck - return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost) + return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil) } err = r.handleRelayLog(ctx, event) @@ -93,13 +93,13 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) return fmt.Errorf("could not handle relay: %w", err) } case *fastbridge.FastBridgeBridgeProofProvided: - unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:])) + unlocker := r.handlerMtx.Lock(hexutil.Encode(event.TransactionId[:])) defer unlocker.Unlock() // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck - return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost) + return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil) } err = r.handleProofProvided(ctx, event) @@ -107,13 +107,13 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) return fmt.Errorf("could not handle proof provided: %w", err) } case *fastbridge.FastBridgeBridgeDepositClaimed: - unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:])) + unlocker := r.handlerMtx.Lock(hexutil.Encode(event.TransactionId[:])) defer unlocker.Unlock() // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck - return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost) + return r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.RelayRaceLost, nil) } err = r.handleDepositClaimed(ctx, event) @@ -206,7 +206,7 @@ func getDecimalsKey(addr common.Address, chainID uint32) string { } func (r *Relayer) handleDepositClaimed(ctx context.Context, event *fastbridge.FastBridgeBridgeDepositClaimed) error { - err := r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.ClaimCompleted) + err := r.db.UpdateQuoteRequestStatus(ctx, event.TransactionId, reldb.ClaimCompleted, nil) if err != nil { return fmt.Errorf("could not update request status: %w", err) } diff --git a/services/rfq/relayer/service/handlers.go b/services/rfq/relayer/service/handlers.go index 493b7092ca..d8f85ebb1f 100644 --- a/services/rfq/relayer/service/handlers.go +++ b/services/rfq/relayer/service/handlers.go @@ -37,7 +37,7 @@ func (r *Relayer) handleBridgeRequestedLog(parentCtx context.Context, req *fastb metrics.EndSpanWithErr(span, err) }() - unlocker, ok := r.relayMtx.TryLock(hexutil.Encode(req.TransactionId[:])) + unlocker, ok := r.handlerMtx.TryLock(hexutil.Encode(req.TransactionId[:])) if !ok { span.SetAttributes(attribute.Bool("locked", true)) // already processing this request @@ -142,7 +142,7 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r return fmt.Errorf("could not determine if should process: %w", err) } if !shouldProcess { - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.WillNotProcess) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.WillNotProcess, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -167,7 +167,7 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r if errors.Is(err, inventory.ErrUnsupportedChain) { // don't process request if chain is currently unsupported span.AddEvent("dropping unsupported chain") - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.WillNotProcess) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.WillNotProcess, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -179,7 +179,7 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r // check if we have enough inventory to handle the request if committableBalance.Cmp(request.Transaction.DestAmount) < 0 { - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.NotEnoughInventory) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.NotEnoughInventory, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -206,7 +206,7 @@ func (q *QuoteRequestHandler) handleSeen(ctx context.Context, span trace.Span, r } request.Status = reldb.CommittedPending - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -270,7 +270,7 @@ func (q *QuoteRequestHandler) handleCommitPending(ctx context.Context, span trac } request.Status = reldb.CommittedConfirmed - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedConfirmed) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedConfirmed, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -300,7 +300,7 @@ func (q *QuoteRequestHandler) handleCommitConfirmed(ctx context.Context, span tr span.AddEvent("relay successfully submitted") span.SetAttributes(attribute.Int("relay_nonce", int(nonce))) - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.RelayStarted) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.RelayStarted, &request.Status) if err != nil { return fmt.Errorf("could not update quote request status: %w", err) } @@ -332,7 +332,7 @@ func (r *Relayer) handleRelayLog(ctx context.Context, req *fastbridge.FastBridge } // TODO: this can still get re-orged - err = r.db.UpdateQuoteRequestStatus(ctx, req.TransactionId, reldb.RelayCompleted) + err = r.db.UpdateQuoteRequestStatus(ctx, req.TransactionId, reldb.RelayCompleted, nil) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -361,7 +361,7 @@ func (q *QuoteRequestHandler) handleRelayCompleted(ctx context.Context, _ trace. return fmt.Errorf("could not submit transaction: %w", err) } - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.ProvePosting) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.ProvePosting, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -375,7 +375,7 @@ func (q *QuoteRequestHandler) handleRelayCompleted(ctx context.Context, _ trace. func (r *Relayer) handleProofProvided(ctx context.Context, req *fastbridge.FastBridgeBridgeProofProvided) (err error) { // TODO: this can still get re-orged // ALso: we should make sure the previous status is ProvePosting - err = r.db.UpdateQuoteRequestStatus(ctx, req.TransactionId, reldb.ProvePosted) + err = r.db.UpdateQuoteRequestStatus(ctx, req.TransactionId, reldb.ProvePosted, nil) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -408,7 +408,7 @@ func (q *QuoteRequestHandler) handleProofPosted(ctx context.Context, _ trace.Spa } if bs == fastbridge.RelayerClaimed.Int() { - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.ClaimCompleted) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.ClaimCompleted, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -443,7 +443,7 @@ func (q *QuoteRequestHandler) handleProofPosted(ctx context.Context, _ trace.Spa return fmt.Errorf("could not submit transaction: %w", err) } - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.ClaimPending) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.ClaimPending, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -460,7 +460,7 @@ func (q *QuoteRequestHandler) handleNotEnoughInventory(ctx context.Context, _ tr } // if committableBalance > destAmount if committableBalance.Cmp(request.Transaction.DestAmount) > 0 { - err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending) + err = q.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.CommittedPending, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } diff --git a/services/rfq/relayer/service/relayer.go b/services/rfq/relayer/service/relayer.go index 819316e5d3..ace4355d54 100644 --- a/services/rfq/relayer/service/relayer.go +++ b/services/rfq/relayer/service/relayer.go @@ -58,8 +58,8 @@ type Relayer struct { decimalsCache *xsync.MapOf[string, *uint8] // semaphore is used to limit the number of concurrent requests semaphore *semaphore.Weighted - // relayMtx is used to synchronize handling of relay requests - relayMtx mapmutex.StringMapMutex + // handlerMtx is used to synchronize handling of relay requests + handlerMtx mapmutex.StringMapMutex } var logger = log.Logger("relayer") @@ -155,7 +155,7 @@ func NewRelayer(ctx context.Context, metricHandler metrics.Handler, cfg relconfi apiServer: apiServer, apiClient: apiClient, semaphore: semaphore.NewWeighted(maxConcurrentRequests), - relayMtx: mapmutex.NewStringMapMutex(), + handlerMtx: mapmutex.NewStringMapMutex(), } return &rel, nil } @@ -391,7 +391,7 @@ func (r *Relayer) processRequest(parentCtx context.Context, request reldb.QuoteR // if deadline < now if request.Transaction.Deadline.Cmp(big.NewInt(time.Now().Unix())) < 0 && request.Status.Int() < reldb.RelayCompleted.Int() { - err = r.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.DeadlineExceeded) + err = r.db.UpdateQuoteRequestStatus(ctx, request.TransactionID, reldb.DeadlineExceeded, &request.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } diff --git a/services/rfq/relayer/service/statushandler.go b/services/rfq/relayer/service/statushandler.go index 3034f9174e..c329260926 100644 --- a/services/rfq/relayer/service/statushandler.go +++ b/services/rfq/relayer/service/statushandler.go @@ -49,8 +49,8 @@ type QuoteRequestHandler struct { // mutexMiddlewareFunc is used to wrap the handler in a mutex middleware. // this should only be done if Handling, not forwarding. mutexMiddlewareFunc func(func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error - // relayMtx is the mutex for relaying. - relayMtx mapmutex.StringMapMutex + // handlerMtx is the mutex for relaying. + handlerMtx mapmutex.StringMapMutex } // Handler is the handler for a quote request. @@ -79,7 +79,7 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest) claimCache: r.claimCache, apiClient: r.apiClient, mutexMiddlewareFunc: r.mutexMiddleware, - relayMtx: r.relayMtx, + handlerMtx: r.handlerMtx, } // wrap in deadline middleware since the relay has not yet happened @@ -102,9 +102,12 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest) func (r *Relayer) mutexMiddleware(next func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error { return func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) (err error) { - unlocker, ok := r.relayMtx.TryLock(hexutil.Encode(req.TransactionID[:])) + unlocker, ok := r.handlerMtx.TryLock(hexutil.Encode(req.TransactionID[:])) if !ok { - span.SetAttributes(attribute.Bool("locked", true)) + span.SetAttributes( + attribute.Bool("locked", true), + attribute.StringSlice("current_locks", r.handlerMtx.Keys()), + ) return nil } defer unlocker.Unlock() @@ -138,7 +141,7 @@ func (r *Relayer) deadlineMiddleware(next func(ctx context.Context, span trace.S // if deadline < now, we don't even have to bother calling the underlying function if req.Transaction.Deadline.Cmp(big.NewInt(almostNow.Unix())) < 0 { - err := r.db.UpdateQuoteRequestStatus(ctx, req.TransactionID, reldb.DeadlineExceeded) + err := r.db.UpdateQuoteRequestStatus(ctx, req.TransactionID, reldb.DeadlineExceeded, &req.Status) if err != nil { return fmt.Errorf("could not update request status: %w", err) } @@ -248,7 +251,7 @@ func (q *QuoteRequestHandler) Forward(ctx context.Context, request reldb.QuoteRe }() // sanity check to make sure that the lock is already acquired for this tx - _, ok := q.relayMtx.TryLock(txID) + _, ok := q.handlerMtx.TryLock(txID) if ok { panic(fmt.Sprintf("attempted forward while lock was not acquired for tx: %s", txID)) }