Skip to content

Commit

Permalink
Merge pull request #602 from onflow/janez/generic-publisher-subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
janezpodhostnik authored Oct 10, 2024
2 parents 2e89609 + e615281 commit f29d065
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 116 deletions.
65 changes: 26 additions & 39 deletions api/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/onflow/go-ethereum/eth/filters"
"github.com/onflow/go-ethereum/rpc"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter"

"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/models"
Expand All @@ -25,10 +24,9 @@ type StreamAPI struct {
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
receipts storage.ReceiptIndexer
blocksPublisher *models.Publisher
transactionsPublisher *models.Publisher
logsPublisher *models.Publisher
ratelimiter limiter.Store
blocksPublisher *models.Publisher[*models.Block]
transactionsPublisher *models.Publisher[*gethTypes.Transaction]
logsPublisher *models.Publisher[[]*gethTypes.Log]
}

func NewStreamAPI(
Expand All @@ -37,10 +35,9 @@ func NewStreamAPI(
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
blocksPublisher *models.Publisher,
transactionsPublisher *models.Publisher,
logsPublisher *models.Publisher,
ratelimiter limiter.Store,
blocksPublisher *models.Publisher[*models.Block],
transactionsPublisher *models.Publisher[*gethTypes.Transaction],
logsPublisher *models.Publisher[[]*gethTypes.Log],
) *StreamAPI {
return &StreamAPI{
logger: logger,
Expand All @@ -51,22 +48,17 @@ func NewStreamAPI(
blocksPublisher: blocksPublisher,
transactionsPublisher: transactionsPublisher,
logsPublisher: logsPublisher,
ratelimiter: ratelimiter,
}
}

// NewHeads send a notification each time a new block is appended to the chain.
func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.blocksPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
block, ok := data.(*models.Block)
if !ok {
return fmt.Errorf("invalid data sent to block subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func(block *models.Block) error {
return func(block *models.Block) error {
h, err := block.Hash()
if err != nil {
return err
Expand All @@ -93,16 +85,12 @@ func (s *StreamAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
// transaction enters the transaction pool. If fullTx is true the full tx is
// sent to the client, otherwise the hash is sent.
func (s *StreamAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.transactionsPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
tx, ok := data.(*gethTypes.Transaction)
if !ok {
return fmt.Errorf("invalid data sent to pending transaction subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func(*gethTypes.Transaction) error {
return func(tx *gethTypes.Transaction) error {
if fullTx != nil && *fullTx {
return notifier.Notify(sub.ID, tx)
}
Expand All @@ -120,16 +108,12 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
return nil, fmt.Errorf("failed to create log subscription filter: %w", err)
}

return s.newSubscription(
return newSubscription(
ctx,
s.logger,
s.logsPublisher,
func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error {
return func(data any) error {
allLogs, ok := data.([]*gethTypes.Log)
if !ok {
return fmt.Errorf("invalid data sent to log subscription: %s", sub.ID)
}

func(notifier *rpc.Notifier, sub *rpc.Subscription) func([]*gethTypes.Log) error {
return func(allLogs []*gethTypes.Log) error {
for _, log := range allLogs {
// todo we could optimize this matching for cases where we have multiple subscriptions
// using the same filter criteria, we could only filter once and stream to all subscribers
Expand All @@ -148,10 +132,11 @@ func (s *StreamAPI) Logs(ctx context.Context, criteria filters.FilterCriteria) (
)
}

func (s *StreamAPI) newSubscription(
func newSubscription[T any](
ctx context.Context,
publisher *models.Publisher,
callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(any) error,
logger zerolog.Logger,
publisher *models.Publisher[T],
callback func(notifier *rpc.Notifier, sub *rpc.Subscription) func(T) error,
) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
Expand All @@ -162,8 +147,10 @@ func (s *StreamAPI) newSubscription(

subs := models.NewSubscription(callback(notifier, rpcSub))

rpcSub.ID = rpc.ID(subs.ID().String())
l := s.logger.With().Str("subscription-id", subs.ID().String()).Logger()
l := logger.With().
Str("gateway-subscription-id", fmt.Sprintf("%p", subs)).
Str("ethereum-subscription-id", string(rpcSub.ID)).
Logger()

publisher.Subscribe(subs)

Expand Down
20 changes: 12 additions & 8 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/onflow/flow-go-sdk/access"
"github.com/onflow/flow-go-sdk/access/grpc"
"github.com/onflow/flow-go-sdk/crypto"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/rs/zerolog"
"github.com/sethvargo/go-limiter/memorystore"
grpcOpts "google.golang.org/grpc"
Expand All @@ -36,9 +37,9 @@ type Storages struct {
}

type Publishers struct {
Block *models.Publisher
Transaction *models.Publisher
Logs *models.Publisher
Block *models.Publisher[*models.Block]
Transaction *models.Publisher[*gethTypes.Transaction]
Logs *models.Publisher[[]*gethTypes.Log]
}

type Bootstrap struct {
Expand Down Expand Up @@ -72,9 +73,9 @@ func New(config *config.Config) (*Bootstrap, error) {

return &Bootstrap{
publishers: &Publishers{
Block: models.NewPublisher(),
Transaction: models.NewPublisher(),
Logs: models.NewPublisher(),
Block: models.NewPublisher[*models.Block](),
Transaction: models.NewPublisher[*gethTypes.Transaction](),
Logs: models.NewPublisher[[]*gethTypes.Log](),
},
storages: storages,
logger: logger,
Expand Down Expand Up @@ -209,7 +210,11 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
}

// create transaction pool
txPool := requester.NewTxPool(b.client, b.publishers.Transaction, b.logger)
txPool := requester.NewTxPool(
b.client,
b.publishers.Transaction,
b.logger,
)

evm, err := requester.NewEVM(
b.client,
Expand Down Expand Up @@ -260,7 +265,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
b.publishers.Block,
b.publishers.Transaction,
b.publishers.Logs,
ratelimiter,
)

pullAPI := api.NewPullAPI(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
cloud.google.com/go/storage v1.36.0
github.com/cockroachdb/pebble v1.1.1
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/onflow/atree v0.8.0-rc.6
github.com/onflow/cadence v1.0.0
Expand Down Expand Up @@ -82,6 +81,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down
56 changes: 27 additions & 29 deletions models/stream.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,74 @@
package models

import (
"fmt"
"sync"

"github.com/google/uuid"
)

type Publisher struct {
type Publisher[T any] struct {
mux sync.RWMutex
subscribers map[uuid.UUID]Subscriber
subscribers map[Subscriber[T]]struct{}
}

func NewPublisher() *Publisher {
return &Publisher{
func NewPublisher[T any]() *Publisher[T] {
return &Publisher[T]{
mux: sync.RWMutex{},
subscribers: make(map[uuid.UUID]Subscriber),
subscribers: make(map[Subscriber[T]]struct{}),
}
}

func (p *Publisher) Publish(data any) {
func (p *Publisher[T]) Publish(data T) {
p.mux.RLock()
defer p.mux.RUnlock()

for _, s := range p.subscribers {
for s := range p.subscribers {
s.Notify(data)
}
}

func (p *Publisher) Subscribe(s Subscriber) {
func (p *Publisher[T]) Subscribe(s Subscriber[T]) {
p.mux.Lock()
defer p.mux.Unlock()

p.subscribers[s.ID()] = s
p.subscribers[s] = struct{}{}
}

func (p *Publisher) Unsubscribe(s Subscriber) {
func (p *Publisher[T]) Unsubscribe(s Subscriber[T]) {
p.mux.Lock()
defer p.mux.Unlock()

delete(p.subscribers, s.ID())
delete(p.subscribers, s)
}

type Subscriber interface {
ID() uuid.UUID
Notify(data any)
type Subscriber[T any] interface {
Notify(data T)
Error() <-chan error
}

type Subscription struct {
type Subscription[T any] struct {
err chan error
callback func(data any) error
uuid uuid.UUID
callback func(data T) error
}

func NewSubscription(callback func(any) error) *Subscription {
return &Subscription{
func NewSubscription[T any](callback func(T) error) *Subscription[T] {
return &Subscription[T]{
callback: callback,
uuid: uuid.New(),
err: make(chan error),
}
}

func (b *Subscription) Notify(data any) {
func (b *Subscription[T]) Notify(data T) {
err := b.callback(data)
if err != nil {
b.err <- err
select {
case b.err <- err:
default:
// TODO: handle this better!
panic(fmt.Sprintf("failed to send error to subscription %v", err))
}
}
}

func (b *Subscription) ID() uuid.UUID {
return b.uuid
}

func (b *Subscription) Error() <-chan error {
func (b *Subscription[T]) Error() <-chan error {
return b.err
}
Loading

0 comments on commit f29d065

Please sign in to comment.