diff --git a/multinode/adaptor.go b/multinode/adaptor.go index f17dc1c..b394424 100644 --- a/multinode/adaptor.go +++ b/multinode/adaptor.go @@ -22,17 +22,17 @@ type Adapter[RPC any, HEAD Head] struct { log logger.Logger rpc *RPC ctxTimeout time.Duration - StateMu sync.RWMutex // protects state* fields + stateMu sync.RWMutex // protects state* fields subsSliceMu sync.RWMutex subs map[Subscription]struct{} latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error) latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error) - // ChStopInFlight can be closed to immediately cancel all in-flight requests on + // chStopInFlight can be closed to immediately cancel all in-flight requests on // this RpcMultiNodeAdapter. Closing and replacing should be serialized through - // StateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close. - ChStopInFlight chan struct{} + // stateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close. + chStopInFlight chan struct{} chainInfoLock sync.RWMutex // intercepted values seen by callers of the rpcMultiNodeAdapter excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee @@ -54,7 +54,7 @@ func NewAdapter[RPC any, HEAD Head]( latestBlock: latestBlock, latestFinalizedBlock: latestFinalizedBlock, subs: make(map[Subscription]struct{}), - ChStopInFlight: make(chan struct{}), + chStopInFlight: make(chan struct{}), } } @@ -146,7 +146,7 @@ func (m *Adapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-c } func (m *Adapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { - // capture ChStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle + // capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout) defer cancel() @@ -238,11 +238,11 @@ func MakeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Durati func (m *Adapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc, chStopInFlight chan struct{}, raw *RPC) { // Need to wrap in mutex because state transition can cancel and replace context - m.StateMu.RLock() - chStopInFlight = m.ChStopInFlight + m.stateMu.RLock() + chStopInFlight = m.chStopInFlight cp := *m.rpc raw = &cp - m.StateMu.RUnlock() + m.stateMu.RUnlock() ctx, cancel = MakeQueryCtx(parentCtx, chStopInFlight, timeout) return } @@ -267,20 +267,20 @@ func (m *Adapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) { } } -// CancelInflightRequests closes and replaces the ChStopInFlight +// CancelInflightRequests closes and replaces the chStopInFlight func (m *Adapter[RPC, HEAD]) CancelInflightRequests() { - m.StateMu.Lock() - defer m.StateMu.Unlock() - close(m.ChStopInFlight) - m.ChStopInFlight = make(chan struct{}) + m.stateMu.Lock() + defer m.stateMu.Unlock() + close(m.chStopInFlight) + m.chStopInFlight = make(chan struct{}) } // GetChStopInflight provides a convenience helper that mutex wraps a -// read to the ChStopInFlight +// read to the chStopInFlight func (m *Adapter[RPC, HEAD]) GetChStopInflight() chan struct{} { - m.StateMu.RLock() - defer m.StateMu.RUnlock() - return m.ChStopInFlight + m.stateMu.RLock() + defer m.stateMu.RUnlock() + return m.chStopInFlight } func (m *Adapter[RPC, HEAD]) ResetLatestChainInfo() { diff --git a/multinode/adaptor_test.go b/multinode/adaptor_test.go index ca022b0..9af3b33 100644 --- a/multinode/adaptor_test.go +++ b/multinode/adaptor_test.go @@ -169,7 +169,7 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) { c.UnsubscribeAllExcept() }) - t.Run("ChStopInFlight returns error and unsubscribes", func(t *testing.T) { + t.Run("chStopInFlight returns error and unsubscribes", func(t *testing.T) { c := newTestClient(t) chStopInFlight := make(chan struct{}) close(chStopInFlight)