diff --git a/cmd/container.go b/cmd/container.go index 7670505b2..7c187eea6 100644 --- a/cmd/container.go +++ b/cmd/container.go @@ -43,6 +43,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option { // Handle OpenTelemetry options = append(options, otlptraces.CLITracesModule(v)) + redisLockStrategy := false switch v.GetString(lockStrategyFlag) { case "redis": var tlsConfig *tls.Config @@ -58,6 +59,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option { LockRetry: v.GetDuration(lockStrategyRedisRetryFlag), TLSConfig: tlsConfig, })) + redisLockStrategy = true } // Handle api part @@ -87,6 +89,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option { ConnString: v.GetString(storagePostgresConnectionStringFlag), } }(), + RedisLockStrategy: redisLockStrategy, })) options = append(options, internal.NewAnalyticsModule(v, Version)) diff --git a/cmd/internal/analytics_test.go b/cmd/internal/analytics_test.go index acc2789cd..b4199a1b0 100644 --- a/cmd/internal/analytics_test.go +++ b/cmd/internal/analytics_test.go @@ -126,7 +126,7 @@ func TestAnalyticsModule(t *testing.T) { module, fx.Provide(func(lc fx.Lifecycle) (storage.Driver[ledger.Store], error) { id := uuid.New() - driver := sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id)) + driver := sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id), false) lc.Append(fx.Hook{ OnStart: driver.Initialize, }) diff --git a/pkg/analytics/segment_test.go b/pkg/analytics/segment_test.go index b7d89a807..66a8c8ac7 100644 --- a/pkg/analytics/segment_test.go +++ b/pkg/analytics/segment_test.go @@ -88,7 +88,7 @@ var ( }), fx.Provide(func(lc fx.Lifecycle) (storage.Driver[ledger.Store], error) { id := uuid.New() - driver := sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id)) + driver := sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id), false) lc.Append(fx.Hook{ OnStart: driver.Initialize, }) diff --git a/pkg/ledger/execute_script.go b/pkg/ledger/execute_script.go index 303a6e20c..8f2131997 100644 --- a/pkg/ledger/execute_script.go +++ b/pkg/ledger/execute_script.go @@ -67,6 +67,7 @@ func (l *Ledger) ExecuteScript(ctx context.Context, preview bool, script core.Sc "get transactions with reference") } if len(txs.Data) > 0 { + return core.ExpandedTransaction{}, NewConflictError() } } diff --git a/pkg/ledgertesting/storage.go b/pkg/ledgertesting/storage.go index f203c9e0d..c8b524a17 100644 --- a/pkg/ledgertesting/storage.go +++ b/pkg/ledgertesting/storage.go @@ -22,11 +22,11 @@ func StorageDriverName() string { return "sqlite" } -func StorageDriver() (*sqlstorage.Driver, func(), error) { +func StorageDriver(multipleInstance bool) (*sqlstorage.Driver, func(), error) { switch StorageDriverName() { case "sqlite": id := uuid.New() - return sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id)), func() {}, nil + return sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id), multipleInstance), func() {}, nil case "postgres": pgServer, err := pgtesting.PostgresServer() if err != nil { @@ -39,6 +39,7 @@ func StorageDriver() (*sqlstorage.Driver, func(), error) { return sqlstorage.NewDriver( "postgres", sqlstorage.NewPostgresDB(db), + multipleInstance, ), func() { _ = pgServer.Close() }, nil @@ -48,7 +49,7 @@ func StorageDriver() (*sqlstorage.Driver, func(), error) { func ProvideStorageDriver() fx.Option { return fx.Provide(func(lc fx.Lifecycle) (*sqlstorage.Driver, error) { - driver, stopFn, err := StorageDriver() + driver, stopFn, err := StorageDriver(false) if err != nil { return nil, err } diff --git a/pkg/storage/sqlstorage/accounts.go b/pkg/storage/sqlstorage/accounts.go index 7543ac225..c2769e1f1 100644 --- a/pkg/storage/sqlstorage/accounts.go +++ b/pkg/storage/sqlstorage/accounts.go @@ -202,7 +202,9 @@ func (s *Store) GetAccounts(ctx context.Context, q ledger.AccountsQuery) (api.Cu func (s *Store) GetAccount(ctx context.Context, addr string) (*core.Account, error) { entry, ok := s.cache.Get(addr) - if ok { + // When having a single instance of the ledger, we can use the cached account. + // Otherwise, compute it every single time for now. + if !s.multipleInstance && ok { return entry.(*core.AccountWithVolumes).Account.Copy(), nil } @@ -278,7 +280,7 @@ func (s *Store) ensureAccountExists(ctx context.Context, account string) error { func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metadata core.Metadata, at time.Time) error { entry, ok := s.cache.Get(address) - if ok { + if !s.multipleInstance && ok { account := entry.(*core.AccountWithVolumes) account.Metadata = account.Metadata.Merge(metadata) } diff --git a/pkg/storage/sqlstorage/accounts_test.go b/pkg/storage/sqlstorage/accounts_test.go index 66b61d985..fe914a7ef 100644 --- a/pkg/storage/sqlstorage/accounts_test.go +++ b/pkg/storage/sqlstorage/accounts_test.go @@ -14,7 +14,7 @@ func TestAccounts(t *testing.T) { d := NewDriver("sqlite", &sqliteDB{ directory: os.TempDir(), dbName: uuid.New(), - }) + }, false) assert.NoError(t, d.Initialize(context.Background())) @@ -28,6 +28,31 @@ func TestAccounts(t *testing.T) { _, err = store.Initialize(context.Background()) assert.NoError(t, err) + accountTests(t, store) +} + +func TestAccountsMultipleInstance(t *testing.T) { + d := NewDriver("sqlite", &sqliteDB{ + directory: os.TempDir(), + dbName: uuid.New(), + }, true) + + assert.NoError(t, d.Initialize(context.Background())) + + defer func(d *Driver, ctx context.Context) { + assert.NoError(t, d.Close(ctx)) + }(d, context.Background()) + + store, _, err := d.GetLedgerStore(context.Background(), "foo", true) + assert.NoError(t, err) + + _, err = store.Initialize(context.Background()) + assert.NoError(t, err) + + accountTests(t, store) +} + +func accountTests(t *testing.T, store *Store) { t.Run("success balance", func(t *testing.T) { q := ledger.AccountsQuery{ PageSize: 10, diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 7158172f4..04f4fbdf0 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -13,7 +13,9 @@ import ( func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) { account, ok := s.cache.Get(address) - if ok { + // When having a single instance of the ledger, we can use the cached account. + // Otherwise, compute it every single time for now. + if !s.multipleInstance && ok { return account.(*core.AccountWithVolumes).Copy(), nil } @@ -88,7 +90,9 @@ func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*cor } res.Balances = res.Volumes.Balances() - s.cache.Set(address, res.Copy(), cache.NoExpiration) + if !s.multipleInstance { + s.cache.Set(address, res.Copy(), cache.NoExpiration) + } return res, nil } diff --git a/pkg/storage/sqlstorage/driver.go b/pkg/storage/sqlstorage/driver.go index f5fe0458c..873e44479 100644 --- a/pkg/storage/sqlstorage/driver.go +++ b/pkg/storage/sqlstorage/driver.go @@ -104,6 +104,7 @@ type Driver struct { systemSchema Schema registeredLedgers map[string]*Store lock sync.Mutex + redisLockStrategy bool } func (d *Driver) GetSystemStore() storage.SystemStore { @@ -154,7 +155,7 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) ( return nil, false, err } - ret = NewStore(schema, defaultExecutorProvider(schema), func(ctx context.Context) error { + ret = NewStore(schema, d.redisLockStrategy, defaultExecutorProvider(schema), func(ctx context.Context) error { d.lock.Lock() defer d.lock.Unlock() @@ -228,11 +229,12 @@ func (d *Driver) Close(ctx context.Context) error { return d.db.Close(ctx) } -func NewDriver(name string, db DB) *Driver { +func NewDriver(name string, db DB, redisLockStrategy bool) *Driver { return &Driver{ db: db, name: name, registeredLedgers: map[string]*Store{}, + redisLockStrategy: redisLockStrategy, } } diff --git a/pkg/storage/sqlstorage/driver_test.go b/pkg/storage/sqlstorage/driver_test.go index bc29a11d2..d3e19ead5 100644 --- a/pkg/storage/sqlstorage/driver_test.go +++ b/pkg/storage/sqlstorage/driver_test.go @@ -14,7 +14,7 @@ func TestNewDriver(t *testing.T) { d := NewDriver("sqlite", &sqliteDB{ directory: os.TempDir(), dbName: uuid.New(), - }) + }, false) assert.NoError(t, d.Initialize(context.Background())) @@ -39,7 +39,7 @@ func TestConfiguration(t *testing.T) { d := NewDriver("sqlite", &sqliteDB{ directory: os.TempDir(), dbName: uuid.New(), - }) + }, false) require.NoError(t, d.Initialize(context.Background())) require.NoError(t, d.GetSystemStore().InsertConfiguration(context.Background(), "foo", "bar")) diff --git a/pkg/storage/sqlstorage/logs.go b/pkg/storage/sqlstorage/logs.go index ebcecb809..02007027c 100644 --- a/pkg/storage/sqlstorage/logs.go +++ b/pkg/storage/sqlstorage/logs.go @@ -72,12 +72,16 @@ func (s *Store) appendLog(ctx context.Context, log ...core.Log) error { return s.error(err) } - s.lastLog = &log[len(log)-1] + if !s.multipleInstance { + s.lastLog = &log[len(log)-1] + } return nil } func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) { - if s.lastLog == nil { + // When having a single instance of the ledger, we can use the cached last log. + // Otherwise, compute it every single time for now. + if s.multipleInstance || s.lastLog == nil { sb := sqlbuilder.NewSelectBuilder() sb.From(s.schema.Table("log")) sb.Select("id", "type", "hash", "date", "data") @@ -107,7 +111,11 @@ func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) { } l.Date = l.Date.UTC() - s.lastLog = &l + if !s.multipleInstance { + s.lastLog = &l + } + + return &l, nil } return s.lastLog, nil } diff --git a/pkg/storage/sqlstorage/migrates/13-clean-logs/any_test.go b/pkg/storage/sqlstorage/migrates/13-clean-logs/any_test.go index 39b81fecb..7c7766537 100644 --- a/pkg/storage/sqlstorage/migrates/13-clean-logs/any_test.go +++ b/pkg/storage/sqlstorage/migrates/13-clean-logs/any_test.go @@ -15,76 +15,81 @@ import ( ) func TestMigrate(t *testing.T) { - driver, closeFunc, err := ledgertesting.StorageDriver() + driver, closeFunc, err := ledgertesting.StorageDriver(false) require.NoError(t, err) defer closeFunc() - require.NoError(t, driver.Initialize(context.Background())) - store, _, err := driver.GetLedgerStore(context.Background(), uuid.New(), true) + driverMultipleInstance, closeMultipleInstanceFunc, err := ledgertesting.StorageDriver(true) require.NoError(t, err) - - schema := store.Schema() - - migrations, err := sqlstorage.CollectMigrationFiles(sqlstorage.MigrationsFS) - require.NoError(t, err) - - modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:13]...) - require.NoError(t, err) - require.True(t, modified) - - sqlq, args := sqlbuilder.NewInsertBuilder(). - InsertInto(schema.Table("log")). - Cols("id", "type", "hash", "date", "data"). - Values("0", core.NewTransactionType, "", time.Now(), `{ - "txid": 0, - "postings": [], - "reference": "tx1" - }`). - Values("1", core.NewTransactionType, "", time.Now(), `{ - "txid": 1, - "postings": [], - "preCommitVolumes": {}, - "postCommitVolumes": {}, - "reference": "tx2" - }`). - BuildWithFlavor(schema.Flavor()) - - _, err = schema.ExecContext(context.Background(), sqlq, args...) - require.NoError(t, err) - - modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[13]) - require.NoError(t, err) - require.True(t, modified) - - sqlq, args = sqlbuilder.NewSelectBuilder(). - Select("data"). - From(schema.Table("log")). - BuildWithFlavor(schema.Flavor()) - - rows, err := schema.QueryContext(context.Background(), sqlq, args...) - require.NoError(t, err) - - require.True(t, rows.Next()) - var dataStr string - require.NoError(t, rows.Scan(&dataStr)) - - data := map[string]any{} - require.NoError(t, json.Unmarshal([]byte(dataStr), &data)) - - require.Equal(t, map[string]any{ - "txid": float64(0), - "postings": []interface{}{}, - "reference": "tx1", - }, data) - - require.True(t, rows.Next()) - require.NoError(t, rows.Scan(&dataStr)) - require.NoError(t, json.Unmarshal([]byte(dataStr), &data)) - - require.Equal(t, map[string]any{ - "txid": float64(1), - "postings": []interface{}{}, - "reference": "tx2", - }, data) - + defer closeMultipleInstanceFunc() + + for _, driver := range []*sqlstorage.Driver{driver, driverMultipleInstance} { + require.NoError(t, driver.Initialize(context.Background())) + store, _, err := driver.GetLedgerStore(context.Background(), uuid.New(), true) + require.NoError(t, err) + + schema := store.Schema() + + migrations, err := sqlstorage.CollectMigrationFiles(sqlstorage.MigrationsFS) + require.NoError(t, err) + + modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:13]...) + require.NoError(t, err) + require.True(t, modified) + + sqlq, args := sqlbuilder.NewInsertBuilder(). + InsertInto(schema.Table("log")). + Cols("id", "type", "hash", "date", "data"). + Values("0", core.NewTransactionType, "", time.Now(), `{ + "txid": 0, + "postings": [], + "reference": "tx1" + }`). + Values("1", core.NewTransactionType, "", time.Now(), `{ + "txid": 1, + "postings": [], + "preCommitVolumes": {}, + "postCommitVolumes": {}, + "reference": "tx2" + }`). + BuildWithFlavor(schema.Flavor()) + + _, err = schema.ExecContext(context.Background(), sqlq, args...) + require.NoError(t, err) + + modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[13]) + require.NoError(t, err) + require.True(t, modified) + + sqlq, args = sqlbuilder.NewSelectBuilder(). + Select("data"). + From(schema.Table("log")). + BuildWithFlavor(schema.Flavor()) + + rows, err := schema.QueryContext(context.Background(), sqlq, args...) + require.NoError(t, err) + + require.True(t, rows.Next()) + var dataStr string + require.NoError(t, rows.Scan(&dataStr)) + + data := map[string]any{} + require.NoError(t, json.Unmarshal([]byte(dataStr), &data)) + + require.Equal(t, map[string]any{ + "txid": float64(0), + "postings": []interface{}{}, + "reference": "tx1", + }, data) + + require.True(t, rows.Next()) + require.NoError(t, rows.Scan(&dataStr)) + require.NoError(t, json.Unmarshal([]byte(dataStr), &data)) + + require.Equal(t, map[string]any{ + "txid": float64(1), + "postings": []interface{}{}, + "reference": "tx2", + }, data) + } } diff --git a/pkg/storage/sqlstorage/migrates/17-optimized-segments/any_test.go b/pkg/storage/sqlstorage/migrates/17-optimized-segments/any_test.go index e124dcf89..9e3bdf66a 100644 --- a/pkg/storage/sqlstorage/migrates/17-optimized-segments/any_test.go +++ b/pkg/storage/sqlstorage/migrates/17-optimized-segments/any_test.go @@ -17,7 +17,7 @@ func TestMigrate17(t *testing.T) { t.Skip() } - driver, closeFunc, err := ledgertesting.StorageDriver() + driver, closeFunc, err := ledgertesting.StorageDriver(false) require.NoError(t, err) defer closeFunc() diff --git a/pkg/storage/sqlstorage/migrates/9-add-pre-post-volumes/any_test.go b/pkg/storage/sqlstorage/migrates/9-add-pre-post-volumes/any_test.go index 47dbf6988..e6cb6e690 100644 --- a/pkg/storage/sqlstorage/migrates/9-add-pre-post-volumes/any_test.go +++ b/pkg/storage/sqlstorage/migrates/9-add-pre-post-volumes/any_test.go @@ -215,7 +215,7 @@ var testCases = []testCase{ } func TestMigrate9(t *testing.T) { - driver, closeFunc, err := ledgertesting.StorageDriver() + driver, closeFunc, err := ledgertesting.StorageDriver(false) require.NoError(t, err) defer closeFunc() diff --git a/pkg/storage/sqlstorage/module.go b/pkg/storage/sqlstorage/module.go index ecbd03e6e..3b14e11cf 100644 --- a/pkg/storage/sqlstorage/module.go +++ b/pkg/storage/sqlstorage/module.go @@ -21,9 +21,10 @@ type PostgresConfig struct { } type ModuleConfig struct { - StorageDriver string - SQLiteConfig *SQLiteConfig - PostgresConfig *PostgresConfig + StorageDriver string + SQLiteConfig *SQLiteConfig + PostgresConfig *PostgresConfig + RedisLockStrategy bool } func OpenSQLDB(flavor Flavor, dataSourceName string) (*sql.DB, error) { @@ -49,7 +50,7 @@ func DriverModule(cfg ModuleConfig) fx.Option { return NewPostgresDB(db) })) options = append(options, fx.Provide(func(db DB) (*Driver, error) { - return NewDriver(PostgreSQL.String(), db), nil + return NewDriver(PostgreSQL.String(), db, cfg.RedisLockStrategy), nil })) options = append(options, health.ProvideHealthCheck(func(db *sql.DB) health.NamedCheck { return health.NewNamedCheck(PostgreSQL.String(), health.CheckFn(db.PingContext)) @@ -59,7 +60,7 @@ func DriverModule(cfg ModuleConfig) fx.Option { return NewSQLiteDB(cfg.SQLiteConfig.Dir, cfg.SQLiteConfig.DBName) })) options = append(options, fx.Provide(func(db DB) (*Driver, error) { - return NewDriver(SQLite.String(), db), nil + return NewDriver(SQLite.String(), db, cfg.RedisLockStrategy), nil })) options = append(options, health.ProvideHealthCheck(func() health.NamedCheck { return health.NewNamedCheck(SQLite.String(), health.CheckFn(func(ctx context.Context) error { diff --git a/pkg/storage/sqlstorage/store_ledger.go b/pkg/storage/sqlstorage/store_ledger.go index 0112654d4..7e2ac9515 100644 --- a/pkg/storage/sqlstorage/store_ledger.go +++ b/pkg/storage/sqlstorage/store_ledger.go @@ -24,6 +24,7 @@ type Store struct { lastLog *core.Log lastTx *core.ExpandedTransaction cache *cache.Cache + multipleInstance bool } func (s *Store) error(err error) error { @@ -63,7 +64,7 @@ func (s *Store) Close(ctx context.Context) error { return s.onClose(ctx) } -func NewStore(schema Schema, executorProvider func(ctx context.Context) (executor, error), +func NewStore(schema Schema, multipleInstance bool, executorProvider func(ctx context.Context) (executor, error), onClose, onDelete func(ctx context.Context) error) *Store { return &Store{ @@ -71,6 +72,7 @@ func NewStore(schema Schema, executorProvider func(ctx context.Context) (executo schema: schema, onClose: onClose, onDelete: onDelete, + multipleInstance: multipleInstance, cache: cache.New(5*time.Minute, 10*time.Minute), } } diff --git a/pkg/storage/sqlstorage/store_ledger_test.go b/pkg/storage/sqlstorage/store_ledger_test.go index 269d832e4..e6e440d83 100644 --- a/pkg/storage/sqlstorage/store_ledger_test.go +++ b/pkg/storage/sqlstorage/store_ledger_test.go @@ -728,7 +728,7 @@ func testTooManyClient(t *testing.T, store *sqlstorage.Store) { } func TestInitializeStore(t *testing.T) { - driver, stopFn, err := ledgertesting.StorageDriver() + driver, stopFn, err := ledgertesting.StorageDriver(false) require.NoError(t, err) defer stopFn() defer func(driver storage.Driver[*sqlstorage.Store], ctx context.Context) { diff --git a/pkg/storage/sqlstorage/transactions.go b/pkg/storage/sqlstorage/transactions.go index 386e8d0f5..337187e43 100644 --- a/pkg/storage/sqlstorage/transactions.go +++ b/pkg/storage/sqlstorage/transactions.go @@ -278,7 +278,9 @@ func (s *Store) GetTransaction(ctx context.Context, txId uint64) (*core.Expanded } func (s *Store) GetLastTransaction(ctx context.Context) (*core.ExpandedTransaction, error) { - if s.lastTx == nil { + // When having a single instance of the ledger, we can use the cached last transaction. + // Otherwise, compute it every single time for now. + if s.multipleInstance || s.lastTx == nil { sb := sqlbuilder.NewSelectBuilder() sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes") sb.From(s.schema.Table("transactions")) @@ -325,7 +327,11 @@ func (s *Store) GetLastTransaction(ctx context.Context) (*core.ExpandedTransacti tx.Timestamp = tx.Timestamp.UTC() tx.Reference = ref.String - s.lastTx = &tx + if !s.multipleInstance { + s.lastTx = &tx + } + + return &tx, nil } return s.lastTx, nil @@ -491,7 +497,9 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran return s.error(err) } - s.lastTx = &txs[len(txs)-1] + if !s.multipleInstance { + s.lastTx = &txs[len(txs)-1] + } return nil } @@ -531,7 +539,7 @@ func (s *Store) UpdateTransactionMetadata(ctx context.Context, id uint64, metada return errors.Wrap(err, "reading last log") } - if s.lastTx.ID == id { + if !s.multipleInstance && s.lastTx.ID == id { if s.lastTx.Metadata == nil { s.lastTx.Metadata = metadata } else { diff --git a/pkg/storage/sqlstorage/volumes.go b/pkg/storage/sqlstorage/volumes.go index 5f9302c64..6f2fda30d 100644 --- a/pkg/storage/sqlstorage/volumes.go +++ b/pkg/storage/sqlstorage/volumes.go @@ -12,18 +12,20 @@ import ( func (s *Store) updateVolumes(ctx context.Context, volumes core.AccountsAssetsVolumes) error { - storage.OnTransactionCommitted(ctx, func() { - for address, accountVolumes := range volumes { - entry, ok := s.cache.Get(address) - if ok { - account := entry.(*core.AccountWithVolumes) - for asset, volumes := range accountVolumes { - account.Volumes[asset] = volumes - account.Balances[asset] = volumes.Balance() + if !s.multipleInstance { + storage.OnTransactionCommitted(ctx, func() { + for address, accountVolumes := range volumes { + entry, ok := s.cache.Get(address) + if ok { + account := entry.(*core.AccountWithVolumes) + for asset, volumes := range accountVolumes { + account.Volumes[asset] = volumes + account.Balances[asset] = volumes.Balance() + } } } - } - }) + }) + } for address, accountVolumes := range volumes {