Skip to content

Commit

Permalink
fix(ledger): do not use caches when using multiple instances (#467)
Browse files Browse the repository at this point in the history
* fix(ledger): do not use caches when using multiple instances
  • Loading branch information
paul-nicolas authored Oct 3, 2023
1 parent 8285d25 commit 644091f
Show file tree
Hide file tree
Showing 19 changed files with 172 additions and 108 deletions.
3 changes: 3 additions & 0 deletions cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
// Handle OpenTelemetry
options = append(options, otlptraces.CLITracesModule(v))

redisLockStrategy := false
switch v.GetString(lockStrategyFlag) {
case "redis":
var tlsConfig *tls.Config
Expand All @@ -58,6 +59,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
LockRetry: v.GetDuration(lockStrategyRedisRetryFlag),
TLSConfig: tlsConfig,
}))
redisLockStrategy = true
}

// Handle api part
Expand Down Expand Up @@ -87,6 +89,7 @@ func resolveOptions(v *viper.Viper, userOptions ...fx.Option) []fx.Option {
ConnString: v.GetString(storagePostgresConnectionStringFlag),
}
}(),
RedisLockStrategy: redisLockStrategy,
}))

options = append(options, internal.NewAnalyticsModule(v, Version))
Expand Down
2 changes: 1 addition & 1 deletion cmd/internal/analytics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestAnalyticsModule(t *testing.T) {
module,
fx.Provide(func(lc fx.Lifecycle) (storage.Driver[ledger.Store], error) {
id := uuid.New()
driver := sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id))
driver := sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id), false)
lc.Append(fx.Hook{
OnStart: driver.Initialize,
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/analytics/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var (
}),
fx.Provide(func(lc fx.Lifecycle) (storage.Driver[ledger.Store], error) {
id := uuid.New()
driver := sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id))
driver := sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id), false)
lc.Append(fx.Hook{
OnStart: driver.Initialize,
})
Expand Down
1 change: 1 addition & 0 deletions pkg/ledger/execute_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (l *Ledger) ExecuteScript(ctx context.Context, preview bool, script core.Sc
"get transactions with reference")
}
if len(txs.Data) > 0 {

return core.ExpandedTransaction{}, NewConflictError()
}
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/ledgertesting/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ func StorageDriverName() string {
return "sqlite"
}

func StorageDriver() (*sqlstorage.Driver, func(), error) {
func StorageDriver(multipleInstance bool) (*sqlstorage.Driver, func(), error) {
switch StorageDriverName() {
case "sqlite":
id := uuid.New()
return sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id)), func() {}, nil
return sqlstorage.NewDriver("sqlite", sqlstorage.NewSQLiteDB(os.TempDir(), id), multipleInstance), func() {}, nil
case "postgres":
pgServer, err := pgtesting.PostgresServer()
if err != nil {
Expand All @@ -39,6 +39,7 @@ func StorageDriver() (*sqlstorage.Driver, func(), error) {
return sqlstorage.NewDriver(
"postgres",
sqlstorage.NewPostgresDB(db),
multipleInstance,
), func() {
_ = pgServer.Close()
}, nil
Expand All @@ -48,7 +49,7 @@ func StorageDriver() (*sqlstorage.Driver, func(), error) {

func ProvideStorageDriver() fx.Option {
return fx.Provide(func(lc fx.Lifecycle) (*sqlstorage.Driver, error) {
driver, stopFn, err := StorageDriver()
driver, stopFn, err := StorageDriver(false)
if err != nil {
return nil, err
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/sqlstorage/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,9 @@ func (s *Store) GetAccounts(ctx context.Context, q ledger.AccountsQuery) (api.Cu
func (s *Store) GetAccount(ctx context.Context, addr string) (*core.Account, error) {

entry, ok := s.cache.Get(addr)
if ok {
// When having a single instance of the ledger, we can use the cached account.
// Otherwise, compute it every single time for now.
if !s.multipleInstance && ok {
return entry.(*core.AccountWithVolumes).Account.Copy(), nil
}

Expand Down Expand Up @@ -278,7 +280,7 @@ func (s *Store) ensureAccountExists(ctx context.Context, account string) error {
func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metadata core.Metadata, at time.Time) error {

entry, ok := s.cache.Get(address)
if ok {
if !s.multipleInstance && ok {
account := entry.(*core.AccountWithVolumes)
account.Metadata = account.Metadata.Merge(metadata)
}
Expand Down
27 changes: 26 additions & 1 deletion pkg/storage/sqlstorage/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestAccounts(t *testing.T) {
d := NewDriver("sqlite", &sqliteDB{
directory: os.TempDir(),
dbName: uuid.New(),
})
}, false)

assert.NoError(t, d.Initialize(context.Background()))

Expand All @@ -28,6 +28,31 @@ func TestAccounts(t *testing.T) {
_, err = store.Initialize(context.Background())
assert.NoError(t, err)

accountTests(t, store)
}

func TestAccountsMultipleInstance(t *testing.T) {
d := NewDriver("sqlite", &sqliteDB{
directory: os.TempDir(),
dbName: uuid.New(),
}, true)

assert.NoError(t, d.Initialize(context.Background()))

defer func(d *Driver, ctx context.Context) {
assert.NoError(t, d.Close(ctx))
}(d, context.Background())

store, _, err := d.GetLedgerStore(context.Background(), "foo", true)
assert.NoError(t, err)

_, err = store.Initialize(context.Background())
assert.NoError(t, err)

accountTests(t, store)
}

func accountTests(t *testing.T, store *Store) {
t.Run("success balance", func(t *testing.T) {
q := ledger.AccountsQuery{
PageSize: 10,
Expand Down
8 changes: 6 additions & 2 deletions pkg/storage/sqlstorage/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (

func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) {
account, ok := s.cache.Get(address)
if ok {
// When having a single instance of the ledger, we can use the cached account.
// Otherwise, compute it every single time for now.
if !s.multipleInstance && ok {
return account.(*core.AccountWithVolumes).Copy(), nil
}

Expand Down Expand Up @@ -88,7 +90,9 @@ func (s *Store) GetAccountWithVolumes(ctx context.Context, address string) (*cor
}
res.Balances = res.Volumes.Balances()

s.cache.Set(address, res.Copy(), cache.NoExpiration)
if !s.multipleInstance {
s.cache.Set(address, res.Copy(), cache.NoExpiration)
}

return res, nil
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/sqlstorage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ type Driver struct {
systemSchema Schema
registeredLedgers map[string]*Store
lock sync.Mutex
redisLockStrategy bool
}

func (d *Driver) GetSystemStore() storage.SystemStore {
Expand Down Expand Up @@ -154,7 +155,7 @@ func (d *Driver) GetLedgerStore(ctx context.Context, name string, create bool) (
return nil, false, err
}

ret = NewStore(schema, defaultExecutorProvider(schema), func(ctx context.Context) error {
ret = NewStore(schema, d.redisLockStrategy, defaultExecutorProvider(schema), func(ctx context.Context) error {
d.lock.Lock()
defer d.lock.Unlock()

Expand Down Expand Up @@ -228,11 +229,12 @@ func (d *Driver) Close(ctx context.Context) error {
return d.db.Close(ctx)
}

func NewDriver(name string, db DB) *Driver {
func NewDriver(name string, db DB, redisLockStrategy bool) *Driver {
return &Driver{
db: db,
name: name,
registeredLedgers: map[string]*Store{},
redisLockStrategy: redisLockStrategy,
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/sqlstorage/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func TestNewDriver(t *testing.T) {
d := NewDriver("sqlite", &sqliteDB{
directory: os.TempDir(),
dbName: uuid.New(),
})
}, false)

assert.NoError(t, d.Initialize(context.Background()))

Expand All @@ -39,7 +39,7 @@ func TestConfiguration(t *testing.T) {
d := NewDriver("sqlite", &sqliteDB{
directory: os.TempDir(),
dbName: uuid.New(),
})
}, false)
require.NoError(t, d.Initialize(context.Background()))

require.NoError(t, d.GetSystemStore().InsertConfiguration(context.Background(), "foo", "bar"))
Expand Down
14 changes: 11 additions & 3 deletions pkg/storage/sqlstorage/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,16 @@ func (s *Store) appendLog(ctx context.Context, log ...core.Log) error {
return s.error(err)
}

s.lastLog = &log[len(log)-1]
if !s.multipleInstance {
s.lastLog = &log[len(log)-1]
}
return nil
}

func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) {
if s.lastLog == nil {
// When having a single instance of the ledger, we can use the cached last log.
// Otherwise, compute it every single time for now.
if s.multipleInstance || s.lastLog == nil {
sb := sqlbuilder.NewSelectBuilder()
sb.From(s.schema.Table("log"))
sb.Select("id", "type", "hash", "date", "data")
Expand Down Expand Up @@ -107,7 +111,11 @@ func (s *Store) GetLastLog(ctx context.Context) (*core.Log, error) {
}
l.Date = l.Date.UTC()

s.lastLog = &l
if !s.multipleInstance {
s.lastLog = &l
}

return &l, nil
}
return s.lastLog, nil
}
Expand Down
141 changes: 73 additions & 68 deletions pkg/storage/sqlstorage/migrates/13-clean-logs/any_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,76 +15,81 @@ import (
)

func TestMigrate(t *testing.T) {
driver, closeFunc, err := ledgertesting.StorageDriver()
driver, closeFunc, err := ledgertesting.StorageDriver(false)
require.NoError(t, err)
defer closeFunc()

require.NoError(t, driver.Initialize(context.Background()))
store, _, err := driver.GetLedgerStore(context.Background(), uuid.New(), true)
driverMultipleInstance, closeMultipleInstanceFunc, err := ledgertesting.StorageDriver(true)
require.NoError(t, err)

schema := store.Schema()

migrations, err := sqlstorage.CollectMigrationFiles(sqlstorage.MigrationsFS)
require.NoError(t, err)

modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:13]...)
require.NoError(t, err)
require.True(t, modified)

sqlq, args := sqlbuilder.NewInsertBuilder().
InsertInto(schema.Table("log")).
Cols("id", "type", "hash", "date", "data").
Values("0", core.NewTransactionType, "", time.Now(), `{
"txid": 0,
"postings": [],
"reference": "tx1"
}`).
Values("1", core.NewTransactionType, "", time.Now(), `{
"txid": 1,
"postings": [],
"preCommitVolumes": {},
"postCommitVolumes": {},
"reference": "tx2"
}`).
BuildWithFlavor(schema.Flavor())

_, err = schema.ExecContext(context.Background(), sqlq, args...)
require.NoError(t, err)

modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[13])
require.NoError(t, err)
require.True(t, modified)

sqlq, args = sqlbuilder.NewSelectBuilder().
Select("data").
From(schema.Table("log")).
BuildWithFlavor(schema.Flavor())

rows, err := schema.QueryContext(context.Background(), sqlq, args...)
require.NoError(t, err)

require.True(t, rows.Next())
var dataStr string
require.NoError(t, rows.Scan(&dataStr))

data := map[string]any{}
require.NoError(t, json.Unmarshal([]byte(dataStr), &data))

require.Equal(t, map[string]any{
"txid": float64(0),
"postings": []interface{}{},
"reference": "tx1",
}, data)

require.True(t, rows.Next())
require.NoError(t, rows.Scan(&dataStr))
require.NoError(t, json.Unmarshal([]byte(dataStr), &data))

require.Equal(t, map[string]any{
"txid": float64(1),
"postings": []interface{}{},
"reference": "tx2",
}, data)

defer closeMultipleInstanceFunc()

for _, driver := range []*sqlstorage.Driver{driver, driverMultipleInstance} {
require.NoError(t, driver.Initialize(context.Background()))
store, _, err := driver.GetLedgerStore(context.Background(), uuid.New(), true)
require.NoError(t, err)

schema := store.Schema()

migrations, err := sqlstorage.CollectMigrationFiles(sqlstorage.MigrationsFS)
require.NoError(t, err)

modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:13]...)
require.NoError(t, err)
require.True(t, modified)

sqlq, args := sqlbuilder.NewInsertBuilder().
InsertInto(schema.Table("log")).
Cols("id", "type", "hash", "date", "data").
Values("0", core.NewTransactionType, "", time.Now(), `{
"txid": 0,
"postings": [],
"reference": "tx1"
}`).
Values("1", core.NewTransactionType, "", time.Now(), `{
"txid": 1,
"postings": [],
"preCommitVolumes": {},
"postCommitVolumes": {},
"reference": "tx2"
}`).
BuildWithFlavor(schema.Flavor())

_, err = schema.ExecContext(context.Background(), sqlq, args...)
require.NoError(t, err)

modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[13])
require.NoError(t, err)
require.True(t, modified)

sqlq, args = sqlbuilder.NewSelectBuilder().
Select("data").
From(schema.Table("log")).
BuildWithFlavor(schema.Flavor())

rows, err := schema.QueryContext(context.Background(), sqlq, args...)
require.NoError(t, err)

require.True(t, rows.Next())
var dataStr string
require.NoError(t, rows.Scan(&dataStr))

data := map[string]any{}
require.NoError(t, json.Unmarshal([]byte(dataStr), &data))

require.Equal(t, map[string]any{
"txid": float64(0),
"postings": []interface{}{},
"reference": "tx1",
}, data)

require.True(t, rows.Next())
require.NoError(t, rows.Scan(&dataStr))
require.NoError(t, json.Unmarshal([]byte(dataStr), &data))

require.Equal(t, map[string]any{
"txid": float64(1),
"postings": []interface{}{},
"reference": "tx2",
}, data)
}
}
Loading

0 comments on commit 644091f

Please sign in to comment.