Skip to content

Commit

Permalink
RFQ Relayer: use mutex on listener-triggered handlers (#2767)
Browse files Browse the repository at this point in the history
* Feat: blocking lock on handlers triggered by listener

* [goreleaser]

* Feat: lock before RelayRaceLost check

* [goreleaser]

* Feat: lock check on forwarding

* [goreleaser]

---------

Co-authored-by: Trajan0x <[email protected]>
  • Loading branch information
dwasse and trajan0x authored Jun 23, 2024
1 parent 7ecc185 commit 87afd9b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 2 deletions.
11 changes: 11 additions & 0 deletions services/rfq/relayer/service/chainindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/services/rfq/contracts/fastbridge"
Expand Down Expand Up @@ -77,6 +78,10 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error)
return fmt.Errorf("could not handle request: %w", err)
}
case *fastbridge.FastBridgeBridgeRelayed:
// blocking lock on the txid mutex to ensure state transitions are not overrwitten
unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:]))
defer unlocker.Unlock()

// it wasn't me
if event.Relayer != r.signer.Address() {
//nolint: wrapcheck
Expand All @@ -88,6 +93,9 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error)
return fmt.Errorf("could not handle relay: %w", err)
}
case *fastbridge.FastBridgeBridgeProofProvided:
unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:]))
defer unlocker.Unlock()

// it wasn't me
if event.Relayer != r.signer.Address() {
//nolint: wrapcheck
Expand All @@ -99,6 +107,9 @@ func (r *Relayer) runChainIndexer(ctx context.Context, chainID int) (err error)
return fmt.Errorf("could not handle proof provided: %w", err)
}
case *fastbridge.FastBridgeBridgeDepositClaimed:
unlocker := r.relayMtx.Lock(hexutil.Encode(event.TransactionId[:]))
defer unlocker.Unlock()

// it wasn't me
if event.Relayer != r.signer.Address() {
//nolint: wrapcheck
Expand Down
13 changes: 11 additions & 2 deletions services/rfq/relayer/service/statushandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/jellydator/ttlcache/v3"
"github.com/synapsecns/sanguine/core/mapmutex"
"github.com/synapsecns/sanguine/core/metrics"
"github.com/synapsecns/sanguine/services/rfq/api/client"
"github.com/synapsecns/sanguine/services/rfq/relayer/chain"
Expand Down Expand Up @@ -48,6 +49,8 @@ type QuoteRequestHandler struct {
// mutexMiddlewareFunc is used to wrap the handler in a mutex middleware.
// this should only be done if Handling, not forwarding.
mutexMiddlewareFunc func(func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error) func(ctx context.Context, span trace.Span, req reldb.QuoteRequest) error
// relayMtx is the mutex for relaying.
relayMtx mapmutex.StringMapMutex
}

// Handler is the handler for a quote request.
Expand Down Expand Up @@ -76,6 +79,7 @@ func (r *Relayer) requestToHandler(ctx context.Context, req reldb.QuoteRequest)
claimCache: r.claimCache,
apiClient: r.apiClient,
mutexMiddlewareFunc: r.mutexMiddleware,
relayMtx: r.relayMtx,
}

// wrap in deadline middleware since the relay has not yet happened
Expand Down Expand Up @@ -221,14 +225,19 @@ func (q *QuoteRequestHandler) Handle(ctx context.Context, request reldb.QuoteReq
// Forward forwards a quote request.
// this ignores the mutex middleware.
func (q *QuoteRequestHandler) Forward(ctx context.Context, request reldb.QuoteRequest) (err error) {
txID := hexutil.Encode(request.TransactionID[:])
ctx, span := q.metrics.Tracer().Start(ctx, fmt.Sprintf("forward-%s", request.Status.String()), trace.WithAttributes(
attribute.String("transaction_id", hexutil.Encode(request.TransactionID[:])),
attribute.String("transaction_id", txID),
))
defer func() {
metrics.EndSpanWithErr(span, err)
}()

// TODO: consider adding a lock attempt/fail here as a defensive coding strategy. We *expect* stuff to be locked by the time we get to forward.
// sanity check to make sure that the lock is already acquired for this tx
_, ok := q.relayMtx.TryLock(txID)
if ok {
panic(fmt.Sprintf("attempted forward while lock was not acquired for tx: %s", txID))
}

return q.handlers[request.Status](ctx, span, request)
}

0 comments on commit 87afd9b

Please sign in to comment.