Skip to content

Commit

Permalink
feat: implement multi account based lock
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed May 12, 2023
1 parent 4088a97 commit bb5f2b9
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 77 deletions.
2 changes: 2 additions & 0 deletions pkg/ledger/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func (c *Cache) GetAccountWithVolumes(ctx context.Context, address string) (*cor
if err != nil {
return nil, err
}
entry.Lock()
defer entry.Unlock()
cp := entry.account.Copy()

return &cp, nil
Expand Down
32 changes: 23 additions & 9 deletions pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
type Ledger struct {
runner *runner.Runner
store storage.LedgerStore
locker lock.Locker
locker *lock.Locker
dbCache *cache.Cache
queryWorker *query.Worker
metricsRegistry metrics.PerLedgerMetricsRegistry
Expand All @@ -29,7 +29,7 @@ func New(
store storage.LedgerStore,
dbCache *cache.Cache,
runner *runner.Runner,
locker lock.Locker,
locker *lock.Locker,
queryWorker *query.Worker,
metricsRegistry metrics.PerLedgerMetricsRegistry,
) *Ledger {
Expand All @@ -44,14 +44,17 @@ func New(
}

func (l *Ledger) Close(ctx context.Context) error {
if err := l.store.Close(ctx); err != nil {
return errors.Wrap(err, "closing store")
}

if err := l.queryWorker.Stop(ctx); err != nil {
return errors.Wrap(err, "stopping query worker")
}

l.locker.Stop()

if err := l.store.Close(ctx); err != nil {
return errors.Wrap(err, "closing store")
}

return nil
}

Expand Down Expand Up @@ -101,6 +104,7 @@ func (l *Ledger) GetTransaction(ctx context.Context, id uint64) (*core.ExpandedT
}

func (l *Ledger) RevertTransaction(ctx context.Context, id uint64) (*core.ExpandedTransaction, error) {

revertedTx, err := l.store.GetTransaction(ctx, id)
if err != nil && !storage.IsNotFoundError(err) {
return nil, errors.Wrap(err, "get transaction before revert")
Expand Down Expand Up @@ -178,8 +182,9 @@ func (l *Ledger) SaveMeta(ctx context.Context, targetType string, targetID inter

at := core.Now()
var (
err error
log core.Log
err error
log core.Log
release cache.Release = func() {}
)
switch targetType {
case core.MetaTargetTypeTransaction:
Expand All @@ -202,7 +207,9 @@ func (l *Ledger) SaveMeta(ctx context.Context, targetType string, targetID inter

// Machine can access account metadata, so store the metadata until CQRS compute final of the account
// The cache can still evict the account entry before CQRS part compute the view
unlock, err := l.locker.Lock(ctx, l.store.Name(), targetID.(string))
unlock, err := l.locker.Lock(ctx, lock.Accounts{
Write: []string{targetID.(string)},
})
if err != nil {
return errors.Wrap(err, "lock account")
}
Expand All @@ -228,11 +235,18 @@ func (l *Ledger) SaveMeta(ctx context.Context, targetType string, targetID inter
}

err = l.store.AppendLog(ctx, &log)
logHolder := core.NewLogHolder(&log)
if err == nil {
logHolder := core.NewLogHolder(&log)
if err := l.writeLog(ctx, logHolder); err != nil {
release()
return err
}
go func() {
<-logHolder.Ingested
release()
}()
} else {
release()
}

return err
Expand Down
154 changes: 121 additions & 33 deletions pkg/ledger/lock/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,144 @@ package lock

import (
"context"
"sync"
)

type Unlock func(ctx context.Context)

type Locker interface {
Lock(ctx context.Context, ledger string, accounts ...string) (Unlock, error)
type Accounts struct {
Read []string
Write []string
}
type LockerFn func(ctx context.Context, ledger string, accounts ...string) (Unlock, error)

func (fn LockerFn) Lock(ctx context.Context, ledger string, accounts ...string) (Unlock, error) {
return fn(ctx, ledger, accounts...)
type lockQuery struct {
accounts Accounts
ready chan Unlock
}

var NoOpLocker = LockerFn(func(ctx context.Context, ledger string, accounts ...string) (Unlock, error) {
return func(ctx context.Context) {}, nil
})
type unlockQuery struct {
accounts Accounts
done chan struct{}
}

type InMemory struct {
globalLock sync.RWMutex
locks map[string]*sync.Mutex
type Locker struct {
readLocks map[string]struct{}
writeLocks map[string]struct{}
ledger string
lockQueries chan lockQuery
unlockQueries chan unlockQuery
pending []*lockQuery
stopChan chan chan struct{}
}

func (d *InMemory) Lock(ctx context.Context, ledger string, accounts ...string) (Unlock, error) {
d.globalLock.RLock()
lock, ok := d.locks[ledger]
d.globalLock.RUnlock()
if !ok {
d.globalLock.Lock()
lock, ok = d.locks[ledger] // Double check, the lock can have been acquired by another go routing between RUnlock and Lock
if !ok {
lock = &sync.Mutex{}
d.locks[ledger] = lock
func (d *Locker) Run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case query := <-d.unlockQueries:
d.unlock(ctx, query.accounts)
close(query.done)
d.tryNext(ctx)
case query := <-d.lockQueries:
if d.process(ctx, query) {
continue
}
d.pending = append(d.pending, &query)
case ch := <-d.stopChan:
close(ch)
return nil
}
d.globalLock.Unlock()
}
}

unlocked := false
lock.Lock()
return func(ctx context.Context) {
if unlocked {
func (d *Locker) process(ctx context.Context, query lockQuery) bool {
unlock, acquired := d.tryLock(ctx, query.accounts)
if acquired {
query.ready <- unlock
return true
}
return false
}

func (d *Locker) tryNext(ctx context.Context) {
for _, query := range d.pending {
if d.process(ctx, *query) {
return
}
lock.Unlock()
unlocked = true
}, nil
}
}

func (d *Locker) tryLock(ctx context.Context, accounts Accounts) (Unlock, bool) {

for _, account := range accounts.Read {
_, ok := d.writeLocks[account]
if ok {
return nil, false
}
}

for _, account := range accounts.Write {
_, ok := d.readLocks[account]
if ok {
return nil, false
}
_, ok = d.writeLocks[account]
if ok {
return nil, false
}
}

for _, account := range accounts.Read {
d.readLocks[account] = struct{}{}
}
for _, account := range accounts.Write {
d.writeLocks[account] = struct{}{}
}

return func(ctx context.Context) {
q := unlockQuery{
accounts: accounts,
done: make(chan struct{}),
}
d.unlockQueries <- q
select {
case <-ctx.Done():
case <-q.done:
}
}, true
}

func (d *Locker) unlock(ctx context.Context, accounts Accounts) {
for _, account := range accounts.Read {
delete(d.readLocks, account)
}
for _, account := range accounts.Write {
delete(d.writeLocks, account)
}
}

func (d *Locker) Lock(ctx context.Context, accounts Accounts) (Unlock, error) {
q := lockQuery{
accounts: accounts,
ready: make(chan Unlock, 1),
}
d.lockQueries <- q
return <-q.ready, nil
}

func (d *Locker) Stop() {
ch := make(chan struct{})
d.stopChan <- ch
<-ch
}

func NewInMemory() *InMemory {
return &InMemory{
locks: map[string]*sync.Mutex{},
func New(ledger string) *Locker {
return &Locker{
readLocks: map[string]struct{}{},
writeLocks: map[string]struct{}{},
ledger: ledger,
lockQueries: make(chan lockQuery),
unlockQueries: make(chan unlockQuery),
stopChan: make(chan chan struct{}),
}
}
11 changes: 0 additions & 11 deletions pkg/ledger/lock/module.go

This file was deleted.

7 changes: 2 additions & 5 deletions pkg/ledger/module.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package ledger

import (
"github.com/formancehq/ledger/pkg/ledger/lock"
"github.com/formancehq/ledger/pkg/ledger/monitor"
"github.com/formancehq/ledger/pkg/ledger/query"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
Expand All @@ -11,14 +10,12 @@ import (

func Module(allowPastTimestamp bool) fx.Option {
return fx.Options(
lock.Module(),
fx.Provide(func(
storageDriver storage.Driver,
monitor monitor.Monitor,
locker lock.Locker,
metricsRegsitry metrics.GlobalMetricsRegistry,
metricsRegistry metrics.GlobalMetricsRegistry,
) *Resolver {
return NewResolver(storageDriver, monitor, locker, allowPastTimestamp, metricsRegsitry)
return NewResolver(storageDriver, monitor, allowPastTimestamp, metricsRegistry)
}),
fx.Provide(fx.Annotate(monitor.NewNoOpMonitor, fx.As(new(monitor.Monitor)))),
fx.Provide(fx.Annotate(metrics.NewNoOpMetricsRegistry, fx.As(new(metrics.GlobalMetricsRegistry)))),
Expand Down
27 changes: 13 additions & 14 deletions pkg/ledger/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,21 @@ type Resolver struct {
storageDriver storage.Driver
monitor monitor.Monitor
lock sync.RWMutex
metricsRegsitry metrics.GlobalMetricsRegistry
locker lock.Locker
metricsRegistry metrics.GlobalMetricsRegistry
//TODO(gfyrag): add a routine to clean old ledger
ledgers map[string]*Ledger
compiler *numscript.Compiler
allowPastTimestamps bool
}

func NewResolver(
storageDriver storage.Driver,
monitor monitor.Monitor,
locker lock.Locker,
allowPastTimestamps bool,
metricsRegsitry metrics.GlobalMetricsRegistry,
) *Resolver {
func NewResolver(storageDriver storage.Driver, monitor monitor.Monitor, allowPastTimestamps bool, metricsRegistry metrics.GlobalMetricsRegistry) *Resolver {
return &Resolver{
storageDriver: storageDriver,
monitor: monitor,
locker: locker,
metricsRegsitry: metricsRegsitry,
compiler: numscript.NewCompiler(),
ledgers: map[string]*Ledger{},
allowPastTimestamps: allowPastTimestamps,
metricsRegistry: metricsRegistry,
}
}

Expand All @@ -64,13 +56,20 @@ func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error)
}
}

locker := lock.New(name)
go func() {
if err := locker.Run(context.Background()); err != nil {
panic(err)
}
}()

metricsRegistry, err := metrics.RegisterPerLedgerMetricsRegistry(name)
if err != nil {
return nil, errors.Wrap(err, "registering metrics")
}

cache := cache.New(store, metricsRegistry)
runner, err := runner.New(store, r.locker, cache, r.compiler, name, r.allowPastTimestamps)
runner, err := runner.New(store, locker, cache, r.compiler, name, r.allowPastTimestamps)
if err != nil {
return nil, errors.Wrap(err, "creating ledger runner")
}
Expand All @@ -86,9 +85,9 @@ func (r *Resolver) GetLedger(ctx context.Context, name string) (*Ledger, error)
}
}()

ledger = New(store, cache, runner, r.locker, queryWorker, metricsRegistry)
ledger = New(store, cache, runner, locker, queryWorker, metricsRegistry)
r.ledgers[name] = ledger
r.metricsRegsitry.ActiveLedgers().Add(ctx, +1)
r.metricsRegistry.ActiveLedgers().Add(ctx, +1)
}

return ledger, nil
Expand Down
Loading

0 comments on commit bb5f2b9

Please sign in to comment.