diff --git a/tools/walletextension/metrics/metrics.go b/tools/walletextension/metrics/metrics.go new file mode 100644 index 000000000..e79dae8cd --- /dev/null +++ b/tools/walletextension/metrics/metrics.go @@ -0,0 +1,189 @@ +package metrics + +import ( + "crypto/sha256" + "encoding/hex" + "log" + "sync" + "sync/atomic" + "time" + + "github.com/ten-protocol/go-ten/tools/walletextension/storage/database/cosmosdb" +) + +const ( + // Persistence intervals (how often metrics are saved to CosmosDB) + MetricsPersistInterval = 10 * time.Minute + + // Cleanup intervals (how often inactive users are cleaned up) + InactiveUserCleanupInterval = 1 * time.Hour + + // Activity thresholds + UserInactivityThreshold = 30 * 24 * time.Hour // 30 days + MonthlyActiveUserWindow = 30 * 24 * time.Hour // 30 days +) + +// Metrics interface defines the metrics operations +type Metrics interface { + RecordNewUser() + RecordAccountRegistered() + RecordUserActivity(anonymousID string) + GetTotalUsers() uint64 + GetTotalAccountsRegistered() uint64 + GetMonthlyActiveUsers() int + Stop() +} + +type MetricsTracker struct { + totalUsers atomic.Uint64 + accountsRegistered atomic.Uint64 + activeUsers map[string]time.Time // key is double-hashed userID + activeUserLock sync.RWMutex + storage *cosmosdb.MetricsStorageCosmosDB + persistTicker *time.Ticker +} + +func NewMetricsTracker(storage *cosmosdb.MetricsStorageCosmosDB) Metrics { + mt := &MetricsTracker{ + activeUsers: make(map[string]time.Time), + storage: storage, + persistTicker: time.NewTicker(MetricsPersistInterval), + } + + // Load existing metrics + if metrics, err := storage.LoadMetrics(); err == nil { + mt.totalUsers.Store(metrics.TotalUsers) + mt.accountsRegistered.Store(metrics.AccountsRegistered) + + mt.activeUserLock.Lock() + for hashedUserID, timestamp := range metrics.ActiveUsers { + if t, err := time.Parse(time.RFC3339, timestamp); err == nil { + mt.activeUsers[hashedUserID] = t + } + } + mt.activeUserLock.Unlock() + } + + // Start cleanup routine for inactive users + go mt.cleanupInactiveUsers() + go mt.persistMetrics() + + return mt +} + +// hashUserID creates a double-hashed version of the userID +func (mt *MetricsTracker) hashUserID(userID []byte) string { + // First hash + firstHash := sha256.Sum256(userID) + // Second hash + secondHash := sha256.Sum256(firstHash[:]) + return hex.EncodeToString(secondHash[:]) +} + +func (mt *MetricsTracker) RecordNewUser() { + mt.totalUsers.Add(1) +} + +// RecordAccountRegistered increments the total number of registered accounts +func (mt *MetricsTracker) RecordAccountRegistered() { + mt.accountsRegistered.Add(1) +} + +// RecordUserActivity updates the last activity timestamp for a user +func (mt *MetricsTracker) RecordUserActivity(anonymousID string) { + hashedUserID := mt.hashUserID([]byte(anonymousID)) + + mt.activeUserLock.Lock() + mt.activeUsers[hashedUserID] = time.Now() + mt.activeUserLock.Unlock() +} + +// GetTotalUsers returns the total number of registered users +func (mt *MetricsTracker) GetTotalUsers() uint64 { + return mt.totalUsers.Load() +} + +// GetTotalAccountsRegistered returns the total number of registered accounts +func (mt *MetricsTracker) GetTotalAccountsRegistered() uint64 { + return mt.accountsRegistered.Load() +} + +// GetMonthlyActiveUsers returns the number of users active in the last 30 days +func (mt *MetricsTracker) GetMonthlyActiveUsers() int { + mt.activeUserLock.RLock() + defer mt.activeUserLock.RUnlock() + + count := 0 + activeThreshold := time.Now().Add(-MonthlyActiveUserWindow) + + for _, lastActive := range mt.activeUsers { + if lastActive.After(activeThreshold) { + count++ + } + } + return count +} + +// persistMetrics periodically saves metrics to CosmosDB +func (mt *MetricsTracker) persistMetrics() { + for range mt.persistTicker.C { + mt.saveMetrics() + } +} + +func (mt *MetricsTracker) saveMetrics() { + mt.activeUserLock.RLock() + activeUsersMap := make(map[string]string) + for hashedUserID, timestamp := range mt.activeUsers { + activeUsersMap[hashedUserID] = timestamp.UTC().Format(time.RFC3339) + } + mt.activeUserLock.RUnlock() + + metrics := &cosmosdb.MetricsDocument{ + ID: cosmosdb.METRICS_DOC_ID, + TotalUsers: mt.totalUsers.Load(), + AccountsRegistered: mt.accountsRegistered.Load(), + ActiveUsers: activeUsersMap, + } + + if err := mt.storage.SaveMetrics(metrics); err != nil { + // Either log the error properly or return it + log.Printf("Failed to persist metrics: %v", err) + } +} + +func (mt *MetricsTracker) cleanupInactiveUsers() { + ticker := time.NewTicker(InactiveUserCleanupInterval) + for range ticker.C { + mt.activeUserLock.Lock() + inactiveThreshold := time.Now().Add(-UserInactivityThreshold) + + for userID, lastActive := range mt.activeUsers { + if lastActive.Before(inactiveThreshold) { + delete(mt.activeUsers, userID) + } + } + mt.activeUserLock.Unlock() + } +} + +// Stop cleanly stops the metrics tracker +func (mt *MetricsTracker) Stop() { + mt.persistTicker.Stop() + mt.saveMetrics() // Final save before stopping +} + +// NoOpMetricsTracker implements Metrics interface but does nothing +type NoOpMetricsTracker struct{} + +func NewNoOpMetricsTracker() Metrics { + return &NoOpMetricsTracker{} +} + +func (mt *NoOpMetricsTracker) RecordNewUser() {} +func (mt *NoOpMetricsTracker) RecordAccountRegistered() {} +func (mt *NoOpMetricsTracker) RecordUserActivity(string) {} +func (mt *NoOpMetricsTracker) GetTotalUsers() uint64 { return 0 } +func (mt *NoOpMetricsTracker) GetTotalAccountsRegistered() uint64 { return 0 } +func (mt *NoOpMetricsTracker) GetMonthlyActiveUsers() int { return 0 } +func (mt *NoOpMetricsTracker) Stop() {} diff --git a/tools/walletextension/rpcapi/utils.go b/tools/walletextension/rpcapi/utils.go index 99f4d879b..828b47699 100644 --- a/tools/walletextension/rpcapi/utils.go +++ b/tools/walletextension/rpcapi/utils.go @@ -87,6 +87,8 @@ func ExecAuthRPC[R any](ctx context.Context, w *services.Services, cfg *AuthExec return nil, err } + w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(user.ID)) + rateLimitAllowed, requestUUID := w.RateLimiter.Allow(gethcommon.Address(user.ID)) defer w.RateLimiter.SetRequestEnd(gethcommon.Address(user.ID), requestUUID) if !rateLimitAllowed { diff --git a/tools/walletextension/services/wallet_extension.go b/tools/walletextension/services/wallet_extension.go index 98981963c..9b667a38f 100644 --- a/tools/walletextension/services/wallet_extension.go +++ b/tools/walletextension/services/wallet_extension.go @@ -22,6 +22,7 @@ import ( "github.com/status-im/keycard-go/hexutils" "github.com/ten-protocol/go-ten/tools/walletextension/cache" + "github.com/ten-protocol/go-ten/tools/walletextension/metrics" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -49,6 +50,7 @@ type Services struct { Config *common.Config NewHeadsService *subscriptioncommon.NewHeadsService cacheInvalidationCh chan *tencommon.BatchHeader + MetricsTracker metrics.Metrics } type NewHeadNotifier interface { @@ -58,7 +60,7 @@ type NewHeadNotifier interface { // number of rpc responses to cache const rpcResponseCacheSize = 1_000_000 -func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserStorage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, config *common.Config) *Services { +func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserStorage, stopControl *stopcontrol.StopControl, version string, logger gethlog.Logger, metricsTracker metrics.Metrics, config *common.Config) *Services { newGatewayCache, err := cache.NewCache(rpcResponseCacheSize, logger) if err != nil { logger.Error(fmt.Errorf("could not create cache. Cause: %w", err).Error()) @@ -80,6 +82,7 @@ func NewServices(hostAddrHTTP string, hostAddrWS string, storage storage.UserSto RateLimiter: rateLimiter, Config: config, cacheInvalidationCh: make(chan *tencommon.BatchHeader), + MetricsTracker: metricsTracker, } services.NewHeadsService = subscriptioncommon.NewNewHeadsService( @@ -185,6 +188,7 @@ func (w *Services) GenerateAndStoreNewUser() ([]byte, error) { w.Logger().Error(fmt.Sprintf("failed to save user to the database: %s", err)) return nil, err } + w.MetricsTracker.RecordNewUser() requestEndTime := time.Now() duration := requestEndTime.Sub(requestStartTime) @@ -194,6 +198,7 @@ func (w *Services) GenerateAndStoreNewUser() ([]byte, error) { // AddAddressToUser checks if a message is in correct format and if signature is valid. If all checks pass we save address and signature against userID func (w *Services) AddAddressToUser(userID []byte, address string, signature []byte, signatureType viewingkey.SignatureType) error { + w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(userID)) audit(w, "Adding address to user: %s, address: %s", hexutils.BytesToHex(userID), address) requestStartTime := time.Now() addressFromMessage := gethcommon.HexToAddress(address) @@ -213,6 +218,7 @@ func (w *Services) AddAddressToUser(userID []byte, address string, signature []b w.Logger().Error(fmt.Errorf("error while storing account (%s) for user (%s): %w", addressFromMessage.Hex(), userID, err).Error()) return err } + w.MetricsTracker.RecordAccountRegistered() audit(w, "Storing new address for user: %s, address: %s, duration: %d ", hexutils.BytesToHex(userID), address, time.Since(requestStartTime).Milliseconds()) return nil @@ -220,6 +226,7 @@ func (w *Services) AddAddressToUser(userID []byte, address string, signature []b // UserHasAccount checks if provided account exist in the database for given userID func (w *Services) UserHasAccount(userID []byte, address string) (bool, error) { + w.MetricsTracker.RecordUserActivity(hexutils.BytesToHex(userID)) audit(w, "Checking if user has account: %s, address: %s", hexutils.BytesToHex(userID), address) addressBytes, err := hex.DecodeString(address[2:]) // remove 0x prefix from address if err != nil { diff --git a/tools/walletextension/storage/database/cosmosdb/metrics_storage.go b/tools/walletextension/storage/database/cosmosdb/metrics_storage.go new file mode 100644 index 000000000..7537a8060 --- /dev/null +++ b/tools/walletextension/storage/database/cosmosdb/metrics_storage.go @@ -0,0 +1,133 @@ +package cosmosdb + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" +) + +const ( + METRICS_CONTAINER_NAME = "metrics" + METRICS_DOC_ID = "global_metrics" +) + +type MetricsDocument struct { + ID string `json:"id"` + TotalUsers uint64 `json:"totalUsers"` + AccountsRegistered uint64 `json:"accountsRegistered"` + ActiveUsers map[string]string `json:"activeUsers"` // double-hashed userID -> ISO timestamp + ActiveUsersCount int `json:"activeUsersCount"` + LastUpdated string `json:"lastUpdated"` +} + +// MetricsStorageCosmosDB handles metrics persistence in CosmosDB +type MetricsStorageCosmosDB struct { + client *azcosmos.Client + metricsContainer *azcosmos.ContainerClient +} + +func NewMetricsStorage(connectionString string) (*MetricsStorageCosmosDB, error) { + client, err := azcosmos.NewClientFromConnectionString(connectionString, nil) + if err != nil { + return nil, fmt.Errorf("failed to create CosmosDB client: %w", err) + } + + ctx := context.Background() + + // Ensure database exists + _, err = client.CreateDatabase(ctx, azcosmos.DatabaseProperties{ID: DATABASE_NAME}, nil) + if err != nil && !strings.Contains(err.Error(), "Conflict") { + return nil, fmt.Errorf("failed to create database: %w", err) + } + + metricsContainer, err := client.NewContainer(DATABASE_NAME, METRICS_CONTAINER_NAME) + if err != nil { + return nil, fmt.Errorf("failed to get metrics container: %w", err) + } + + return &MetricsStorageCosmosDB{ + client: client, + metricsContainer: metricsContainer, + }, nil +} + +func (m *MetricsStorageCosmosDB) LoadMetrics() (*MetricsDocument, error) { + ctx := context.Background() + partitionKey := azcosmos.NewPartitionKeyString(METRICS_DOC_ID) + + response, err := m.metricsContainer.ReadItem(ctx, partitionKey, METRICS_DOC_ID, nil) + if err != nil { + if strings.Contains(err.Error(), "NotFound") { + // Initialize with empty metrics if not found + return &MetricsDocument{ + ID: METRICS_DOC_ID, + ActiveUsers: make(map[string]string), + }, nil + } + return nil, err + } + + var doc MetricsDocument + if err := json.Unmarshal(response.Value, &doc); err != nil { + return nil, err + } + return &doc, nil +} + +func (m *MetricsStorageCosmosDB) SaveMetrics(metrics *MetricsDocument) error { + ctx := context.Background() + partitionKey := azcosmos.NewPartitionKeyString(METRICS_DOC_ID) + + // Calculate active users count and clean up inactive users + activeThreshold := time.Now().Add(-30 * 24 * time.Hour) // 30 days + activeCount := 0 + activeUsersMap := make(map[string]string) + + for userID, timestampStr := range metrics.ActiveUsers { + if timestamp, err := time.Parse(time.RFC3339, timestampStr); err == nil { + if timestamp.After(activeThreshold) { + activeCount++ + activeUsersMap[userID] = timestampStr + } + } + } + + metrics.ActiveUsers = activeUsersMap // Only keep active users + metrics.ActiveUsersCount = activeCount + metrics.LastUpdated = time.Now().UTC().Format(time.RFC3339) + + docJSON, err := json.Marshal(metrics) + if err != nil { + return err + } + + _, err = m.metricsContainer.UpsertItem(ctx, partitionKey, docJSON, nil) + return err +} + +// NoOpMetricsStorage is a no-op implementation of metrics storage +type noOpMetricsStorage struct{} + +// MetricsStorage interface defines the metrics storage operations +type MetricsStorage interface { + LoadMetrics() (*MetricsDocument, error) + SaveMetrics(*MetricsDocument) error +} + +func NewNoOpMetricsStorage() MetricsStorage { + return &noOpMetricsStorage{} +} + +func (n *noOpMetricsStorage) LoadMetrics() (*MetricsDocument, error) { + return &MetricsDocument{ + ActiveUsers: make(map[string]string), + }, nil +} + +func (n *noOpMetricsStorage) SaveMetrics(*MetricsDocument) error { + return nil +} diff --git a/tools/walletextension/storage/storage.go b/tools/walletextension/storage/storage.go index ca39c628d..37375b049 100644 --- a/tools/walletextension/storage/storage.go +++ b/tools/walletextension/storage/storage.go @@ -40,3 +40,11 @@ func New(dbType, dbConnectionURL, dbPath string, randomKey []byte, logger gethlo return NewUserStorageWithCache(underlyingStorage, logger) } + +// NewMetricsStorage is a factory function to create a MetricsStorage instance +func NewMetricsStorage(dbType, dbConnectionURL string) (*cosmosdb.MetricsStorageCosmosDB, error) { + if dbType == "cosmosDB" { + return cosmosdb.NewMetricsStorage(dbConnectionURL) + } + return nil, nil // Return nil for other database types +} diff --git a/tools/walletextension/walletextension_container.go b/tools/walletextension/walletextension_container.go index a3dff0ba6..c7ac5f0de 100644 --- a/tools/walletextension/walletextension_container.go +++ b/tools/walletextension/walletextension_container.go @@ -6,6 +6,7 @@ import ( "os" "time" + "github.com/ten-protocol/go-ten/tools/walletextension/metrics" "github.com/ten-protocol/go-ten/tools/walletextension/services" "github.com/ten-protocol/go-ten/go/common/subscription" @@ -46,6 +47,19 @@ func NewContainerFromConfig(config wecommon.Config, logger gethlog.Logger) *Cont os.Exit(1) } + // Create metrics tracker + var metricsTracker metrics.Metrics + if config.DBType == "cosmosDB" { + metricsStorage, err := storage.NewMetricsStorage(config.DBType, config.DBConnectionURL) + if err != nil { + logger.Crit("unable to create metrics storage", log.ErrKey, err) + os.Exit(1) + } + metricsTracker = metrics.NewMetricsTracker(metricsStorage) + } else { + metricsTracker = metrics.NewNoOpMetricsTracker() + } + // start the database with the encryption key userStorage, err := storage.New(config.DBType, config.DBConnectionURL, config.DBPathOverride, encryptionKey, logger) if err != nil { @@ -60,7 +74,7 @@ func NewContainerFromConfig(config wecommon.Config, logger gethlog.Logger) *Cont } stopControl := stopcontrol.New() - walletExt := services.NewServices(hostRPCBindAddrHTTP, hostRPCBindAddrWS, userStorage, stopControl, version, logger, &config) + walletExt := services.NewServices(hostRPCBindAddrHTTP, hostRPCBindAddrWS, userStorage, stopControl, version, logger, metricsTracker, &config) cfg := &node.RPCConfig{ EnableHTTP: true, HTTPPort: config.WalletExtensionPortHTTP, @@ -145,6 +159,11 @@ func NewContainerFromConfig(config wecommon.Config, logger gethlog.Logger) *Cont }, }) + // Add metrics tracker to stop sequence + stopControl.OnStop(func() { + metricsTracker.Stop() + }) + return &Container{ stopControl: stopControl, rpcServer: rpcServer,