Skip to content

Commit

Permalink
feat: make cache works in parallel (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed May 12, 2023
1 parent 2094645 commit 485cba5
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 60 deletions.
2 changes: 1 addition & 1 deletion pkg/core/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (v AccountWithVolumes) MarshalJSON() ([]byte, error) {
type aux AccountWithVolumes
return json.Marshal(struct {
aux
Balances AssetsBalances `json:"balanes"`
Balances AssetsBalances `json:"balances"`
}{
aux: aux(v),
Balances: v.Volumes.Balances(),
Expand Down
72 changes: 51 additions & 21 deletions pkg/ledger/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package cache
import (
"context"
"strings"
"sync"

"github.com/bluele/gcache"
"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/storage"
)

type cacheEntry struct {
sync.Mutex
account *core.AccountWithVolumes
}

type Cache struct {
cache gcache.Cache
store storage.LedgerStore
Expand All @@ -18,48 +24,72 @@ func (c *Cache) GetAccountWithVolumes(ctx context.Context, address string) (*cor

address = strings.TrimPrefix(address, "@")

rawAccount, err := c.cache.Get(address)
entry, err := c.cache.Get(address)
if err != nil {
// TODO: Rename later ?
account, err := c.store.ComputeAccount(ctx, address)
if err != nil {
return nil, err
}

if err := c.cache.Set(account.Address, account); err != nil {
ce := &cacheEntry{
account: account,
}

if err := c.cache.Set(account.Address, ce); err != nil {
panic(err)
}

*account = account.Copy()
return account, nil
}
cp := rawAccount.(*core.AccountWithVolumes).Copy()
cp := entry.(*cacheEntry).account.Copy()

return &cp, nil
}

func (c *Cache) Update(accounts core.AccountsAssetsVolumes) {
for address, volumes := range accounts {
rawAccount, err := c.cache.Get(address)
if err != nil {
// Cannot update cache, item maybe evicted
continue
}
account := rawAccount.(*core.AccountWithVolumes)
account.Volumes = volumes
if err := c.cache.Set(address, account); err != nil {
panic(err)
}
func (c *Cache) withLockOnAccount(address string, callback func(account *core.AccountWithVolumes)) {
item, err := c.cache.Get(address)
if err != nil {
return
}
entry := item.(*cacheEntry)
entry.Lock()
defer entry.Unlock()

callback(entry.account)
}

func (c *Cache) UpdateAccountMetadata(ctx context.Context, address string, m core.Metadata) error {
account, err := c.GetAccountWithVolumes(ctx, address)
if err != nil {
return err
func (c *Cache) addOutput(address, asset string, amount *core.MonetaryInt) {
c.withLockOnAccount(address, func(account *core.AccountWithVolumes) {
volumes := account.Volumes[asset]
volumes.Output = volumes.Output.OrZero().Add(amount)
volumes.Input = volumes.Input.OrZero()
account.Volumes[asset] = volumes
})
}

func (c *Cache) addInput(address, asset string, amount *core.MonetaryInt) {
c.withLockOnAccount(address, func(account *core.AccountWithVolumes) {
volumes := account.Volumes[asset]
volumes.Input = volumes.Input.OrZero().Add(amount)
volumes.Output = volumes.Output.OrZero()
account.Volumes[asset] = volumes
})
}

func (c *Cache) UpdateVolumeWithTX(tx core.Transaction) {
for _, posting := range tx.Postings {
c.addOutput(posting.Source, posting.Asset, posting.Amount)
c.addInput(posting.Destination, posting.Asset, posting.Amount)
}
account.Metadata = account.Metadata.Merge(m)
_ = c.cache.Set(address, account)
}

func (c *Cache) UpdateAccountMetadata(address string, m core.Metadata) error {
c.withLockOnAccount(address, func(account *core.AccountWithVolumes) {
account.Metadata = account.Metadata.Merge(m)
})

return nil
}

Expand Down
152 changes: 116 additions & 36 deletions pkg/ledger/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,11 @@ import (

"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/ledgertesting"
"github.com/formancehq/stack/libs/go-libs/pgtesting"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func TestCache(t *testing.T) {
require.NoError(t, pgtesting.CreatePostgresServer())
defer func() {
require.NoError(t, pgtesting.DestroyPostgresServer())
}()
func TestComputeAccountFromLogs(t *testing.T) {

driver := ledgertesting.StorageDriver(t)

Expand All @@ -41,27 +36,6 @@ func TestCache(t *testing.T) {
},
},
}))

account, err := cache.GetAccountWithVolumes(context.Background(), "world")
require.NoError(t, err)
require.NotNil(t, account)
require.Equal(t, core.AccountWithVolumes{
Account: core.Account{
Address: "world",
Metadata: core.Metadata{},
},
Volumes: map[string]core.Volumes{
"USD/2": {
Input: core.NewMonetaryInt(100),
Output: core.NewMonetaryInt(0),
},
},
}, *account)

volumes := account.Volumes["USD/2"]
volumes.Output = account.Volumes["USD/2"].Output.Add(core.NewMonetaryInt(10))
account.Volumes["USD/2"] = volumes

log := core.NewTransactionLog(core.Transaction{
TransactionData: core.TransactionData{
Postings: []core.Posting{{
Expand All @@ -72,23 +46,18 @@ func TestCache(t *testing.T) {
}},
},
}, nil)
require.NoError(t, ledgerStore.AppendLog(
context.Background(),
&log,
))
require.NoError(t, ledgerStore.AppendLog(context.Background(), &log))

log2 := core.NewSetMetadataLog(core.Now(), core.SetMetadataLogPayload{
TargetType: core.MetaTargetTypeAccount,
TargetID: "bank",
Metadata: core.Metadata{
"category": "gold",
},
})
require.NoError(t, ledgerStore.AppendLog(
context.Background(),
&log2,
))
require.NoError(t, ledgerStore.AppendLog(context.Background(), &log2))

account, err = cache.GetAccountWithVolumes(context.Background(), "bank")
account, err := cache.GetAccountWithVolumes(context.Background(), "bank")
require.NoError(t, err)
require.NotNil(t, account)

Expand All @@ -108,3 +77,114 @@ func TestCache(t *testing.T) {
}, *account)

}

func TestRetrieveValueFromCache(t *testing.T) {

driver := ledgertesting.StorageDriver(t)

require.NoError(t, driver.Initialize(context.Background()))

manager := NewManager(driver)
ledger := uuid.NewString()
cache, err := manager.ForLedger(context.Background(), ledger)
require.NoError(t, err)

ledgerStore, _, err := driver.GetLedgerStore(context.Background(), ledger, true)
require.NoError(t, err)

_, err = ledgerStore.Initialize(context.Background())
require.NoError(t, err)

require.NoError(t, ledgerStore.EnsureAccountExists(context.Background(), "world"))
account, err := cache.GetAccountWithVolumes(context.Background(), "world")
require.NoError(t, err)
require.Equal(t, core.AccountWithVolumes{
Account: core.Account{
Address: "world",
Metadata: core.Metadata{},
},
Volumes: map[string]core.Volumes{},
}, *account)

cache.UpdateVolumeWithTX(core.NewTransaction().WithPostings(
core.NewPosting("world", "bank", "USD", core.NewMonetaryInt(100)),
))
account, err = cache.GetAccountWithVolumes(context.Background(), "world")
require.NoError(t, err)
require.EqualValues(t, core.AccountWithVolumes{
Account: core.Account{
Address: "world",
Metadata: core.Metadata{},
},
Volumes: map[string]core.Volumes{
"USD": core.NewEmptyVolumes().WithOutput(core.NewMonetaryInt(100)),
},
}, *account)
}

func TestUpdateVolumes(t *testing.T) {

driver := ledgertesting.StorageDriver(t)

require.NoError(t, driver.Initialize(context.Background()))

manager := NewManager(driver)
ledger := uuid.NewString()
cache, err := manager.ForLedger(context.Background(), ledger)
require.NoError(t, err)

ledgerStore, _, err := driver.GetLedgerStore(context.Background(), ledger, true)
require.NoError(t, err)

_, err = ledgerStore.Initialize(context.Background())
require.NoError(t, err)

require.NoError(t, ledgerStore.EnsureAccountExists(context.Background(), "world"))
worldAccount, err := cache.GetAccountWithVolumes(context.Background(), "world")
require.NoError(t, err)
require.Equal(t, core.AccountWithVolumes{
Account: core.Account{
Address: "world",
Metadata: core.Metadata{},
},
Volumes: map[string]core.Volumes{},
}, *worldAccount)

require.NoError(t, ledgerStore.EnsureAccountExists(context.Background(), "bank"))
bankAccount, err := cache.GetAccountWithVolumes(context.Background(), "bank")
require.NoError(t, err)
require.Equal(t, core.AccountWithVolumes{
Account: core.Account{
Address: "bank",
Metadata: core.Metadata{},
},
Volumes: map[string]core.Volumes{},
}, *bankAccount)

cache.UpdateVolumeWithTX(core.NewTransaction().WithPostings(
core.NewPosting("world", "bank", "USD", core.NewMonetaryInt(100)),
))
worldAccount, err = cache.GetAccountWithVolumes(context.Background(), "world")
require.NoError(t, err)
require.EqualValues(t, core.AccountWithVolumes{
Account: core.Account{
Address: "world",
Metadata: core.Metadata{},
},
Volumes: map[string]core.Volumes{
"USD": core.NewEmptyVolumes().WithOutput(core.NewMonetaryInt(100)),
},
}, *worldAccount)

worldAccount, err = cache.GetAccountWithVolumes(context.Background(), "bank")
require.NoError(t, err)
require.Equal(t, core.AccountWithVolumes{
Account: core.Account{
Address: "bank",
Metadata: core.Metadata{},
},
Volumes: map[string]core.Volumes{
"USD": core.NewEmptyVolumes().WithInput(core.NewMonetaryInt(100)),
},
}, *worldAccount)
}
21 changes: 21 additions & 0 deletions pkg/ledger/cache/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cache

import (
"os"
"testing"

"github.com/formancehq/stack/libs/go-libs/logging"
"github.com/formancehq/stack/libs/go-libs/pgtesting"
)

func TestMain(t *testing.M) {
if err := pgtesting.CreatePostgresServer(); err != nil {
logging.Error(err)
os.Exit(1)
}
code := t.Run()
if err := pgtesting.DestroyPostgresServer(); err != nil {
logging.Error("error stopping pg server:", err)
}
os.Exit(code)
}
2 changes: 1 addition & 1 deletion pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (l *Ledger) SaveMeta(ctx context.Context, targetType string, targetID inter
}
defer unlock(context.Background())

err = l.dbCache.UpdateAccountMetadata(ctx, targetID.(string), m)
err = l.dbCache.UpdateAccountMetadata(targetID.(string), m)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ledger/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (r *Runner) execute(ctx context.Context, script core.RunScript, dryRun bool
return expandedTx, result.AccountMetadata, nil
}

r.cache.Update(expandedTx.PostCommitVolumes)
r.cache.UpdateVolumeWithTX(expandedTx.Transaction)

return expandedTx, result.AccountMetadata, nil
}
Expand Down

0 comments on commit 485cba5

Please sign in to comment.