Skip to content

Commit

Permalink
Unexport chStopInFlight
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Jan 16, 2025
1 parent 705a698 commit f643a5b
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 19 deletions.
36 changes: 18 additions & 18 deletions multinode/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ type Adapter[RPC any, HEAD Head] struct {
log logger.Logger
rpc *RPC
ctxTimeout time.Duration
StateMu sync.RWMutex // protects state* fields
stateMu sync.RWMutex // protects state* fields
subsSliceMu sync.RWMutex
subs map[Subscription]struct{}

latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error)
latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error)

// ChStopInFlight can be closed to immediately cancel all in-flight requests on
// chStopInFlight can be closed to immediately cancel all in-flight requests on
// this RpcMultiNodeAdapter. Closing and replacing should be serialized through
// StateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close.
ChStopInFlight chan struct{}
// stateMu since it can happen on state transitions as well as RpcMultiNodeAdapter Close.
chStopInFlight chan struct{}

chainInfoLock sync.RWMutex
// intercepted values seen by callers of the rpcMultiNodeAdapter excluding health check calls. Need to ensure MultiNode provides repeatable read guarantee
Expand All @@ -54,7 +54,7 @@ func NewAdapter[RPC any, HEAD Head](
latestBlock: latestBlock,
latestFinalizedBlock: latestFinalizedBlock,
subs: make(map[Subscription]struct{}),
ChStopInFlight: make(chan struct{}),
chStopInFlight: make(chan struct{}),
}
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func (m *Adapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-c
}

func (m *Adapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
// capture ChStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

Expand Down Expand Up @@ -238,11 +238,11 @@ func MakeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Durati
func (m *Adapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
chStopInFlight chan struct{}, raw *RPC) {
// Need to wrap in mutex because state transition can cancel and replace context
m.StateMu.RLock()
chStopInFlight = m.ChStopInFlight
m.stateMu.RLock()
chStopInFlight = m.chStopInFlight
cp := *m.rpc
raw = &cp
m.StateMu.RUnlock()
m.stateMu.RUnlock()
ctx, cancel = MakeQueryCtx(parentCtx, chStopInFlight, timeout)
return
}
Expand All @@ -267,20 +267,20 @@ func (m *Adapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) {
}
}

// CancelInflightRequests closes and replaces the ChStopInFlight
// CancelInflightRequests closes and replaces the chStopInFlight
func (m *Adapter[RPC, HEAD]) CancelInflightRequests() {
m.StateMu.Lock()
defer m.StateMu.Unlock()
close(m.ChStopInFlight)
m.ChStopInFlight = make(chan struct{})
m.stateMu.Lock()
defer m.stateMu.Unlock()
close(m.chStopInFlight)
m.chStopInFlight = make(chan struct{})
}

// GetChStopInflight provides a convenience helper that mutex wraps a
// read to the ChStopInFlight
// read to the chStopInFlight
func (m *Adapter[RPC, HEAD]) GetChStopInflight() chan struct{} {
m.StateMu.RLock()
defer m.StateMu.RUnlock()
return m.ChStopInFlight
m.stateMu.RLock()
defer m.stateMu.RUnlock()
return m.chStopInFlight
}

func (m *Adapter[RPC, HEAD]) ResetLatestChainInfo() {
Expand Down
2 changes: 1 addition & 1 deletion multinode/adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) {
c.UnsubscribeAllExcept()
})

t.Run("ChStopInFlight returns error and unsubscribes", func(t *testing.T) {
t.Run("chStopInFlight returns error and unsubscribes", func(t *testing.T) {
c := newTestClient(t)
chStopInFlight := make(chan struct{})
close(chStopInFlight)
Expand Down

0 comments on commit f643a5b

Please sign in to comment.