From d1e0861eeb4caedd18175be6d8b15acb1743104f Mon Sep 17 00:00:00 2001 From: Dario Gabriel Lipicar Date: Thu, 5 Dec 2024 15:44:03 -0300 Subject: [PATCH] chore_: remove transaction local notifications --- node/status_node_services.go | 10 +- services/local-notifications/api.go | 14 -- services/local-notifications/core.go | 82 +----- services/local-notifications/core_test.go | 127 +--------- services/local-notifications/database.go | 21 +- services/local-notifications/database_test.go | 32 +-- services/local-notifications/transaction.go | 234 ------------------ services/local-notifications/types.go | 4 +- 8 files changed, 22 insertions(+), 502 deletions(-) delete mode 100644 services/local-notifications/transaction.go diff --git a/node/status_node_services.go b/node/status_node_services.go index cbd527ad0cd..71ad5e796ad 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -60,7 +60,6 @@ import ( "github.com/status-im/status-go/services/wakuv2ext" "github.com/status-im/status-go/services/wallet" "github.com/status-im/status-go/services/wallet/thirdparty" - "github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/services/web3provider" "github.com/status-im/status-go/timesource" "github.com/status-im/status-go/waku" @@ -606,7 +605,7 @@ func (b *StatusNode) walletService(accountsDB *accounts.Database, appDB *sql.DB, func (b *StatusNode) localNotificationsService(network uint64) (*localnotifications.Service, error) { var err error if b.localNotificationsSrvc == nil { - b.localNotificationsSrvc, err = localnotifications.NewService(b.appDB, transfer.NewDB(b.walletDB), network) + b.localNotificationsSrvc, err = localnotifications.NewService(b.appDB) if err != nil { return nil, err } @@ -684,13 +683,6 @@ func (b *StatusNode) StartLocalNotifications() error { } } - err := b.localNotificationsSrvc.SubscribeWallet(&b.walletFeed) - - if err != nil { - b.logger.Error("LocalNotifications service could not subscribe to wallet on StartLocalNotifications", zap.Error(err)) - return nil - } - return nil } diff --git a/services/local-notifications/api.go b/services/local-notifications/api.go index 632ef499934..c3b82ecd693 100644 --- a/services/local-notifications/api.go +++ b/services/local-notifications/api.go @@ -2,8 +2,6 @@ package localnotifications import ( "context" - - "github.com/status-im/status-go/logutils" ) func NewAPI(s *Service) *API { @@ -17,15 +15,3 @@ type API struct { func (api *API) NotificationPreferences(ctx context.Context) ([]NotificationPreference, error) { return api.s.db.GetPreferences() } - -func (api *API) SwitchWalletNotifications(ctx context.Context, preference bool) error { - logutils.ZapLogger().Debug("Switch Transaction Notification") - err := api.s.db.ChangeWalletPreference(preference) - if err != nil { - return err - } - - api.s.WatchingEnabled = preference - - return nil -} diff --git a/services/local-notifications/core.go b/services/local-notifications/core.go index 71f229e7fed..2c650cd69d4 100644 --- a/services/local-notifications/core.go +++ b/services/local-notifications/core.go @@ -3,18 +3,11 @@ package localnotifications import ( "database/sql" "encoding/json" - "sync" - - "go.uber.org/zap" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" - gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/logutils" - "github.com/status-im/status-go/multiaccounts/accounts" - "github.com/status-im/status-go/services/wallet/transfer" "github.com/status-im/status-go/signal" ) @@ -79,41 +72,17 @@ type MessageEvent struct{} // CustomEvent - structure used to pass custom user set messages to bus type CustomEvent struct{} -type transmitter struct { - publisher *event.Feed - - wg sync.WaitGroup - quit chan struct{} -} - // Service keeps the state of message bus type Service struct { - started bool - WatchingEnabled bool - chainID uint64 - transmitter *transmitter - walletTransmitter *transmitter - db *Database - walletDB *transfer.Database - accountsDB *accounts.Database + started bool + db *Database } -func NewService(appDB *sql.DB, walletDB *transfer.Database, chainID uint64) (*Service, error) { - db := NewDB(appDB, chainID) - accountsDB, err := accounts.NewDB(appDB) - if err != nil { - return nil, err - } - trans := &transmitter{} - walletTrans := &transmitter{} +func NewService(appDB *sql.DB) (*Service, error) { + db := NewDB(appDB) return &Service{ - db: db, - chainID: chainID, - walletDB: walletDB, - accountsDB: accountsDB, - transmitter: trans, - walletTransmitter: walletTrans, + db: db, }, nil } @@ -165,54 +134,13 @@ func pushMessage(notification *Notification) { // Start Worker which processes all incoming messages func (s *Service) Start() error { s.started = true - - s.transmitter.quit = make(chan struct{}) - s.transmitter.publisher = &event.Feed{} - - events := make(chan TransactionEvent, 10) - sub := s.transmitter.publisher.Subscribe(events) - - s.transmitter.wg.Add(1) - go func() { - defer gocommon.LogOnPanic() - defer s.transmitter.wg.Done() - for { - select { - case <-s.transmitter.quit: - sub.Unsubscribe() - return - case err := <-sub.Err(): - if err != nil { - logutils.ZapLogger().Error("Local notifications transmitter failed with", zap.Error(err)) - } - return - case event := <-events: - s.transactionsHandler(event) - } - } - }() - logutils.ZapLogger().Info("Successful start") - return nil } // Stop worker func (s *Service) Stop() error { s.started = false - - if s.transmitter.quit != nil { - close(s.transmitter.quit) - s.transmitter.wg.Wait() - s.transmitter.quit = nil - } - - if s.walletTransmitter.quit != nil { - close(s.walletTransmitter.quit) - s.walletTransmitter.wg.Wait() - s.walletTransmitter.quit = nil - } - return nil } diff --git a/services/local-notifications/core_test.go b/services/local-notifications/core_test.go index baaeddf9a0c..f6a335188bd 100644 --- a/services/local-notifications/core_test.go +++ b/services/local-notifications/core_test.go @@ -1,43 +1,16 @@ package localnotifications import ( - "fmt" - "math/big" - "strings" "testing" - "time" "github.com/stretchr/testify/require" - - w_common "github.com/status-im/status-go/services/wallet/common" - "github.com/status-im/status-go/services/wallet/transfer" - "github.com/status-im/status-go/services/wallet/walletevent" - "github.com/status-im/status-go/signal" - "github.com/status-im/status-go/t/helpers" - "github.com/status-im/status-go/t/utils" - "github.com/status-im/status-go/walletdatabase" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/event" ) -func createWalletDb(t *testing.T) (*transfer.Database, func()) { - db, cleanup, err := helpers.SetupTestSQLDB(walletdatabase.DbInitializer{}, "local-notifications-tests-wallet-") - require.NoError(t, err) - return transfer.NewDB(db), func() { - require.NoError(t, cleanup()) - } -} - func TestServiceStartStop(t *testing.T) { db, stop := setupAppTestDb(t) defer stop() - walletDb, walletStop := createWalletDb(t) - defer walletStop() - - s, err := NewService(db, walletDb, 1777) + s, err := NewService(db) require.NoError(t, err) require.NoError(t, s.Start()) require.Equal(t, true, s.IsStarted()) @@ -45,101 +18,3 @@ func TestServiceStartStop(t *testing.T) { require.NoError(t, s.Stop()) require.Equal(t, false, s.IsStarted()) } - -func TestWalletSubscription(t *testing.T) { - db, stop := setupAppTestDb(t) - defer stop() - - walletDb, walletStop := createWalletDb(t) - defer walletStop() - - feed := &event.Feed{} - s, err := NewService(db, walletDb, 1777) - require.NoError(t, err) - require.NoError(t, s.Start()) - require.Equal(t, true, s.IsStarted()) - - require.NoError(t, s.SubscribeWallet(feed)) - require.Equal(t, true, s.IsWatchingWallet()) - - s.StartWalletWatcher() - require.Equal(t, true, s.IsWatchingWallet()) - - s.StopWalletWatcher() - require.Equal(t, false, s.IsWatchingWallet()) - - require.NoError(t, s.Stop()) - require.Equal(t, false, s.IsStarted()) -} - -func TestTransactionNotification(t *testing.T) { - db, stop := setupAppTestDb(t) - defer stop() - - walletDb, walletStop := createWalletDb(t) - defer walletStop() - - s, err := NewService(db, walletDb, 1777) - require.NoError(t, err) - require.NoError(t, s.Start()) - require.Equal(t, true, s.IsStarted()) - - var signalEvent []byte - - signalHandler := signal.MobileSignalHandler(func(s []byte) { - signalEvent = s - }) - - signal.SetMobileSignalHandler(signalHandler) - t.Cleanup(signal.ResetMobileSignalHandler) - - feed := &event.Feed{} - require.NoError(t, s.SubscribeWallet(feed)) - s.WatchingEnabled = true - - s.StartWalletWatcher() - - header := &transfer.DBHeader{ - Number: big.NewInt(1), - Hash: common.Hash{1}, - Address: common.Address{1}, - } - tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil) - receipt := types.NewReceipt(nil, false, 100) - receipt.Logs = []*types.Log{} - transfers := []transfer.Transfer{ - { - ID: common.Hash{1}, - Type: w_common.Type("eth"), - BlockHash: header.Hash, - BlockNumber: header.Number, - Transaction: tx, - Receipt: receipt, - Address: header.Address, - }, - } - require.NoError(t, walletDb.SaveBlocks(1777, []*transfer.DBHeader{header})) - require.NoError(t, transfer.SaveTransfersMarkBlocksLoaded(walletDb, 1777, header.Address, transfers, []*big.Int{header.Number})) - - feed.Send(walletevent.Event{ - Type: transfer.EventRecentHistoryReady, - Accounts: []common.Address{header.Address}, - }) - - feed.Send(walletevent.Event{ - Type: transfer.EventNewTransfers, - BlockNumber: header.Number, - Accounts: []common.Address{header.Address}, - }) - - require.NoError(t, utils.Eventually(func() error { - if signalEvent == nil { - return fmt.Errorf("signal was not handled") - } - require.True(t, strings.Contains(string(signalEvent), `"type":"local-notifications"`)) - require.True(t, strings.Contains(string(signalEvent), `"to":"`+header.Address.Hex())) - return nil - }, 2*time.Second, 100*time.Millisecond)) - - require.NoError(t, s.Stop()) -} diff --git a/services/local-notifications/database.go b/services/local-notifications/database.go index 4e06a750ab7..1ba3fe27614 100644 --- a/services/local-notifications/database.go +++ b/services/local-notifications/database.go @@ -5,8 +5,7 @@ import ( ) type Database struct { - db *sql.DB - network uint64 + db *sql.DB } type NotificationPreference struct { @@ -16,8 +15,8 @@ type NotificationPreference struct { Identifier string `json:"identifier,omitempty"` } -func NewDB(db *sql.DB, network uint64) *Database { - return &Database{db: db, network: network} +func NewDB(db *sql.DB) *Database { + return &Database{db: db} } func (db *Database) GetPreferences() (rst []NotificationPreference, err error) { @@ -37,17 +36,7 @@ func (db *Database) GetPreferences() (rst []NotificationPreference, err error) { return rst, nil } -func (db *Database) GetWalletPreference() (rst NotificationPreference, err error) { - pref := db.db.QueryRow("SELECT service, event, identifier, enabled FROM local_notifications_preferences WHERE service = 'wallet' AND event = 'transaction' AND identifier = 'all'") - - err = pref.Scan(&rst.Service, &rst.Event, &rst.Identifier, &rst.Enabled) - if err == sql.ErrNoRows { - return rst, nil - } - return -} - -func (db *Database) ChangeWalletPreference(preference bool) error { - _, err := db.db.Exec("INSERT OR REPLACE INTO local_notifications_preferences (service, event, identifier, enabled) VALUES ('wallet', 'transaction', 'all', ?)", preference) +func (db *Database) ChangePreference(p NotificationPreference) error { + _, err := db.db.Exec("INSERT OR REPLACE INTO local_notifications_preferences (enabled, service, event, identifier) VALUES (?, ?, ?, ?)", p.Enabled, p.Service, p.Event, p.Identifier) return err } diff --git a/services/local-notifications/database_test.go b/services/local-notifications/database_test.go index 367bc3a0499..69c1eea3302 100644 --- a/services/local-notifications/database_test.go +++ b/services/local-notifications/database_test.go @@ -17,32 +17,11 @@ func setupAppTestDb(t *testing.T) (*sql.DB, func()) { } func setupTestDB(t *testing.T, db *sql.DB) (*Database, func()) { - return NewDB(db, 1777), func() { + return NewDB(db), func() { require.NoError(t, db.Close()) } } -func TestWalletPreferences(t *testing.T) { - appDB, appStop := setupAppTestDb(t) - defer appStop() - - db, stop := setupTestDB(t, appDB) - defer stop() - - enabled := true - require.NoError(t, db.ChangeWalletPreference(enabled)) - rst, err := db.GetWalletPreference() - require.NoError(t, err) - require.Equal(t, enabled, rst.Enabled) - - enabled = false - require.NoError(t, db.ChangeWalletPreference(enabled)) - rst, err = db.GetWalletPreference() - require.Equal(t, enabled, rst.Enabled) - - require.NoError(t, err) -} - func TestPreferences(t *testing.T) { appDB, appStop := setupAppTestDb(t) defer appStop() @@ -52,7 +31,14 @@ func TestPreferences(t *testing.T) { enabled := true - require.NoError(t, db.ChangeWalletPreference(enabled)) + require.NoError(t, db.ChangePreference( + NotificationPreference{ + Enabled: enabled, + Service: "service", + Event: "event", + Identifier: "identifier", + }, + )) rst, err := db.GetPreferences() require.Equal(t, 1, len(rst)) diff --git a/services/local-notifications/transaction.go b/services/local-notifications/transaction.go deleted file mode 100644 index 7b5b49a6b2b..00000000000 --- a/services/local-notifications/transaction.go +++ /dev/null @@ -1,234 +0,0 @@ -package localnotifications - -import ( - "encoding/json" - "math/big" - - "go.uber.org/zap" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/event" - - gocommon "github.com/status-im/status-go/common" - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/logutils" - "github.com/status-im/status-go/multiaccounts/accounts" - "github.com/status-im/status-go/services/wallet/transfer" - "github.com/status-im/status-go/services/wallet/walletevent" -) - -type transactionState string - -const ( - walletDeeplinkPrefix = "status-app://wallet/" - - failed transactionState = "failed" - inbound transactionState = "inbound" - outbound transactionState = "outbound" -) - -// TransactionEvent - structure used to pass messages from wallet to bus -type TransactionEvent struct { - Type string `json:"type"` - BlockNumber *big.Int `json:"block-number"` - Accounts []common.Address `json:"accounts"` - MaxKnownBlocks map[common.Address]*big.Int `json:"max-known-blocks"` -} - -type transactionBody struct { - State transactionState `json:"state"` - From common.Address `json:"from"` - To common.Address `json:"to"` - FromAccount *accounts.Account `json:"fromAccount,omitempty"` - ToAccount *accounts.Account `json:"toAccount,omitempty"` - Value *hexutil.Big `json:"value"` - ERC20 bool `json:"erc20"` - Contract common.Address `json:"contract"` - Network uint64 `json:"network"` -} - -func (t transactionBody) MarshalJSON() ([]byte, error) { - type Alias transactionBody - item := struct{ *Alias }{Alias: (*Alias)(&t)} - return json.Marshal(item) -} - -func (s *Service) buildTransactionNotification(rawTransfer transfer.Transfer) *Notification { - logutils.ZapLogger().Debug("Handled a new transfer in buildTransactionNotification", zap.Any("info", rawTransfer)) - - var deeplink string - var state transactionState - transfer := transfer.CastToTransferView(rawTransfer) - - switch { - case transfer.TxStatus == hexutil.Uint64(0): - state = failed - case transfer.Address == transfer.To: - state = inbound - default: - state = outbound - } - - from, err := s.accountsDB.GetAccountByAddress(types.Address(transfer.From)) - - if err != nil { - logutils.ZapLogger().Debug("Could not select From account by address", zap.Error(err)) - } - - to, err := s.accountsDB.GetAccountByAddress(types.Address(transfer.To)) - - if err != nil { - logutils.ZapLogger().Debug("Could not select To account by address", zap.Error(err)) - } - - if from != nil { - deeplink = walletDeeplinkPrefix + from.Address.String() - } else if to != nil { - deeplink = walletDeeplinkPrefix + to.Address.String() - } - - body := transactionBody{ - State: state, - From: transfer.From, - To: transfer.Address, - FromAccount: from, - ToAccount: to, - Value: transfer.Value, - ERC20: string(transfer.Type) == "erc20", - Contract: transfer.Contract, - Network: transfer.NetworkID, - } - - return &Notification{ - BodyType: TypeTransaction, - ID: transfer.ID, - Body: body, - Deeplink: deeplink, - Category: CategoryTransaction, - } -} - -func (s *Service) transactionsHandler(payload TransactionEvent) { - logutils.ZapLogger().Info("Handled a new transaction", zap.Any("info", payload)) - - limit := 20 - if payload.BlockNumber != nil { - for _, address := range payload.Accounts { - if payload.BlockNumber.Cmp(payload.MaxKnownBlocks[address]) >= 0 { - logutils.ZapLogger().Info("Handled transfer for address", zap.Stringer("info", address)) - transfers, err := s.walletDB.GetTransfersByAddressAndBlock(s.chainID, address, payload.BlockNumber, int64(limit)) - if err != nil { - logutils.ZapLogger().Error("Could not fetch transfers", zap.Error(err)) - } - - for _, transaction := range transfers { - n := s.buildTransactionNotification(transaction) - pushMessage(n) - } - } - } - } -} - -// SubscribeWallet - Subscribes to wallet signals -func (s *Service) SubscribeWallet(publisher *event.Feed) error { - s.walletTransmitter.publisher = publisher - - preference, err := s.db.GetWalletPreference() - - if err != nil { - logutils.ZapLogger().Error("Failed to get wallet preference", zap.Error(err)) - s.WatchingEnabled = false - } else { - s.WatchingEnabled = preference.Enabled - } - - s.StartWalletWatcher() - - return err -} - -// StartWalletWatcher - Forward wallet events to notifications -func (s *Service) StartWalletWatcher() { - if s.walletTransmitter.quit != nil { - // already running, nothing to do - return - } - - if s.walletTransmitter.publisher == nil { - logutils.ZapLogger().Error("wallet publisher was not initialized") - return - } - - s.walletTransmitter.quit = make(chan struct{}) - events := make(chan walletevent.Event, 10) - sub := s.walletTransmitter.publisher.Subscribe(events) - - s.walletTransmitter.wg.Add(1) - - maxKnownBlocks := map[common.Address]*big.Int{} - go func() { - defer gocommon.LogOnPanic() - defer s.walletTransmitter.wg.Done() - historyReady := false - for { - select { - case <-s.walletTransmitter.quit: - sub.Unsubscribe() - return - case err := <-sub.Err(): - // technically event.Feed cannot send an error to subscription.Err channel. - // the only time we will get an event is when that channel is closed. - if err != nil { - logutils.ZapLogger().Error("wallet signals transmitter failed with", zap.Error(err)) - } - return - case event := <-events: - if event.Type == transfer.EventNewTransfers && historyReady && event.BlockNumber != nil { - newBlocks := false - for _, address := range event.Accounts { - if _, ok := maxKnownBlocks[address]; !ok { - newBlocks = true - maxKnownBlocks[address] = event.BlockNumber - } else if event.BlockNumber.Cmp(maxKnownBlocks[address]) == 1 { - maxKnownBlocks[address] = event.BlockNumber - newBlocks = true - } - } - if newBlocks && s.WatchingEnabled { - s.transmitter.publisher.Send(TransactionEvent{ - Type: string(event.Type), - BlockNumber: event.BlockNumber, - Accounts: event.Accounts, - MaxKnownBlocks: maxKnownBlocks, - }) - } - } else if event.Type == transfer.EventRecentHistoryReady { - historyReady = true - if event.BlockNumber != nil { - for _, address := range event.Accounts { - if _, ok := maxKnownBlocks[address]; !ok { - maxKnownBlocks[address] = event.BlockNumber - } - } - } - } - } - } - }() -} - -// StopWalletWatcher - stops watching for new wallet events -func (s *Service) StopWalletWatcher() { - if s.walletTransmitter.quit != nil { - close(s.walletTransmitter.quit) - s.walletTransmitter.wg.Wait() - s.walletTransmitter.quit = nil - } -} - -// IsWatchingWallet - check if local-notifications are subscribed to wallet updates -func (s *Service) IsWatchingWallet() bool { - return s.walletTransmitter.quit != nil -} diff --git a/services/local-notifications/types.go b/services/local-notifications/types.go index efc2af1a1b2..cc47eecbc5c 100644 --- a/services/local-notifications/types.go +++ b/services/local-notifications/types.go @@ -1,12 +1,10 @@ package localnotifications const ( - CategoryTransaction PushCategory = "transaction" CategoryMessage PushCategory = "newMessage" CategoryGroupInvite PushCategory = "groupInvite" CategoryCommunityRequestToJoin = "communityRequestToJoin" CategoryCommunityJoined = "communityJoined" - TypeTransaction NotificationType = "transaction" - TypeMessage NotificationType = "message" + TypeMessage NotificationType = "message" )