From 0fde502e9174e4b0166b60707b0573f949e0a57c Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Mon, 30 Sep 2024 21:03:31 +0200 Subject: [PATCH 1/5] Add generics to subscriber and publisher --- api/stream.go | 62 ++++++++++++------------------- bootstrap/bootstrap.go | 20 ++++++---- go.mod | 2 +- models/stream.go | 47 ++++++++++------------- services/ingestion/engine.go | 9 +++-- services/ingestion/engine_test.go | 25 ++++++------- services/requester/pool.go | 4 +- services/traces/engine.go | 20 ++-------- services/traces/engine_test.go | 6 +-- 9 files changed, 80 insertions(+), 115 deletions(-) diff --git a/api/stream.go b/api/stream.go index cce811051..55e1d9996 100644 --- a/api/stream.go +++ b/api/stream.go @@ -11,7 +11,6 @@ import ( "github.com/onflow/go-ethereum/eth/filters" "github.com/onflow/go-ethereum/rpc" "github.com/rs/zerolog" - "github.com/sethvargo/go-limiter" "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/models" @@ -25,10 +24,9 @@ type StreamAPI struct { blocks storage.BlockIndexer transactions storage.TransactionIndexer receipts storage.ReceiptIndexer - blocksPublisher *models.Publisher - transactionsPublisher *models.Publisher - logsPublisher *models.Publisher - ratelimiter limiter.Store + blocksPublisher *models.Publisher[*models.Block] + transactionsPublisher *models.Publisher[*gethTypes.Transaction] + logsPublisher *models.Publisher[[]*gethTypes.Log] } func NewStreamAPI( @@ -37,10 +35,9 @@ func NewStreamAPI( blocks storage.BlockIndexer, transactions storage.TransactionIndexer, receipts storage.ReceiptIndexer, - blocksPublisher *models.Publisher, - transactionsPublisher *models.Publisher, - logsPublisher *models.Publisher, - ratelimiter limiter.Store, + blocksPublisher *models.Publisher[*models.Block], + transactionsPublisher *models.Publisher[*gethTypes.Transaction], + logsPublisher *models.Publisher[[]*gethTypes.Log], ) *StreamAPI { return &StreamAPI{ logger: logger, @@ -51,22 +48,17 @@ func NewStreamAPI( blocksPublisher: blocksPublisher, transactionsPublisher: transactionsPublisher, logsPublisher: logsPublisher, - ratelimiter: ratelimiter, } } // NewHeads send a notification each time a new block is appended to the chain. func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { - return s.newSubscription( + return newSubscription( ctx, + s.logger, s.blocksPublisher, - func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error { - return func(data any) error { - block, ok := data.(*models.Block) - if !ok { - return fmt.Errorf("invalid data sent to block subscription: %s", sub.ID) - } - + func(notifier *rpc.Notifier, sub *rpc.Subscription) func(block *models.Block) error { + return func(block *models.Block) error { h, err := block.Hash() if err != nil { return err @@ -93,16 +85,12 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { // transaction enters the transaction pool. If fullTx is true the full tx is // sent to the client, otherwise the hash is sent. func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) { - return s.newSubscription( + return newSubscription( ctx, + s.logger, s.transactionsPublisher, - func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error { - return func(data any) error { - tx, ok := data.(*gethTypes.Transaction) - if !ok { - return fmt.Errorf("invalid data sent to pending transaction subscription: %s", sub.ID) - } - + func(notifier *rpc.Notifier, sub *rpc.Subscription) func(*gethTypes.Transaction) error { + return func(tx *gethTypes.Transaction) error { if fullTx != nil && *fullTx { return notifier.Notify(sub.ID, tx) } @@ -120,16 +108,12 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) ( return nil, fmt.Errorf("failed to create log subscription filter: %w", err) } - return s.newSubscription( + return newSubscription( ctx, + s.logger, s.logsPublisher, - func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error { - return func(data any) error { - allLogs, ok := data.([]*gethTypes.Log) - if !ok { - return fmt.Errorf("invalid data sent to log subscription: %s", sub.ID) - } - + func(notifier *rpc.Notifier, sub *rpc.Subscription) func([]*gethTypes.Log) error { + return func(allLogs []*gethTypes.Log) error { for _, log := range allLogs { // todo we could optimize this matching for cases where we have multiple subscriptions // using the same filter criteria, we could only filter once and stream to all subscribers @@ -148,10 +132,11 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) ( ) } -func (s *StreamAPI) newSubscription( +func newSubscription[T any]( ctx context.Context, - publisher *models.Publisher, - callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error, + logger zerolog.Logger, + publisher *models.Publisher[T], + callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(T) error, ) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -162,8 +147,7 @@ func (s *StreamAPI) newSubscription( subs := models.NewSubscription(callback(notifier, rpcSub)) - rpcSub.ID = rpc.ID(subs.ID().String()) - l := s.logger.With().Str("subscription-id", subs.ID().String()).Logger() + l := logger.With().Str("subscription-id", fmt.Sprintf("%p", subs)).Logger() publisher.Subscribe(subs) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index e575e6b07..687996158 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go-sdk/access" "github.com/onflow/flow-go-sdk/access/grpc" "github.com/onflow/flow-go-sdk/crypto" + gethTypes "github.com/onflow/go-ethereum/core/types" "github.com/rs/zerolog" "github.com/sethvargo/go-limiter/memorystore" grpcOpts "google.golang.org/grpc" @@ -36,9 +37,9 @@ type Storages struct { } type Publishers struct { - Block *models.Publisher - Transaction *models.Publisher - Logs *models.Publisher + Block *models.Publisher[*models.Block] + Transaction *models.Publisher[*gethTypes.Transaction] + Logs *models.Publisher[[]*gethTypes.Log] } type Bootstrap struct { @@ -71,9 +72,9 @@ func New(config *config.Config) (*Bootstrap, error) { return &Bootstrap{ publishers: &Publishers{ - Block: models.NewPublisher(), - Transaction: models.NewPublisher(), - Logs: models.NewPublisher(), + Block: models.NewPublisher[*models.Block](), + Transaction: models.NewPublisher[*gethTypes.Transaction](), + Logs: models.NewPublisher[[]*gethTypes.Log](), }, storages: storages, logger: logger, @@ -208,7 +209,11 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { } // create transaction pool - txPool := requester.NewTxPool(b.client, b.publishers.Transaction, b.logger) + txPool := requester.NewTxPool( + b.client, + b.publishers.Transaction, + b.logger, + ) evm, err := requester.NewEVM( b.client, @@ -259,7 +264,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error { b.publishers.Block, b.publishers.Transaction, b.publishers.Logs, - ratelimiter, ) pullAPI := api.NewPullAPI( diff --git a/go.mod b/go.mod index eddaa4fb1..5885b6a26 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( cloud.google.com/go/storage v1.36.0 github.com/cockroachdb/pebble v1.1.1 github.com/goccy/go-json v0.10.2 - github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/onflow/atree v0.8.0-rc.6 github.com/onflow/cadence v1.0.0-preview.52 @@ -82,6 +81,7 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/s2a-go v0.1.7 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect diff --git a/models/stream.go b/models/stream.go index c2befa13e..d3fc61223 100644 --- a/models/stream.go +++ b/models/stream.go @@ -2,75 +2,66 @@ package models import ( "sync" - - "github.com/google/uuid" ) -type Publisher struct { +type Publisher[T any] struct { mux sync.RWMutex - subscribers map[uuid.UUID]Subscriber + subscribers map[Subscriber[T]]struct{} } -func NewPublisher() *Publisher { - return &Publisher{ +func NewPublisher[T any]() *Publisher[T] { + return &Publisher[T]{ mux: sync.RWMutex{}, - subscribers: make(map[uuid.UUID]Subscriber), + subscribers: make(map[Subscriber[T]]struct{}), } } -func (p *Publisher) Publish(data any) { +func (p *Publisher[T]) Publish(data T) { p.mux.RLock() defer p.mux.RUnlock() - for _, s := range p.subscribers { + for s := range p.subscribers { s.Notify(data) } } -func (p *Publisher) Subscribe(s Subscriber) { +func (p *Publisher[T]) Subscribe(s Subscriber[T]) { p.mux.Lock() defer p.mux.Unlock() - p.subscribers[s.ID()] = s + p.subscribers[s] = struct{}{} } -func (p *Publisher) Unsubscribe(s Subscriber) { +func (p *Publisher[T]) Unsubscribe(s Subscriber[T]) { p.mux.Lock() defer p.mux.Unlock() - delete(p.subscribers, s.ID()) + delete(p.subscribers, s) } -type Subscriber interface { - ID() uuid.UUID - Notify(data any) +type Subscriber[T any] interface { + Notify(data T) Error() <-chan error } -type Subscription struct { +type Subscription[T any] struct { err chan error - callback func(data any) error - uuid uuid.UUID + callback func(data T) error } -func NewSubscription(callback func(any) error) *Subscription { - return &Subscription{ +func NewSubscription[T any](callback func(T) error) *Subscription[T] { + return &Subscription[T]{ callback: callback, - uuid: uuid.New(), } } -func (b *Subscription) Notify(data any) { +func (b *Subscription[T]) Notify(data T) { err := b.callback(data) if err != nil { b.err <- err } } -func (b *Subscription) ID() uuid.UUID { - return b.uuid -} - -func (b *Subscription) Error() <-chan error { +func (b *Subscription[T]) Error() <-chan error { return b.err } diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 2a9792933..35f02be5e 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -6,6 +6,7 @@ import ( pebbleDB "github.com/cockroachdb/pebble" "github.com/onflow/flow-go-sdk" + gethTypes "github.com/onflow/go-ethereum/core/types" "github.com/rs/zerolog" "github.com/onflow/flow-evm-gateway/metrics" @@ -41,8 +42,8 @@ type Engine struct { accounts storage.AccountIndexer log zerolog.Logger evmLastHeight *models.SequentialHeight - blocksPublisher *models.Publisher - logsPublisher *models.Publisher + blocksPublisher *models.Publisher[*models.Block] + logsPublisher *models.Publisher[[]*gethTypes.Log] collector metrics.Collector } @@ -53,8 +54,8 @@ func NewEventIngestionEngine( receipts storage.ReceiptIndexer, transactions storage.TransactionIndexer, accounts storage.AccountIndexer, - blocksPublisher *models.Publisher, - logsPublisher *models.Publisher, + blocksPublisher *models.Publisher[*models.Block], + logsPublisher *models.Publisher[[]*gethTypes.Log], log zerolog.Logger, collector metrics.Collector, ) *Engine { diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index a2b2651ab..ed38637c4 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -68,8 +68,8 @@ func TestSerialBlockIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) @@ -148,8 +148,8 @@ func TestSerialBlockIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) @@ -210,11 +210,9 @@ func TestSerialBlockIngestion(t *testing.T) { close(eventsChan) <-waitErr }) - } func TestBlockAndTransactionIngestion(t *testing.T) { - t.Run("successfully ingest transaction and block", func(t *testing.T) { receipts := &storageMock.ReceiptIndexer{} transactions := &storageMock.TransactionIndexer{} @@ -265,8 +263,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) @@ -368,8 +366,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) @@ -417,7 +415,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { { Type: string(blockEvent.Etype), Value: blockCdc, - }}, + }, + }, Height: nextHeight, }) @@ -463,8 +462,8 @@ func TestBlockAndTransactionIngestion(t *testing.T) { receipts, transactions, accounts, - models.NewPublisher(), - models.NewPublisher(), + models.NewPublisher[*models.Block](), + models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, ) diff --git a/services/requester/pool.go b/services/requester/pool.go index 4c91a3c95..bb19551a1 100644 --- a/services/requester/pool.go +++ b/services/requester/pool.go @@ -29,13 +29,13 @@ type TxPool struct { logger zerolog.Logger client *CrossSporkClient pool *sync.Map - txPublisher *models.Publisher + txPublisher *models.Publisher[*gethTypes.Transaction] // todo add methods to inspect transaction pool state } func NewTxPool( client *CrossSporkClient, - transactionsPublisher *models.Publisher, + transactionsPublisher *models.Publisher[*gethTypes.Transaction], logger zerolog.Logger, ) *TxPool { return &TxPool{ diff --git a/services/traces/engine.go b/services/traces/engine.go index fd51dfece..0f089f0ed 100644 --- a/services/traces/engine.go +++ b/services/traces/engine.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/google/uuid" "github.com/onflow/flow-go-sdk" gethCommon "github.com/onflow/go-ethereum/common" "github.com/rs/zerolog" @@ -30,7 +29,7 @@ type Engine struct { *models.EngineStatus logger zerolog.Logger - blocksPublisher *models.Publisher + blocksPublisher *models.Publisher[*models.Block] blocks storage.BlockIndexer traces storage.TraceIndexer downloader Downloader @@ -39,7 +38,7 @@ type Engine struct { // NewTracesIngestionEngine creates a new instance of the engine. func NewTracesIngestionEngine( - blocksPublisher *models.Publisher, + blocksPublisher *models.Publisher[*models.Block], blocks storage.BlockIndexer, traces storage.TraceIndexer, downloader Downloader, @@ -70,13 +69,7 @@ func (e *Engine) Run(ctx context.Context) error { // Notify is a handler that is being used to subscribe for new EVM block notifications. // This method should be non-blocking. -func (e *Engine) Notify(data any) { - block, ok := data.(*models.Block) - if !ok { - e.logger.Error().Msg("invalid event type sent to trace ingestion") - return - } - +func (e *Engine) Notify(block *models.Block) { // If the block has no transactions, we simply return early // as there are no transaction traces to index. if len(block.TransactionHashes) == 0 { @@ -126,7 +119,6 @@ func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Id return e.traces.StoreTransaction(h, trace, nil) }) - if err != nil { e.collector.TraceDownloadFailed() l.Error().Err(err).Msg("failed to download trace") @@ -139,12 +131,6 @@ func (e *Engine) indexBlockTraces(evmBlock *models.Block, cadenceBlockID flow.Id wg.Wait() } -// ID is required by the publisher interface and we return a random uuid since the -// subscription will only happen once by this engine -func (e *Engine) ID() uuid.UUID { - return uuid.New() -} - // Error is required by the publisher, and we just return a nil, // since the errors are handled gracefully in the indexBlockTraces func (e *Engine) Error() <-chan error { diff --git a/services/traces/engine_test.go b/services/traces/engine_test.go index 6bd187112..89473afcb 100644 --- a/services/traces/engine_test.go +++ b/services/traces/engine_test.go @@ -27,7 +27,7 @@ import ( // downloaded and stored. func TestTraceIngestion(t *testing.T) { t.Run("successful single block ingestion", func(t *testing.T) { - blockPublisher := models.NewPublisher() + blockPublisher := models.NewPublisher[*models.Block]() blocks := &storageMock.BlockIndexer{} trace := &storageMock.TraceIndexer{} downloader := &mocks.Downloader{} @@ -113,7 +113,7 @@ func TestTraceIngestion(t *testing.T) { }) t.Run("successful multiple blocks ingestion", func(t *testing.T) { - blocksPublisher := models.NewPublisher() + blocksPublisher := models.NewPublisher[*models.Block]() blocks := &storageMock.BlockIndexer{} trace := &storageMock.TraceIndexer{} downloader := &mocks.Downloader{} @@ -230,7 +230,7 @@ func TestTraceIngestion(t *testing.T) { }) t.Run("failed download retries", func(t *testing.T) { - blockBroadcaster := models.NewPublisher() + blockBroadcaster := models.NewPublisher[*models.Block]() blocks := &storageMock.BlockIndexer{} downloader := &mocks.Downloader{} trace := &storageMock.TraceIndexer{} From 44112e1b0d448c6200763238d7b67a837f361865 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 2 Oct 2024 16:58:43 +0200 Subject: [PATCH 2/5] subscriber and publisher tests and fixes --- models/stream.go | 9 ++- models/stream_test.go | 144 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+), 1 deletion(-) create mode 100644 models/stream_test.go diff --git a/models/stream.go b/models/stream.go index d3fc61223..6af69a208 100644 --- a/models/stream.go +++ b/models/stream.go @@ -1,6 +1,7 @@ package models import ( + "fmt" "sync" ) @@ -52,13 +53,19 @@ type Subscription[T any] struct { func NewSubscription[T any](callback func(T) error) *Subscription[T] { return &Subscription[T]{ callback: callback, + err: make(chan error), } } func (b *Subscription[T]) Notify(data T) { err := b.callback(data) if err != nil { - b.err <- err + select { + case b.err <- err: + default: + // TODO: handle this better! + panic(fmt.Sprintf("failed to send error to subscription %v", err)) + } } } diff --git a/models/stream_test.go b/models/stream_test.go new file mode 100644 index 000000000..02d9ebfa1 --- /dev/null +++ b/models/stream_test.go @@ -0,0 +1,144 @@ +package models_test + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/onflow/flow-evm-gateway/models" + "github.com/stretchr/testify/require" +) + +func Test_Stream(t *testing.T) { + + t.Run("unsubscribe before subscribing", func(t *testing.T) { + p := newMockPublisher() + s := newMockSubscription() + + require.NotPanics(t, func() { + p.Unsubscribe(s) + }) + }) + + t.Run("subscribe, publish, unsubscribe, publish", func(t *testing.T) { + p := newMockPublisher() + s1 := newMockSubscription() + s2 := newMockSubscription() + + p.Subscribe(s1) + p.Subscribe(s2) + + p.Publish(mockData{}) + + require.Equal(t, uint(1), s1.callCount) + require.Equal(t, uint(1), s2.callCount) + + p.Unsubscribe(s1) + + p.Publish(mockData{}) + + require.Equal(t, uint(1), s1.callCount) + require.Equal(t, uint(2), s2.callCount) + }) + + t.Run("concurrent subscribe, publish, unsubscribe, publish", func(t *testing.T) { + + p := newMockPublisher() + + stopPublishing := make(chan struct{}) + + // publishing + go func() { + for { + select { + case <-stopPublishing: + return + case <-time.After(time.Millisecond * 1): + p.Publish(mockData{}) + } + } + }() + + wg := sync.WaitGroup{} + + // 10 goroutines adding 10 subscribers each + // waiting for 100 ms to make sure all goroutines are added + // and then unsubscribe all + wg.Add(10) + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + + subscriptions := make([]*mockSubscription, 10) + + for j := 0; j < 10; j++ { + s := newMockSubscription() + subscriptions[j] = s + p.Subscribe(s) + } + + <-time.After(time.Millisecond * 100) + + for _, s := range subscriptions { + p.Unsubscribe(s) + } + + // there should be at least 50 calls + for j := 0; j < 10; j++ { + require.Greater(t, subscriptions[j].callCount, uint(50)) + } + }() + } + + wg.Wait() + close(stopPublishing) + }) + + t.Run("error handling", func(t *testing.T) { + p := newMockPublisher() + s := &mockSubscription{} + errContent := fmt.Errorf("failed to process data") + + s.Subscription = models.NewSubscription[mockData](func(data mockData) error { + s.callCount++ + return errContent + }) + + p.Subscribe(s) + + go func() { + select { + case err := <-s.Error(): + require.ErrorIs(t, err, errContent) + case <-time.After(time.Millisecond * 10): + require.Fail(t, "should have received error") + } + }() + + // wait for the goroutine to subscribe to error channel + <-time.After(time.Millisecond * 1) + + p.Publish(mockData{}) + }) +} + +type mockData struct{} + +type mockSubscription struct { + *models.Subscription[mockData] + callCount uint +} + +func newMockSubscription() *mockSubscription { + s := &mockSubscription{} + s.Subscription = models.NewSubscription[mockData](func(data mockData) error { + s.callCount++ + return nil + }) + return s +} + +func newMockPublisher() *models.Publisher[mockData] { + return models.NewPublisher[mockData]() +} From df0be1037829ccb7c1f88d61921575a62abc3adc Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 2 Oct 2024 17:10:02 +0200 Subject: [PATCH 3/5] use atomic counter in tests --- models/stream_test.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/models/stream_test.go b/models/stream_test.go index 02d9ebfa1..1cd3c6a6e 100644 --- a/models/stream_test.go +++ b/models/stream_test.go @@ -3,6 +3,7 @@ package models_test import ( "fmt" "sync" + "sync/atomic" "testing" "time" @@ -31,15 +32,15 @@ func Test_Stream(t *testing.T) { p.Publish(mockData{}) - require.Equal(t, uint(1), s1.callCount) - require.Equal(t, uint(1), s2.callCount) + require.Equal(t, uint64(1), s1.CallCount()) + require.Equal(t, uint64(1), s2.CallCount()) p.Unsubscribe(s1) p.Publish(mockData{}) - require.Equal(t, uint(1), s1.callCount) - require.Equal(t, uint(2), s2.callCount) + require.Equal(t, uint64(1), s1.CallCount()) + require.Equal(t, uint64(2), s2.CallCount()) }) t.Run("concurrent subscribe, publish, unsubscribe, publish", func(t *testing.T) { @@ -86,7 +87,7 @@ func Test_Stream(t *testing.T) { // there should be at least 50 calls for j := 0; j < 10; j++ { - require.Greater(t, subscriptions[j].callCount, uint(50)) + require.Greater(t, subscriptions[j].CallCount(), uint64(50)) } }() } @@ -101,7 +102,7 @@ func Test_Stream(t *testing.T) { errContent := fmt.Errorf("failed to process data") s.Subscription = models.NewSubscription[mockData](func(data mockData) error { - s.callCount++ + s.callCount.Add(1) return errContent }) @@ -127,18 +128,22 @@ type mockData struct{} type mockSubscription struct { *models.Subscription[mockData] - callCount uint + callCount atomic.Uint64 } func newMockSubscription() *mockSubscription { s := &mockSubscription{} s.Subscription = models.NewSubscription[mockData](func(data mockData) error { - s.callCount++ + s.callCount.Add(1) return nil }) return s } +func (s *mockSubscription) CallCount() uint64 { + return s.callCount.Load() +} + func newMockPublisher() *models.Publisher[mockData] { return models.NewPublisher[mockData]() } From 0cdc2b3f550256ca2b79bfff6f895dc4dfc3cea4 Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Wed, 2 Oct 2024 17:28:38 +0200 Subject: [PATCH 4/5] remove timers from tests to make them more stable --- models/stream_test.go | 42 +++++++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/models/stream_test.go b/models/stream_test.go index 1cd3c6a6e..8864bc44d 100644 --- a/models/stream_test.go +++ b/models/stream_test.go @@ -49,6 +49,8 @@ func Test_Stream(t *testing.T) { stopPublishing := make(chan struct{}) + published := make(chan struct{}) + // publishing go func() { for { @@ -57,42 +59,54 @@ func Test_Stream(t *testing.T) { return case <-time.After(time.Millisecond * 1): p.Publish(mockData{}) + + select { + case published <- struct{}{}: + default: + } } } }() - wg := sync.WaitGroup{} + waitAllSubscribed := sync.WaitGroup{} + waitAllUnsubscribed := sync.WaitGroup{} // 10 goroutines adding 10 subscribers each - // waiting for 100 ms to make sure all goroutines are added // and then unsubscribe all - wg.Add(10) + waitAllSubscribed.Add(10) + waitAllUnsubscribed.Add(10) for i := 0; i < 10; i++ { go func() { - defer wg.Done() - subscriptions := make([]*mockSubscription, 10) for j := 0; j < 10; j++ { s := newMockSubscription() subscriptions[j] = s p.Subscribe(s) + } + waitAllSubscribed.Done() + waitAllSubscribed.Wait() - <-time.After(time.Millisecond * 100) + // wait for all subscribers to receive data + for i := 0; i < 10; i++ { + <-published + } for _, s := range subscriptions { p.Unsubscribe(s) } - // there should be at least 50 calls + // there should be at least 1 call for j := 0; j < 10; j++ { - require.Greater(t, subscriptions[j].CallCount(), uint64(50)) + require.GreaterOrEqual(t, subscriptions[j].CallCount(), uint64(10)) } + + waitAllUnsubscribed.Done() }() } - wg.Wait() + waitAllUnsubscribed.Wait() close(stopPublishing) }) @@ -108,19 +122,21 @@ func Test_Stream(t *testing.T) { p.Subscribe(s) + shouldReceiveError := make(chan struct{}) + ready := make(chan struct{}) go func() { + close(ready) select { case err := <-s.Error(): require.ErrorIs(t, err, errContent) - case <-time.After(time.Millisecond * 10): + case <-shouldReceiveError: require.Fail(t, "should have received error") } }() - - // wait for the goroutine to subscribe to error channel - <-time.After(time.Millisecond * 1) + <-ready p.Publish(mockData{}) + close(shouldReceiveError) }) } From 47ecbea56a0f2ea223d38b5af372099fd20feabd Mon Sep 17 00:00:00 2001 From: Janez Podhostnik Date: Tue, 8 Oct 2024 11:59:41 -0700 Subject: [PATCH 5/5] more specific logs --- api/stream.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/api/stream.go b/api/stream.go index 55e1d9996..80acfce0f 100644 --- a/api/stream.go +++ b/api/stream.go @@ -147,7 +147,10 @@ func newSubscription[T any]( subs := models.NewSubscription(callback(notifier, rpcSub)) - l := logger.With().Str("subscription-id", fmt.Sprintf("%p", subs)).Logger() + l := logger.With(). + Str("gateway-subscription-id", fmt.Sprintf("%p", subs)). + Str("ethereum-subscription-id", string(rpcSub.ID)). + Logger() publisher.Subscribe(subs)