From 485cba5afb17a5ce031b61018e1d3653e4c3e17c Mon Sep 17 00:00:00 2001 From: Ragot Geoffrey Date: Wed, 22 Mar 2023 14:51:15 +0100 Subject: [PATCH] feat: make cache works in parallel (#170) --- pkg/core/account.go | 2 +- pkg/ledger/cache/cache.go | 72 +++++++++++----- pkg/ledger/cache/cache_test.go | 152 +++++++++++++++++++++++++-------- pkg/ledger/cache/main_test.go | 21 +++++ pkg/ledger/ledger.go | 2 +- pkg/ledger/runner/runner.go | 2 +- 6 files changed, 191 insertions(+), 60 deletions(-) create mode 100644 pkg/ledger/cache/main_test.go diff --git a/pkg/core/account.go b/pkg/core/account.go index c3979e3be..35fd553d1 100644 --- a/pkg/core/account.go +++ b/pkg/core/account.go @@ -36,7 +36,7 @@ func (v AccountWithVolumes) MarshalJSON() ([]byte, error) { type aux AccountWithVolumes return json.Marshal(struct { aux - Balances AssetsBalances `json:"balanes"` + Balances AssetsBalances `json:"balances"` }{ aux: aux(v), Balances: v.Volumes.Balances(), diff --git a/pkg/ledger/cache/cache.go b/pkg/ledger/cache/cache.go index 3c2545670..3ae97ded6 100644 --- a/pkg/ledger/cache/cache.go +++ b/pkg/ledger/cache/cache.go @@ -3,12 +3,18 @@ package cache import ( "context" "strings" + "sync" "github.com/bluele/gcache" "github.com/formancehq/ledger/pkg/core" "github.com/formancehq/ledger/pkg/storage" ) +type cacheEntry struct { + sync.Mutex + account *core.AccountWithVolumes +} + type Cache struct { cache gcache.Cache store storage.LedgerStore @@ -18,7 +24,7 @@ func (c *Cache) GetAccountWithVolumes(ctx context.Context, address string) (*cor address = strings.TrimPrefix(address, "@") - rawAccount, err := c.cache.Get(address) + entry, err := c.cache.Get(address) if err != nil { // TODO: Rename later ? account, err := c.store.ComputeAccount(ctx, address) @@ -26,40 +32,64 @@ func (c *Cache) GetAccountWithVolumes(ctx context.Context, address string) (*cor return nil, err } - if err := c.cache.Set(account.Address, account); err != nil { + ce := &cacheEntry{ + account: account, + } + + if err := c.cache.Set(account.Address, ce); err != nil { panic(err) } *account = account.Copy() return account, nil } - cp := rawAccount.(*core.AccountWithVolumes).Copy() + cp := entry.(*cacheEntry).account.Copy() return &cp, nil } -func (c *Cache) Update(accounts core.AccountsAssetsVolumes) { - for address, volumes := range accounts { - rawAccount, err := c.cache.Get(address) - if err != nil { - // Cannot update cache, item maybe evicted - continue - } - account := rawAccount.(*core.AccountWithVolumes) - account.Volumes = volumes - if err := c.cache.Set(address, account); err != nil { - panic(err) - } +func (c *Cache) withLockOnAccount(address string, callback func(account *core.AccountWithVolumes)) { + item, err := c.cache.Get(address) + if err != nil { + return } + entry := item.(*cacheEntry) + entry.Lock() + defer entry.Unlock() + + callback(entry.account) } -func (c *Cache) UpdateAccountMetadata(ctx context.Context, address string, m core.Metadata) error { - account, err := c.GetAccountWithVolumes(ctx, address) - if err != nil { - return err +func (c *Cache) addOutput(address, asset string, amount *core.MonetaryInt) { + c.withLockOnAccount(address, func(account *core.AccountWithVolumes) { + volumes := account.Volumes[asset] + volumes.Output = volumes.Output.OrZero().Add(amount) + volumes.Input = volumes.Input.OrZero() + account.Volumes[asset] = volumes + }) +} + +func (c *Cache) addInput(address, asset string, amount *core.MonetaryInt) { + c.withLockOnAccount(address, func(account *core.AccountWithVolumes) { + volumes := account.Volumes[asset] + volumes.Input = volumes.Input.OrZero().Add(amount) + volumes.Output = volumes.Output.OrZero() + account.Volumes[asset] = volumes + }) +} + +func (c *Cache) UpdateVolumeWithTX(tx core.Transaction) { + for _, posting := range tx.Postings { + c.addOutput(posting.Source, posting.Asset, posting.Amount) + c.addInput(posting.Destination, posting.Asset, posting.Amount) } - account.Metadata = account.Metadata.Merge(m) - _ = c.cache.Set(address, account) +} + +func (c *Cache) UpdateAccountMetadata(address string, m core.Metadata) error { + c.withLockOnAccount(address, func(account *core.AccountWithVolumes) { + account.Metadata = account.Metadata.Merge(m) + }) + return nil } diff --git a/pkg/ledger/cache/cache_test.go b/pkg/ledger/cache/cache_test.go index 46842e182..cbce3f989 100644 --- a/pkg/ledger/cache/cache_test.go +++ b/pkg/ledger/cache/cache_test.go @@ -6,16 +6,11 @@ import ( "github.com/formancehq/ledger/pkg/core" "github.com/formancehq/ledger/pkg/ledgertesting" - "github.com/formancehq/stack/libs/go-libs/pgtesting" "github.com/google/uuid" "github.com/stretchr/testify/require" ) -func TestCache(t *testing.T) { - require.NoError(t, pgtesting.CreatePostgresServer()) - defer func() { - require.NoError(t, pgtesting.DestroyPostgresServer()) - }() +func TestComputeAccountFromLogs(t *testing.T) { driver := ledgertesting.StorageDriver(t) @@ -41,27 +36,6 @@ func TestCache(t *testing.T) { }, }, })) - - account, err := cache.GetAccountWithVolumes(context.Background(), "world") - require.NoError(t, err) - require.NotNil(t, account) - require.Equal(t, core.AccountWithVolumes{ - Account: core.Account{ - Address: "world", - Metadata: core.Metadata{}, - }, - Volumes: map[string]core.Volumes{ - "USD/2": { - Input: core.NewMonetaryInt(100), - Output: core.NewMonetaryInt(0), - }, - }, - }, *account) - - volumes := account.Volumes["USD/2"] - volumes.Output = account.Volumes["USD/2"].Output.Add(core.NewMonetaryInt(10)) - account.Volumes["USD/2"] = volumes - log := core.NewTransactionLog(core.Transaction{ TransactionData: core.TransactionData{ Postings: []core.Posting{{ @@ -72,10 +46,8 @@ func TestCache(t *testing.T) { }}, }, }, nil) - require.NoError(t, ledgerStore.AppendLog( - context.Background(), - &log, - )) + require.NoError(t, ledgerStore.AppendLog(context.Background(), &log)) + log2 := core.NewSetMetadataLog(core.Now(), core.SetMetadataLogPayload{ TargetType: core.MetaTargetTypeAccount, TargetID: "bank", @@ -83,12 +55,9 @@ func TestCache(t *testing.T) { "category": "gold", }, }) - require.NoError(t, ledgerStore.AppendLog( - context.Background(), - &log2, - )) + require.NoError(t, ledgerStore.AppendLog(context.Background(), &log2)) - account, err = cache.GetAccountWithVolumes(context.Background(), "bank") + account, err := cache.GetAccountWithVolumes(context.Background(), "bank") require.NoError(t, err) require.NotNil(t, account) @@ -108,3 +77,114 @@ func TestCache(t *testing.T) { }, *account) } + +func TestRetrieveValueFromCache(t *testing.T) { + + driver := ledgertesting.StorageDriver(t) + + require.NoError(t, driver.Initialize(context.Background())) + + manager := NewManager(driver) + ledger := uuid.NewString() + cache, err := manager.ForLedger(context.Background(), ledger) + require.NoError(t, err) + + ledgerStore, _, err := driver.GetLedgerStore(context.Background(), ledger, true) + require.NoError(t, err) + + _, err = ledgerStore.Initialize(context.Background()) + require.NoError(t, err) + + require.NoError(t, ledgerStore.EnsureAccountExists(context.Background(), "world")) + account, err := cache.GetAccountWithVolumes(context.Background(), "world") + require.NoError(t, err) + require.Equal(t, core.AccountWithVolumes{ + Account: core.Account{ + Address: "world", + Metadata: core.Metadata{}, + }, + Volumes: map[string]core.Volumes{}, + }, *account) + + cache.UpdateVolumeWithTX(core.NewTransaction().WithPostings( + core.NewPosting("world", "bank", "USD", core.NewMonetaryInt(100)), + )) + account, err = cache.GetAccountWithVolumes(context.Background(), "world") + require.NoError(t, err) + require.EqualValues(t, core.AccountWithVolumes{ + Account: core.Account{ + Address: "world", + Metadata: core.Metadata{}, + }, + Volumes: map[string]core.Volumes{ + "USD": core.NewEmptyVolumes().WithOutput(core.NewMonetaryInt(100)), + }, + }, *account) +} + +func TestUpdateVolumes(t *testing.T) { + + driver := ledgertesting.StorageDriver(t) + + require.NoError(t, driver.Initialize(context.Background())) + + manager := NewManager(driver) + ledger := uuid.NewString() + cache, err := manager.ForLedger(context.Background(), ledger) + require.NoError(t, err) + + ledgerStore, _, err := driver.GetLedgerStore(context.Background(), ledger, true) + require.NoError(t, err) + + _, err = ledgerStore.Initialize(context.Background()) + require.NoError(t, err) + + require.NoError(t, ledgerStore.EnsureAccountExists(context.Background(), "world")) + worldAccount, err := cache.GetAccountWithVolumes(context.Background(), "world") + require.NoError(t, err) + require.Equal(t, core.AccountWithVolumes{ + Account: core.Account{ + Address: "world", + Metadata: core.Metadata{}, + }, + Volumes: map[string]core.Volumes{}, + }, *worldAccount) + + require.NoError(t, ledgerStore.EnsureAccountExists(context.Background(), "bank")) + bankAccount, err := cache.GetAccountWithVolumes(context.Background(), "bank") + require.NoError(t, err) + require.Equal(t, core.AccountWithVolumes{ + Account: core.Account{ + Address: "bank", + Metadata: core.Metadata{}, + }, + Volumes: map[string]core.Volumes{}, + }, *bankAccount) + + cache.UpdateVolumeWithTX(core.NewTransaction().WithPostings( + core.NewPosting("world", "bank", "USD", core.NewMonetaryInt(100)), + )) + worldAccount, err = cache.GetAccountWithVolumes(context.Background(), "world") + require.NoError(t, err) + require.EqualValues(t, core.AccountWithVolumes{ + Account: core.Account{ + Address: "world", + Metadata: core.Metadata{}, + }, + Volumes: map[string]core.Volumes{ + "USD": core.NewEmptyVolumes().WithOutput(core.NewMonetaryInt(100)), + }, + }, *worldAccount) + + worldAccount, err = cache.GetAccountWithVolumes(context.Background(), "bank") + require.NoError(t, err) + require.Equal(t, core.AccountWithVolumes{ + Account: core.Account{ + Address: "bank", + Metadata: core.Metadata{}, + }, + Volumes: map[string]core.Volumes{ + "USD": core.NewEmptyVolumes().WithInput(core.NewMonetaryInt(100)), + }, + }, *worldAccount) +} diff --git a/pkg/ledger/cache/main_test.go b/pkg/ledger/cache/main_test.go new file mode 100644 index 000000000..1e4958ea7 --- /dev/null +++ b/pkg/ledger/cache/main_test.go @@ -0,0 +1,21 @@ +package cache + +import ( + "os" + "testing" + + "github.com/formancehq/stack/libs/go-libs/logging" + "github.com/formancehq/stack/libs/go-libs/pgtesting" +) + +func TestMain(t *testing.M) { + if err := pgtesting.CreatePostgresServer(); err != nil { + logging.Error(err) + os.Exit(1) + } + code := t.Run() + if err := pgtesting.DestroyPostgresServer(); err != nil { + logging.Error("error stopping pg server:", err) + } + os.Exit(code) +} diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index b84f76fd9..971302085 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -167,7 +167,7 @@ func (l *Ledger) SaveMeta(ctx context.Context, targetType string, targetID inter } defer unlock(context.Background()) - err = l.dbCache.UpdateAccountMetadata(ctx, targetID.(string), m) + err = l.dbCache.UpdateAccountMetadata(targetID.(string), m) if err != nil { return err } diff --git a/pkg/ledger/runner/runner.go b/pkg/ledger/runner/runner.go index 78e2d3abb..d26033090 100644 --- a/pkg/ledger/runner/runner.go +++ b/pkg/ledger/runner/runner.go @@ -185,7 +185,7 @@ func (r *Runner) execute(ctx context.Context, script core.RunScript, dryRun bool return expandedTx, result.AccountMetadata, nil } - r.cache.Update(expandedTx.PostCommitVolumes) + r.cache.UpdateVolumeWithTX(expandedTx.Transaction) return expandedTx, result.AccountMetadata, nil }