Skip to content

Commit

Permalink
Rename Adaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanTinianov committed Jan 10, 2025
1 parent 3542ebf commit 3cb0f86
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
38 changes: 19 additions & 19 deletions multinode/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/smartcontractkit/chainlink-framework/multinode/config"
)

// MultiNodeAdapter is used to integrate multinode into chain-specific clients
type MultiNodeAdapter[RPC any, HEAD Head] struct {
// Adapter is used to integrate multinode into chain-specific clients
type Adapter[RPC any, HEAD Head] struct {
cfg *config.MultiNodeConfig
log logger.Logger
rpc *RPC
Expand All @@ -37,12 +37,12 @@ type MultiNodeAdapter[RPC any, HEAD Head] struct {
latestChainInfo ChainInfo
}

func NewMultiNodeAdapter[RPC any, HEAD Head](
func NewAdapter[RPC any, HEAD Head](
cfg *config.MultiNodeConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger,
latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
latestFinalizedBlock func(ctx context.Context, rpc *RPC) (HEAD, error),
) (*MultiNodeAdapter[RPC, HEAD], error) {
return &MultiNodeAdapter[RPC, HEAD]{
) (*Adapter[RPC, HEAD], error) {
return &Adapter[RPC, HEAD]{
cfg: cfg,
rpc: rpc,
log: log,
Expand All @@ -54,14 +54,14 @@ func NewMultiNodeAdapter[RPC any, HEAD Head](
}, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) LenSubs() int {
func (m *Adapter[RPC, HEAD]) LenSubs() int {
m.subsSliceMu.RLock()
defer m.subsSliceMu.RUnlock()
return len(m.subs)
}

// registerSub adds the sub to the rpcMultiNodeAdapter list
func (m *MultiNodeAdapter[RPC, HEAD]) registerSub(sub *ManagedSubscription, stopInFLightCh chan struct{}) error {
func (m *Adapter[RPC, HEAD]) registerSub(sub *ManagedSubscription, stopInFLightCh chan struct{}) error {
// ensure that the `sub` belongs to current life cycle of the `rpcMultiNodeAdapter` and it should not be killed due to
// previous `DisconnectAll` call.
select {
Expand All @@ -76,13 +76,13 @@ func (m *MultiNodeAdapter[RPC, HEAD]) registerSub(sub *ManagedSubscription, stop
return nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) removeSub(sub Subscription) {
func (m *Adapter[RPC, HEAD]) removeSub(sub Subscription) {
m.subsSliceMu.Lock()
defer m.subsSliceMu.Unlock()
delete(m.subs, sub)
}

func (m *MultiNodeAdapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
func (m *Adapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) {
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()
Expand All @@ -100,7 +100,7 @@ func (m *MultiNodeAdapter[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, er
return head, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) {
func (m *Adapter[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context) (HEAD, error) {
ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

Expand All @@ -117,7 +117,7 @@ func (m *MultiNodeAdapter[RPC, HEAD]) LatestFinalizedBlock(ctx context.Context)
return head, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
func (m *Adapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

Expand Down Expand Up @@ -152,7 +152,7 @@ func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-c
return channel, sub, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
func (m *Adapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) {
ctx, cancel, chStopInFlight, _ := m.AcquireQueryCtx(ctx, m.ctxTimeout)
defer cancel()

Expand Down Expand Up @@ -185,7 +185,7 @@ func (m *MultiNodeAdapter[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Cont
return channel, sub, nil
}

func (m *MultiNodeAdapter[RPC, HEAD]) onNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
func (m *Adapter[RPC, HEAD]) onNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
if !head.IsValid() {
return
}
Expand All @@ -203,7 +203,7 @@ func (m *MultiNodeAdapter[RPC, HEAD]) onNewHead(ctx context.Context, requestCh <
}
}

func (m *MultiNodeAdapter[RPC, HEAD]) onNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
func (m *Adapter[RPC, HEAD]) onNewFinalizedHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) {
if !head.IsValid() {
return
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func MakeQueryCtx(ctx context.Context, ch services.StopChan, timeout time.Durati
return ctx, cancel
}

func (m *MultiNodeAdapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
func (m *Adapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc,
chStopInFlight chan struct{}, raw *RPC) {
// Need to wrap in mutex because state transition can cancel and replace context
m.stateMu.RLock()
Expand All @@ -248,7 +248,7 @@ func (m *MultiNodeAdapter[RPC, HEAD]) AcquireQueryCtx(parentCtx context.Context,
return
}

func (m *MultiNodeAdapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) {
func (m *Adapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) {
m.subsSliceMu.Lock()
keepSubs := map[Subscription]struct{}{}
for _, sub := range subs {
Expand All @@ -269,22 +269,22 @@ func (m *MultiNodeAdapter[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription)
}

// cancelInflightRequests closes and replaces the chStopInFlight
func (m *MultiNodeAdapter[RPC, HEAD]) cancelInflightRequests() {
func (m *Adapter[RPC, HEAD]) cancelInflightRequests() {
m.stateMu.Lock()
defer m.stateMu.Unlock()
close(m.chStopInFlight)
m.chStopInFlight = make(chan struct{})
}

func (m *MultiNodeAdapter[RPC, HEAD]) Close() {
func (m *Adapter[RPC, HEAD]) Close() {
m.cancelInflightRequests()
m.UnsubscribeAllExcept()
m.chainInfoLock.Lock()
m.latestChainInfo = ChainInfo{}
m.chainInfoLock.Unlock()
}

func (m *MultiNodeAdapter[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) {
func (m *Adapter[RPC, HEAD]) GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) {
m.chainInfoLock.Lock()
defer m.chainInfoLock.Unlock()
return m.latestChainInfo, m.highestUserObservations
Expand Down
4 changes: 2 additions & 2 deletions multinode/adaptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ptr[T any](t T) *T {
return &t
}

func newTestClient(t *testing.T) *MultiNodeAdapter[testRPC, *testHead] {
func newTestClient(t *testing.T) *Adapter[testRPC, *testHead] {
requestTimeout := 5 * time.Second
lggr := logger.Test(t)
cfg := &config.MultiNodeConfig{
Expand All @@ -57,7 +57,7 @@ func newTestClient(t *testing.T) *MultiNodeAdapter[testRPC, *testHead] {
FinalizedBlockOffset: ptr(uint32(50)),
},
}
c, err := NewMultiNodeAdapter[testRPC, *testHead](cfg, &testRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock)
c, err := NewAdapter[testRPC, *testHead](cfg, &testRPC{}, requestTimeout, lggr, LatestBlock, LatestBlock)
require.NoError(t, err)
t.Cleanup(c.Close)
return c
Expand Down

0 comments on commit 3cb0f86

Please sign in to comment.