diff --git a/services/rfq/relayer/service/chainindexer.go b/services/rfq/relayer/service/chainindexer.go index 7b11646c4b..935167c8f0 100644 --- a/services/rfq/relayer/service/chainindexer.go +++ b/services/rfq/relayer/service/chainindexer.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/synapsecns/sanguine/core/metrics" "github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge" @@ -77,6 +78,10 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) return fmt.Errorf("could not handle request: %w", err) } case *fastbridge.FastBridgeBridgeRelayed: + // blocking lock on the txid mutex to ensure state transitions are not overrwitten + unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:])) + defer unlocker.Unlock() + // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck @@ -88,6 +93,9 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) return fmt.Errorf("could not handle relay: %w", err) } case *fastbridge.FastBridgeBridgeProofProvided: + unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:])) + defer unlocker.Unlock() + // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck @@ -99,6 +107,9 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error) return fmt.Errorf("could not handle proof provided: %w", err) } case *fastbridge.FastBridgeBridgeDepositClaimed: + unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:])) + defer unlocker.Unlock() + // it wasn't me if event.Relayer != r.signer.Address() { //nolint: wrapcheck diff --git a/services/rfq/relayer/service/statushandler.go b/services/rfq/relayer/service/statushandler.go index 64208e5a5b..4f8b9bbdc2 100644 --- a/services/rfq/relayer/service/statushandler.go +++ b/services/rfq/relayer/service/statushandler.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/jellydator/ttlcache/v3" + "github.com/synapsecns/sanguine/core/mapmutex" "github.com/synapsecns/sanguine/core/metrics" "github.com/synapsecns/sanguine/services/rfq/api/client" "github.com/synapsecns/sanguine/services/rfq/relayer/chain" @@ -48,6 +49,8 @@ type QuoteRequestHandler struct { // mutexMiddlewareFunc is used to wrap the handler in a mutex middleware. // this should only be done if Handling, not forwarding. mutexMiddlewareFunc func(func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error + // relayMtx is the mutex for relaying. + relayMtx mapmutex.StringMapMutex } // Handler is the handler for a quote request. @@ -76,6 +79,7 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest) claimCache: r.claimCache, apiClient: r.apiClient, mutexMiddlewareFunc: r.mutexMiddleware, + relayMtx: r.relayMtx, } // wrap in deadline middleware since the relay has not yet happened @@ -221,14 +225,19 @@ func (q *QuoteRequestHandler) Handle(ctx context.Context, request reldb.QuoteReq // Forward forwards a quote request. // this ignores the mutex middleware. func (q *QuoteRequestHandler) Forward(ctx context.Context, request reldb.QuoteRequest) (err error) { + txID := hexutil.Encode(request.TransactionID[:]) ctx, span := q.metrics.Tracer().Start(ctx, fmt.Sprintf("forward-%s", request.Status.String()), trace.WithAttributes( - attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])), + attribute.String("transaction_id", txID), )) defer func() { metrics.EndSpanWithErr(span, err) }() - // TODO: consider adding a lock attempt/fail here as a defensive coding strategy. We *expect* stuff to be locked by the time we get to forward. + // sanity check to make sure that the lock is already acquired for this tx + _, ok := q.relayMtx.TryLock(txID) + if ok { + panic(fmt.Sprintf("attempted forward while lock was not acquired for tx: %s", txID)) + } return q.handlers[request.Status](ctx, span, request) }