From 28152cb8e0662603e9dc3e36fb51ede159ae7c14 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 23 Oct 2024 16:04:57 -0400 Subject: [PATCH] Remove deprecated X-chain pubsub server (#3490) --- go.mod | 2 +- pubsub/bloom/filter.go | 51 -------- pubsub/bloom/filter_test.go | 32 ----- pubsub/bloom/map_filter.go | 35 ------ pubsub/connection.go | 214 -------------------------------- pubsub/connections.go | 51 -------- pubsub/filter_param.go | 87 ------------- pubsub/filter_test.go | 77 ------------ pubsub/filterer.go | 8 -- pubsub/messages.go | 77 ------------ pubsub/server.go | 127 ------------------- vms/avm/pubsub_filterer.go | 44 ------- vms/avm/pubsub_filterer_test.go | 50 -------- vms/avm/service.md | 119 ------------------ vms/avm/vm.go | 7 -- 15 files changed, 1 insertion(+), 980 deletions(-) delete mode 100644 pubsub/bloom/filter.go delete mode 100644 pubsub/bloom/filter_test.go delete mode 100644 pubsub/bloom/map_filter.go delete mode 100644 pubsub/connection.go delete mode 100644 pubsub/connections.go delete mode 100644 pubsub/filter_param.go delete mode 100644 pubsub/filter_test.go delete mode 100644 pubsub/filterer.go delete mode 100644 pubsub/messages.go delete mode 100644 pubsub/server.go delete mode 100644 vms/avm/pubsub_filterer.go delete mode 100644 vms/avm/pubsub_filterer_test.go diff --git a/go.mod b/go.mod index 4cea2f92a5a8..3900e0bbfc3c 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,6 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/rpc v1.2.0 - github.com/gorilla/websocket v1.5.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/holiman/uint256 v1.2.4 github.com/huin/goupnp v1.3.0 @@ -121,6 +120,7 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20230207041349-798e818bf904 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/hashicorp/go-bexpr v0.1.10 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect diff --git a/pubsub/bloom/filter.go b/pubsub/bloom/filter.go deleted file mode 100644 index b0d023b51f19..000000000000 --- a/pubsub/bloom/filter.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package bloom - -import ( - "errors" - - "github.com/ava-labs/avalanchego/utils/bloom" -) - -const bytesPerHash = 8 - -var ( - _ Filter = (*filter)(nil) - - errMaxBytes = errors.New("too large") -) - -type Filter interface { - // Add adds to filter, assumed thread safe - Add(...[]byte) - - // Check checks filter, assumed thread safe - Check([]byte) bool -} - -func New(maxN int, p float64, maxBytes int) (Filter, error) { - numHashes, numEntries := bloom.OptimalParameters(maxN, p) - if neededBytes := 1 + numHashes*bytesPerHash + numEntries; neededBytes > maxBytes { - return nil, errMaxBytes - } - f, err := bloom.New(numHashes, numEntries) - return &filter{ - filter: f, - }, err -} - -type filter struct { - filter *bloom.Filter -} - -func (f *filter) Add(bl ...[]byte) { - for _, b := range bl { - bloom.Add(f.filter, b, nil) - } -} - -func (f *filter) Check(b []byte) bool { - return bloom.Contains(f.filter, b, nil) -} diff --git a/pubsub/bloom/filter_test.go b/pubsub/bloom/filter_test.go deleted file mode 100644 index 3b2c4b71a59d..000000000000 --- a/pubsub/bloom/filter_test.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package bloom - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/utils/units" -) - -func TestNew(t *testing.T) { - var ( - require = require.New(t) - maxN = 10000 - p = 0.1 - maxBytes = 1 * units.MiB // 1 MiB - ) - f, err := New(maxN, p, maxBytes) - require.NoError(err) - require.NotNil(f) - - f.Add([]byte("hello")) - - checked := f.Check([]byte("hello")) - require.True(checked, "should have contained the key") - - checked = f.Check([]byte("bye")) - require.False(checked, "shouldn't have contained the key") -} diff --git a/pubsub/bloom/map_filter.go b/pubsub/bloom/map_filter.go deleted file mode 100644 index d0edcbe88fd0..000000000000 --- a/pubsub/bloom/map_filter.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package bloom - -import ( - "sync" - - "github.com/ava-labs/avalanchego/utils/set" -) - -type mapFilter struct { - lock sync.RWMutex - values set.Set[string] -} - -func NewMap() Filter { - return &mapFilter{} -} - -func (m *mapFilter) Add(bl ...[]byte) { - m.lock.Lock() - defer m.lock.Unlock() - - for _, b := range bl { - m.values.Add(string(b)) - } -} - -func (m *mapFilter) Check(b []byte) bool { - m.lock.RLock() - defer m.lock.RUnlock() - - return m.values.Contains(string(b)) -} diff --git a/pubsub/connection.go b/pubsub/connection.go deleted file mode 100644 index 31d493355cda..000000000000 --- a/pubsub/connection.go +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package pubsub - -import ( - "encoding/json" - "errors" - "fmt" - "sync/atomic" - "time" - - "github.com/gorilla/websocket" - "go.uber.org/zap" - - "github.com/ava-labs/avalanchego/pubsub/bloom" -) - -var ( - ErrFilterNotInitialized = errors.New("filter not initialized") - ErrAddressLimit = errors.New("address limit exceeded") - ErrInvalidFilterParam = errors.New("invalid bloom filter params") - ErrInvalidCommand = errors.New("invalid command") - _ Filter = (*connection)(nil) -) - -type Filter interface { - Check(addr []byte) bool -} - -// connection is a representation of the websocket connection. -type connection struct { - s *Server - - // The websocket connection. - conn *websocket.Conn - - // Buffered channel of outbound messages. - send chan interface{} - - fp *FilterParam - - active uint32 -} - -func (c *connection) Check(addr []byte) bool { - return c.fp.Check(addr) -} - -func (c *connection) isActive() bool { - active := atomic.LoadUint32(&c.active) - return active != 0 -} - -func (c *connection) deactivate() { - atomic.StoreUint32(&c.active, 0) -} - -func (c *connection) Send(msg interface{}) bool { - if !c.isActive() { - return false - } - select { - case c.send <- msg: - return true - default: - } - return false -} - -// readPump pumps messages from the websocket connection to the hub. -// -// The application runs readPump in a per-connection goroutine. The application -// ensures that there is at most one reader on a connection by executing all -// reads from this goroutine. -func (c *connection) readPump() { - defer func() { - c.deactivate() - c.s.removeConnection(c) - - // close is called by both the writePump and the readPump so one of them - // will always error - _ = c.conn.Close() - }() - - c.conn.SetReadLimit(maxMessageSize) - // SetReadDeadline returns an error if the connection is corrupted - if err := c.conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil { - return - } - c.conn.SetPongHandler(func(string) error { - return c.conn.SetReadDeadline(time.Now().Add(pongWait)) - }) - - for { - err := c.readMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - c.s.log.Debug("unexpected close in websockets", - zap.Error(err), - ) - } - break - } - } -} - -// writePump pumps messages from the hub to the websocket connection. -// -// A goroutine running writePump is started for each connection. The -// application ensures that there is at most one writer to a connection by -// executing all writes from this goroutine. -func (c *connection) writePump() { - ticker := time.NewTicker(pingPeriod) - defer func() { - c.deactivate() - ticker.Stop() - c.s.removeConnection(c) - - // close is called by both the writePump and the readPump so one of them - // will always error - _ = c.conn.Close() - }() - for { - select { - case message, ok := <-c.send: - if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { - c.s.log.Debug("closing the connection", - zap.String("reason", "failed to set the write deadline"), - zap.Error(err), - ) - return - } - if !ok { - // The hub closed the channel. Attempt to close the connection - // gracefully. - _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - } - - if err := c.conn.WriteJSON(message); err != nil { - return - } - case <-ticker.C: - if err := c.conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil { - c.s.log.Debug("closing the connection", - zap.String("reason", "failed to set the write deadline"), - zap.Error(err), - ) - return - } - if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - return - } - } - } -} - -func (c *connection) readMessage() error { - _, r, err := c.conn.NextReader() - if err != nil { - return err - } - cmd := &Command{} - err = json.NewDecoder(r).Decode(cmd) - if err != nil { - return err - } - - switch { - case cmd.NewBloom != nil: - err = c.handleNewBloom(cmd.NewBloom) - case cmd.NewSet != nil: - c.handleNewSet(cmd.NewSet) - case cmd.AddAddresses != nil: - err = c.handleAddAddresses(cmd.AddAddresses) - default: - err = ErrInvalidCommand - } - if err != nil { - c.Send(&errorMsg{ - Error: err.Error(), - }) - } - return err -} - -func (c *connection) handleNewBloom(cmd *NewBloom) error { - if !cmd.IsParamsValid() { - return ErrInvalidFilterParam - } - filter, err := bloom.New(int(cmd.MaxElements), float64(cmd.CollisionProb), MaxBytes) - if err != nil { - return fmt.Errorf("bloom filter creation failed %w", err) - } - c.fp.SetFilter(filter) - return nil -} - -func (c *connection) handleNewSet(_ *NewSet) { - c.fp.NewSet() -} - -func (c *connection) handleAddAddresses(cmd *AddAddresses) error { - if err := cmd.parseAddresses(); err != nil { - return fmt.Errorf("address parse failed %w", err) - } - err := c.fp.Add(cmd.addressIds...) - if err != nil { - return fmt.Errorf("address append failed %w", err) - } - c.s.subscribedConnections.Add(c) - return nil -} diff --git a/pubsub/connections.go b/pubsub/connections.go deleted file mode 100644 index 25d35ac8cd82..000000000000 --- a/pubsub/connections.go +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package pubsub - -import ( - "sync" - - "github.com/ava-labs/avalanchego/utils/set" -) - -type connections struct { - lock sync.RWMutex - conns set.Set[*connection] - connsList []Filter -} - -func newConnections() *connections { - return &connections{} -} - -func (c *connections) Conns() []Filter { - c.lock.RLock() - defer c.lock.RUnlock() - - return append([]Filter{}, c.connsList...) -} - -func (c *connections) Remove(conn *connection) { - c.lock.Lock() - defer c.lock.Unlock() - - c.conns.Remove(conn) - c.createConnsList() -} - -func (c *connections) Add(conn *connection) { - c.lock.Lock() - defer c.lock.Unlock() - - c.conns.Add(conn) - c.createConnsList() -} - -func (c *connections) createConnsList() { - resp := make([]Filter, 0, len(c.conns)) - for c := range c.conns { - resp = append(resp, c) - } - c.connsList = resp -} diff --git a/pubsub/filter_param.go b/pubsub/filter_param.go deleted file mode 100644 index 5fd80a2ad706..000000000000 --- a/pubsub/filter_param.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package pubsub - -import ( - "sync" - - "github.com/ava-labs/avalanchego/pubsub/bloom" - "github.com/ava-labs/avalanchego/utils/set" -) - -type FilterParam struct { - lock sync.RWMutex - set set.Set[string] - filter bloom.Filter -} - -func NewFilterParam() *FilterParam { - return &FilterParam{ - set: set.Set[string]{}, - } -} - -func (f *FilterParam) NewSet() { - f.lock.Lock() - defer f.lock.Unlock() - - f.set = set.Set[string]{} - f.filter = nil -} - -func (f *FilterParam) Filter() bloom.Filter { - f.lock.RLock() - defer f.lock.RUnlock() - - return f.filter -} - -func (f *FilterParam) SetFilter(filter bloom.Filter) bloom.Filter { - f.lock.Lock() - defer f.lock.Unlock() - - f.filter = filter - f.set = nil - return f.filter -} - -func (f *FilterParam) Check(addr []byte) bool { - f.lock.RLock() - defer f.lock.RUnlock() - - if f.filter != nil && f.filter.Check(addr) { - return true - } - return f.set.Contains(string(addr)) -} - -func (f *FilterParam) Add(bl ...[]byte) error { - filter := f.Filter() - if filter != nil { - filter.Add(bl...) - return nil - } - - f.lock.Lock() - defer f.lock.Unlock() - - if f.set == nil { - return ErrFilterNotInitialized - } - - if len(f.set)+len(bl) > MaxAddresses { - return ErrAddressLimit - } - for _, b := range bl { - f.set.Add(string(b)) - } - return nil -} - -func (f *FilterParam) Len() int { - f.lock.RLock() - defer f.lock.RUnlock() - - return len(f.set) -} diff --git a/pubsub/filter_test.go b/pubsub/filter_test.go deleted file mode 100644 index 3b47a38e0237..000000000000 --- a/pubsub/filter_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package pubsub - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/api" - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/pubsub/bloom" - "github.com/ava-labs/avalanchego/utils/constants" - "github.com/ava-labs/avalanchego/utils/formatting/address" -) - -func TestAddAddressesParseAddresses(t *testing.T) { - require := require.New(t) - - chainAlias := "X" - hrp := constants.GetHRP(5) - - addrID := ids.ShortID{1} - addrStr, err := address.Format(chainAlias, hrp, addrID[:]) - require.NoError(err) - - msg := &AddAddresses{JSONAddresses: api.JSONAddresses{ - Addresses: []string{ - addrStr, - }, - }} - - require.NoError(msg.parseAddresses()) - - require.Len(msg.addressIds, 1) - require.Equal(addrID[:], msg.addressIds[0]) -} - -func TestFilterParamUpdateMulti(t *testing.T) { - require := require.New(t) - - fp := NewFilterParam() - - addr1 := []byte("abc") - addr2 := []byte("def") - addr3 := []byte("xyz") - - require.NoError(fp.Add(addr1, addr2, addr3)) - require.Len(fp.set, 3) - require.Contains(fp.set, string(addr1)) - require.Contains(fp.set, string(addr2)) - require.Contains(fp.set, string(addr3)) -} - -func TestFilterParam(t *testing.T) { - require := require.New(t) - - mapFilter := bloom.NewMap() - - fp := NewFilterParam() - fp.SetFilter(mapFilter) - - addr := ids.GenerateTestShortID() - require.NoError(fp.Add(addr[:])) - require.True(fp.Check(addr[:])) - delete(fp.set, string(addr[:])) - - mapFilter.Add(addr[:]) - require.True(fp.Check(addr[:])) - require.False(fp.Check([]byte("bye"))) -} - -func TestNewBloom(t *testing.T) { - cm := &NewBloom{} - require.False(t, cm.IsParamsValid()) -} diff --git a/pubsub/filterer.go b/pubsub/filterer.go deleted file mode 100644 index 3ec2910a9c4c..000000000000 --- a/pubsub/filterer.go +++ /dev/null @@ -1,8 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package pubsub - -type Filterer interface { - Filter(connections []Filter) ([]bool, interface{}) -} diff --git a/pubsub/messages.go b/pubsub/messages.go deleted file mode 100644 index ec41af813cdb..000000000000 --- a/pubsub/messages.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package pubsub - -import ( - "github.com/ava-labs/avalanchego/api" - "github.com/ava-labs/avalanchego/utils/formatting/address" - "github.com/ava-labs/avalanchego/utils/json" -) - -// NewBloom command for a new bloom filter -// -// Deprecated: The pubsub server is deprecated. -type NewBloom struct { - // MaxElements size of bloom filter - MaxElements json.Uint64 `json:"maxElements"` - // CollisionProb expected error rate of filter - CollisionProb json.Float64 `json:"collisionProb"` -} - -// NewSet command for a new map set -// -// Deprecated: The pubsub server is deprecated. -type NewSet struct{} - -// AddAddresses command to add addresses -// -// Deprecated: The pubsub server is deprecated. -type AddAddresses struct { - api.JSONAddresses - - // addressIds array of addresses, kept as a [][]byte for use in the bloom filter - addressIds [][]byte -} - -// Command execution command -// -// Deprecated: The pubsub server is deprecated. -type Command struct { - NewBloom *NewBloom `json:"newBloom,omitempty"` - NewSet *NewSet `json:"newSet,omitempty"` - AddAddresses *AddAddresses `json:"addAddresses,omitempty"` -} - -func (c *Command) String() string { - switch { - case c.NewBloom != nil: - return "newBloom" - case c.NewSet != nil: - return "newSet" - case c.AddAddresses != nil: - return "addAddresses" - default: - return "unknown" - } -} - -func (c *NewBloom) IsParamsValid() bool { - p := float64(c.CollisionProb) - return c.MaxElements > 0 && 0 < p && p <= 1 -} - -// parseAddresses converts the bech32 addresses to their byte format. -func (c *AddAddresses) parseAddresses() error { - if c.addressIds == nil { - c.addressIds = make([][]byte, len(c.Addresses)) - } - for i, addrStr := range c.Addresses { - _, _, addrBytes, err := address.Parse(addrStr) - if err != nil { - return err - } - c.addressIds[i] = addrBytes - } - return nil -} diff --git a/pubsub/server.go b/pubsub/server.go deleted file mode 100644 index b07dea89b34c..000000000000 --- a/pubsub/server.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package pubsub - -import ( - "net/http" - "sync" - "time" - - "github.com/gorilla/websocket" - "go.uber.org/zap" - - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/set" - "github.com/ava-labs/avalanchego/utils/units" -) - -const ( - // Size of the ws read buffer - readBufferSize = units.KiB - - // Size of the ws write buffer - writeBufferSize = units.KiB - - // Time allowed to write a message to the peer. - writeWait = 10 * time.Second - - // Time allowed to read the next pong message from the peer. - pongWait = 60 * time.Second - - // Send pings to peer with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - - // Maximum message size allowed from peer. - maxMessageSize = 10 * units.KiB // bytes - - // Maximum number of pending messages to send to a peer. - maxPendingMessages = 1024 // messages - - // MaxBytes the max number of bytes for a filter - MaxBytes = 1 * units.MiB - - // MaxAddresses the max number of addresses allowed - MaxAddresses = 10000 -) - -type errorMsg struct { - Error string `json:"error"` -} - -var upgrader = websocket.Upgrader{ - ReadBufferSize: readBufferSize, - WriteBufferSize: writeBufferSize, - CheckOrigin: func(*http.Request) bool { - return true - }, -} - -// Server maintains the set of active clients and sends messages to the clients. -type Server struct { - log logging.Logger - lock sync.RWMutex - // conns a list of all our connections - conns set.Set[*connection] - // subscribedConnections the connections that have activated subscriptions - subscribedConnections *connections -} - -// Deprecated: The pubsub server is deprecated. -func New(log logging.Logger) *Server { - return &Server{ - log: log, - subscribedConnections: newConnections(), - } -} - -func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - wsConn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - s.log.Debug("failed to upgrade", - zap.Error(err), - ) - return - } - conn := &connection{ - s: s, - conn: wsConn, - send: make(chan interface{}, maxPendingMessages), - fp: NewFilterParam(), - active: 1, - } - s.addConnection(conn) -} - -func (s *Server) Publish(parser Filterer) { - conns := s.subscribedConnections.Conns() - toNotify, msg := parser.Filter(conns) - for i, shouldNotify := range toNotify { - if !shouldNotify { - continue - } - conn := conns[i].(*connection) - if !conn.Send(msg) { - s.log.Verbo("dropping message to subscribed connection due to too many pending messages") - } - } -} - -func (s *Server) addConnection(conn *connection) { - s.lock.Lock() - defer s.lock.Unlock() - - s.conns.Add(conn) - - go conn.writePump() - go conn.readPump() -} - -func (s *Server) removeConnection(conn *connection) { - s.subscribedConnections.Remove(conn) - - s.lock.Lock() - defer s.lock.Unlock() - - s.conns.Remove(conn) -} diff --git a/vms/avm/pubsub_filterer.go b/vms/avm/pubsub_filterer.go deleted file mode 100644 index caf0ba348393..000000000000 --- a/vms/avm/pubsub_filterer.go +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package avm - -import ( - "github.com/ava-labs/avalanchego/api" - "github.com/ava-labs/avalanchego/pubsub" - "github.com/ava-labs/avalanchego/vms/avm/txs" - "github.com/ava-labs/avalanchego/vms/components/avax" -) - -var _ pubsub.Filterer = (*connector)(nil) - -type connector struct { - tx *txs.Tx -} - -func NewPubSubFilterer(tx *txs.Tx) pubsub.Filterer { - return &connector{tx: tx} -} - -// Apply the filter on the addresses. -func (f *connector) Filter(filters []pubsub.Filter) ([]bool, interface{}) { - resp := make([]bool, len(filters)) - for _, utxo := range f.tx.UTXOs() { - addressable, ok := utxo.Out.(avax.Addressable) - if !ok { - continue - } - - for _, address := range addressable.Addresses() { - for i, c := range filters { - if resp[i] { - continue - } - resp[i] = c.Check(address) - } - } - } - return resp, api.JSONTxID{ - TxID: f.tx.ID(), - } -} diff --git a/vms/avm/pubsub_filterer_test.go b/vms/avm/pubsub_filterer_test.go deleted file mode 100644 index 0059b2218e39..000000000000 --- a/vms/avm/pubsub_filterer_test.go +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package avm - -import ( - "bytes" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/pubsub" - "github.com/ava-labs/avalanchego/vms/avm/txs" - "github.com/ava-labs/avalanchego/vms/components/avax" - "github.com/ava-labs/avalanchego/vms/secp256k1fx" -) - -type mockFilter struct { - addr []byte -} - -func (f *mockFilter) Check(addr []byte) bool { - return bytes.Equal(addr, f.addr) -} - -func TestFilter(t *testing.T) { - require := require.New(t) - - addrID := ids.ShortID{1} - tx := txs.Tx{Unsigned: &txs.BaseTx{BaseTx: avax.BaseTx{ - Outs: []*avax.TransferableOutput{ - { - Out: &secp256k1fx.TransferOutput{ - OutputOwners: secp256k1fx.OutputOwners{ - Addrs: []ids.ShortID{addrID}, - }, - }, - }, - }, - }}} - addrBytes := addrID[:] - - fp := pubsub.NewFilterParam() - require.NoError(fp.Add(addrBytes)) - - parser := NewPubSubFilterer(&tx) - fr, _ := parser.Filter([]pubsub.Filter{&mockFilter{addr: addrBytes}}) - require.Equal([]bool{true}, fr) -} diff --git a/vms/avm/service.md b/vms/avm/service.md index dfba13b05f0e..ac4558794337 100644 --- a/vms/avm/service.md +++ b/vms/avm/service.md @@ -2198,122 +2198,3 @@ curl -X POST --data '{ } } ``` - -### Events - -Listen for transactions on a specified address. - -This call is made to the events API endpoint: - -`/ext/bc/X/events` - -:::caution - -Endpoint deprecated as of [**v1.9.12**](https://github.com/ava-labs/avalanchego/releases/tag/v1.9.12). - -::: - -#### **Golang Example** - -```go -package main - -import ( - "encoding/json" - "log" - "net" - "net/http" - "sync" - - "github.com/ava-labs/avalanchego/api" - "github.com/ava-labs/avalanchego/pubsub" - "github.com/gorilla/websocket" -) - -func main() { - dialer := websocket.Dialer{ - NetDial: func(netw, addr string) (net.Conn, error) { - return net.Dial(netw, addr) - }, - } - - httpHeader := http.Header{} - conn, _, err := dialer.Dial("ws://localhost:9650/ext/bc/X/events", httpHeader) - if err != nil { - panic(err) - } - - waitGroup := &sync.WaitGroup{} - waitGroup.Add(1) - - readMsg := func() { - defer waitGroup.Done() - - for { - mt, msg, err := conn.ReadMessage() - if err != nil { - log.Println(err) - return - } - switch mt { - case websocket.TextMessage: - log.Println(string(msg)) - default: - log.Println(mt, string(msg)) - } - } - } - - go readMsg() - - cmd := &pubsub.Command{NewSet: &pubsub.NewSet{}} - cmdmsg, err := json.Marshal(cmd) - if err != nil { - panic(err) - } - err = conn.WriteMessage(websocket.TextMessage, cmdmsg) - if err != nil { - panic(err) - } - - var addresses []string - addresses = append(addresses, " X-fuji....") - cmd = &pubsub.Command{AddAddresses: &pubsub.AddAddresses{JSONAddresses: api.JSONAddresses{Addresses: addresses}}} - cmdmsg, err = json.Marshal(cmd) - if err != nil { - panic(err) - } - - err = conn.WriteMessage(websocket.TextMessage, cmdmsg) - if err != nil { - panic(err) - } - - waitGroup.Wait() -} -``` - -**Operations:** - -| Command | Description | Example | Arguments | -| :--------------- | :--------------------------- | :------------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------- | -| **NewSet** | create a new address map set | `{"newSet":{}}` | | -| **NewBloom** | create a new bloom set. | `{"newBloom":{"maxElements":"1000","collisionProb":"0.0100"}}` | `maxElements` - number of elements in filter must be > 0 `collisionProb` - allowed collision probability must be > 0 and <= 1 | -| **AddAddresses** | add an address to the set | `{"addAddresses":{"addresses":\["X-fuji..."\]}}` | addresses - list of addresses to match | - -Calling **NewSet** or **NewBloom** resets the filter, and must be followed with **AddAddresses**. -**AddAddresses** can be called multiple times. - -**Set details:** - -- **NewSet** performs absolute address matches, if the address is in the set you will be sent the - transaction. -- **NewBloom** [Bloom filtering](https://en.wikipedia.org/wiki/Bloom_filter) can produce false - positives, but can allow a greater number of addresses to be filtered. If the addresses is in the - filter, you will be sent the transaction. - -**Example Response:** - -```json -2021/05/11 15:59:35 {"txID":"22HWKHrREyXyAiDnVmGp3TQQ79tHSSVxA9h26VfDEzoxvwveyk"} -``` diff --git a/vms/avm/vm.go b/vms/avm/vm.go index c9170ba882f7..1026ba7d3de7 100644 --- a/vms/avm/vm.go +++ b/vms/avm/vm.go @@ -20,7 +20,6 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/database/versiondb" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/pubsub" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/consensus/snowman" "github.com/ava-labs/avalanchego/snow/consensus/snowstorm" @@ -84,8 +83,6 @@ type VM struct { parser block.Parser - pubsub *pubsub.Server - appSender common.AppSender // State management @@ -195,8 +192,6 @@ func (vm *VM) Initialize( vm.db = versiondb.New(db) vm.assetToFxCache = &cache.LRU[ids.ID, set.Bits64]{Size: assetToFxCacheSize} - vm.pubsub = pubsub.New(ctx.Log) - typedFxs := make([]extensions.Fx, len(fxs)) vm.fxs = make([]*extensions.ParsedFx, len(fxs)) for i, fxContainer := range fxs { @@ -353,7 +348,6 @@ func (vm *VM) CreateHandlers(context.Context) (map[string]http.Handler, error) { return map[string]http.Handler{ "": rpcServer, "/wallet": walletServer, - "/events": vm.pubsub, }, err } @@ -681,7 +675,6 @@ func (vm *VM) onAccept(tx *txs.Tx) error { return fmt.Errorf("error indexing tx: %w", err) } - vm.pubsub.Publish(NewPubSubFilterer(tx)) vm.walletService.decided(txID) return nil }