Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) chore_: cleanup wallet activity code #6168

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions node/status_node_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ import (
"github.com/status-im/status-go/services/wakuv2ext"
"github.com/status-im/status-go/services/wallet"
"github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/web3provider"
"github.com/status-im/status-go/timesource"
"github.com/status-im/status-go/waku"
Expand Down Expand Up @@ -606,7 +605,7 @@ func (b *StatusNode) walletService(accountsDB *accounts.Database, appDB *sql.DB,
func (b *StatusNode) localNotificationsService(network uint64) (*localnotifications.Service, error) {
var err error
if b.localNotificationsSrvc == nil {
b.localNotificationsSrvc, err = localnotifications.NewService(b.appDB, transfer.NewDB(b.walletDB), network)
b.localNotificationsSrvc, err = localnotifications.NewService(b.appDB)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -684,13 +683,6 @@ func (b *StatusNode) StartLocalNotifications() error {
}
}

err := b.localNotificationsSrvc.SubscribeWallet(&b.walletFeed)

if err != nil {
b.logger.Error("LocalNotifications service could not subscribe to wallet on StartLocalNotifications", zap.Error(err))
return nil
}

return nil
}

Expand Down
14 changes: 0 additions & 14 deletions services/local-notifications/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package localnotifications

import (
"context"

"github.com/status-im/status-go/logutils"
)

func NewAPI(s *Service) *API {
Expand All @@ -17,15 +15,3 @@ type API struct {
func (api *API) NotificationPreferences(ctx context.Context) ([]NotificationPreference, error) {
return api.s.db.GetPreferences()
}

func (api *API) SwitchWalletNotifications(ctx context.Context, preference bool) error {
logutils.ZapLogger().Debug("Switch Transaction Notification")
err := api.s.db.ChangeWalletPreference(preference)
if err != nil {
return err
}

api.s.WatchingEnabled = preference

return nil
}
82 changes: 5 additions & 77 deletions services/local-notifications/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,11 @@ package localnotifications
import (
"database/sql"
"encoding/json"
"sync"

"go.uber.org/zap"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/signal"
)

Expand Down Expand Up @@ -79,41 +72,17 @@ type MessageEvent struct{}
// CustomEvent - structure used to pass custom user set messages to bus
type CustomEvent struct{}

type transmitter struct {
publisher *event.Feed

wg sync.WaitGroup
quit chan struct{}
}

// Service keeps the state of message bus
type Service struct {
started bool
WatchingEnabled bool
chainID uint64
transmitter *transmitter
walletTransmitter *transmitter
db *Database
walletDB *transfer.Database
accountsDB *accounts.Database
started bool
db *Database
}

func NewService(appDB *sql.DB, walletDB *transfer.Database, chainID uint64) (*Service, error) {
db := NewDB(appDB, chainID)
accountsDB, err := accounts.NewDB(appDB)
if err != nil {
return nil, err
}
trans := &transmitter{}
walletTrans := &transmitter{}
func NewService(appDB *sql.DB) (*Service, error) {
db := NewDB(appDB)

return &Service{
db: db,
chainID: chainID,
walletDB: walletDB,
accountsDB: accountsDB,
transmitter: trans,
walletTransmitter: walletTrans,
db: db,
}, nil
}

Expand Down Expand Up @@ -165,54 +134,13 @@ func pushMessage(notification *Notification) {
// Start Worker which processes all incoming messages
func (s *Service) Start() error {
s.started = true

s.transmitter.quit = make(chan struct{})
s.transmitter.publisher = &event.Feed{}

events := make(chan TransactionEvent, 10)
sub := s.transmitter.publisher.Subscribe(events)

s.transmitter.wg.Add(1)
go func() {
defer gocommon.LogOnPanic()
defer s.transmitter.wg.Done()
for {
select {
case <-s.transmitter.quit:
sub.Unsubscribe()
return
case err := <-sub.Err():
if err != nil {
logutils.ZapLogger().Error("Local notifications transmitter failed with", zap.Error(err))
}
return
case event := <-events:
s.transactionsHandler(event)
}
}
}()

logutils.ZapLogger().Info("Successful start")

return nil
}

// Stop worker
func (s *Service) Stop() error {
s.started = false

if s.transmitter.quit != nil {
close(s.transmitter.quit)
s.transmitter.wg.Wait()
s.transmitter.quit = nil
}

if s.walletTransmitter.quit != nil {
close(s.walletTransmitter.quit)
s.walletTransmitter.wg.Wait()
s.walletTransmitter.quit = nil
}

return nil
}

Expand Down
127 changes: 1 addition & 126 deletions services/local-notifications/core_test.go
Original file line number Diff line number Diff line change
@@ -1,145 +1,20 @@
package localnotifications

import (
"fmt"
"math/big"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"

w_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/transfer"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/signal"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/t/utils"
"github.com/status-im/status-go/walletdatabase"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
)

func createWalletDb(t *testing.T) (*transfer.Database, func()) {
db, cleanup, err := helpers.SetupTestSQLDB(walletdatabase.DbInitializer{}, "local-notifications-tests-wallet-")
require.NoError(t, err)
return transfer.NewDB(db), func() {
require.NoError(t, cleanup())
}
}

func TestServiceStartStop(t *testing.T) {
db, stop := setupAppTestDb(t)
defer stop()

walletDb, walletStop := createWalletDb(t)
defer walletStop()

s, err := NewService(db, walletDb, 1777)
s, err := NewService(db)
require.NoError(t, err)
require.NoError(t, s.Start())
require.Equal(t, true, s.IsStarted())

require.NoError(t, s.Stop())
require.Equal(t, false, s.IsStarted())
}

func TestWalletSubscription(t *testing.T) {
db, stop := setupAppTestDb(t)
defer stop()

walletDb, walletStop := createWalletDb(t)
defer walletStop()

feed := &event.Feed{}
s, err := NewService(db, walletDb, 1777)
require.NoError(t, err)
require.NoError(t, s.Start())
require.Equal(t, true, s.IsStarted())

require.NoError(t, s.SubscribeWallet(feed))
require.Equal(t, true, s.IsWatchingWallet())

s.StartWalletWatcher()
require.Equal(t, true, s.IsWatchingWallet())

s.StopWalletWatcher()
require.Equal(t, false, s.IsWatchingWallet())

require.NoError(t, s.Stop())
require.Equal(t, false, s.IsStarted())
}

func TestTransactionNotification(t *testing.T) {
db, stop := setupAppTestDb(t)
defer stop()

walletDb, walletStop := createWalletDb(t)
defer walletStop()

s, err := NewService(db, walletDb, 1777)
require.NoError(t, err)
require.NoError(t, s.Start())
require.Equal(t, true, s.IsStarted())

var signalEvent []byte

signalHandler := signal.MobileSignalHandler(func(s []byte) {
signalEvent = s
})

signal.SetMobileSignalHandler(signalHandler)
t.Cleanup(signal.ResetMobileSignalHandler)

feed := &event.Feed{}
require.NoError(t, s.SubscribeWallet(feed))
s.WatchingEnabled = true

s.StartWalletWatcher()

header := &transfer.DBHeader{
Number: big.NewInt(1),
Hash: common.Hash{1},
Address: common.Address{1},
}
tx := types.NewTransaction(1, common.Address{1}, nil, 10, big.NewInt(10), nil)
receipt := types.NewReceipt(nil, false, 100)
receipt.Logs = []*types.Log{}
transfers := []transfer.Transfer{
{
ID: common.Hash{1},
Type: w_common.Type("eth"),
BlockHash: header.Hash,
BlockNumber: header.Number,
Transaction: tx,
Receipt: receipt,
Address: header.Address,
},
}
require.NoError(t, walletDb.SaveBlocks(1777, []*transfer.DBHeader{header}))
require.NoError(t, transfer.SaveTransfersMarkBlocksLoaded(walletDb, 1777, header.Address, transfers, []*big.Int{header.Number}))

feed.Send(walletevent.Event{
Type: transfer.EventRecentHistoryReady,
Accounts: []common.Address{header.Address},
})

feed.Send(walletevent.Event{
Type: transfer.EventNewTransfers,
BlockNumber: header.Number,
Accounts: []common.Address{header.Address},
})

require.NoError(t, utils.Eventually(func() error {
if signalEvent == nil {
return fmt.Errorf("signal was not handled")
}
require.True(t, strings.Contains(string(signalEvent), `"type":"local-notifications"`))
require.True(t, strings.Contains(string(signalEvent), `"to":"`+header.Address.Hex()))
return nil
}, 2*time.Second, 100*time.Millisecond))

require.NoError(t, s.Stop())
}
21 changes: 5 additions & 16 deletions services/local-notifications/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import (
)

type Database struct {
db *sql.DB
network uint64
db *sql.DB
}

type NotificationPreference struct {
Expand All @@ -16,8 +15,8 @@ type NotificationPreference struct {
Identifier string `json:"identifier,omitempty"`
}

func NewDB(db *sql.DB, network uint64) *Database {
return &Database{db: db, network: network}
func NewDB(db *sql.DB) *Database {
return &Database{db: db}
}

func (db *Database) GetPreferences() (rst []NotificationPreference, err error) {
Expand All @@ -37,17 +36,7 @@ func (db *Database) GetPreferences() (rst []NotificationPreference, err error) {
return rst, nil
}

func (db *Database) GetWalletPreference() (rst NotificationPreference, err error) {
pref := db.db.QueryRow("SELECT service, event, identifier, enabled FROM local_notifications_preferences WHERE service = 'wallet' AND event = 'transaction' AND identifier = 'all'")

err = pref.Scan(&rst.Service, &rst.Event, &rst.Identifier, &rst.Enabled)
if err == sql.ErrNoRows {
return rst, nil
}
return
}

func (db *Database) ChangeWalletPreference(preference bool) error {
_, err := db.db.Exec("INSERT OR REPLACE INTO local_notifications_preferences (service, event, identifier, enabled) VALUES ('wallet', 'transaction', 'all', ?)", preference)
func (db *Database) ChangePreference(p NotificationPreference) error {
_, err := db.db.Exec("INSERT OR REPLACE INTO local_notifications_preferences (enabled, service, event, identifier) VALUES (?, ?, ?, ?)", p.Enabled, p.Service, p.Event, p.Identifier)
return err
}
Loading
Loading