Skip to content

Commit

Permalink
feat: review
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Aug 16, 2023
1 parent 2a9a44c commit 2c4ccbf
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 11 deletions.
19 changes: 17 additions & 2 deletions pkg/storage/sqlstorage/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/numary/ledger/pkg/core"
"github.com/numary/ledger/pkg/storage"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -44,7 +45,21 @@ func (s *Store) commit(ctx context.Context, txs ...core.ExpandedTransaction) ([]
return logs, nil
}

func (s *Store) Commit(ctx context.Context, txs ...core.ExpandedTransaction) error {
_, err := s.commit(ctx, txs...)
func (s *Store) Commit(ctx context.Context, txs ...core.ExpandedTransaction) (err error) {
if !storage.IsTransactional(ctx) {
ctx = storage.TransactionalContext(ctx)
defer func() {
if err == nil {
if commitErr := storage.CommitTransaction(ctx); commitErr != nil {
panic(commitErr)
}
} else {
if rollbackErr := storage.RollbackTransaction(ctx); rollbackErr != nil {
panic(rollbackErr)
}
}
}()
}
_, err = s.commit(ctx, txs...)
return err
}
5 changes: 5 additions & 0 deletions pkg/storage/sqlstorage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ func (d *Driver) Initialize(ctx context.Context) (err error) {
}

func (d *Driver) Close(ctx context.Context) error {
for _, store := range d.registeredLedgers {
if err := store.Close(ctx); err != nil {
return err
}
}
err := d.systemSchema.Close(ctx)
if err != nil {
return err
Expand Down
28 changes: 27 additions & 1 deletion pkg/storage/sqlstorage/store_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package sqlstorage

import (
"context"
"fmt"
"os"
"strconv"
"time"

"github.com/bits-and-blooms/bloom"
Expand Down Expand Up @@ -68,12 +71,35 @@ func (s *Store) Close(ctx context.Context) error {
func NewStore(schema Schema, executorProvider func(ctx context.Context) (executor, error),
onClose, onDelete func(ctx context.Context) error) *Store {

const (
bloomFilterSizeEnvVar = "NUMARY_BLOOM_FILTER_SIZE"
bloomFilterErrorRateEnvVar = "NUMARY_BLOOM_FILTER_ERROR_RATE"
)

var (
bloomSize uint64 = 100000
bloomErrorRate = 0.01
err error
)
if bloomSizeFromEnv := os.Getenv(bloomFilterSizeEnvVar); bloomSizeFromEnv != "" {
bloomSize, err = strconv.ParseUint(bloomSizeFromEnv, 10, 64)
if err != nil {
panic(errors.Wrap(err, fmt.Sprint("Parsing", bloomFilterSizeEnvVar, "env var")))
}
}
if bloomErrorRateFromEnv := os.Getenv(bloomFilterErrorRateEnvVar); bloomErrorRateFromEnv != "" {
bloomErrorRate, err = strconv.ParseFloat(bloomErrorRateFromEnv, 64)
if err != nil {
panic(errors.Wrap(err, fmt.Sprint("Parsing", bloomFilterErrorRateEnvVar, "env var")))
}
}

return &Store{
executorProvider: executorProvider,
schema: schema,
onClose: onClose,
onDelete: onDelete,
bloom: bloom.NewWithEstimates(1000000, 0.01), // TODO: Configure
bloom: bloom.NewWithEstimates(uint(bloomSize), bloomErrorRate),
cache: cache.New(5*time.Minute, 10*time.Minute),
}
}
Expand Down
20 changes: 13 additions & 7 deletions pkg/storage/sqlstorage/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,25 @@ import (

"github.com/huandu/go-sqlbuilder"
"github.com/numary/ledger/pkg/core"
"github.com/numary/ledger/pkg/storage"
)

func (s *Store) updateVolumes(ctx context.Context, volumes core.AccountsAssetsVolumes) error {
for address, accountVolumes := range volumes {

entry, ok := s.cache.Get(address)
if ok {
account := entry.(*core.AccountWithVolumes)
for asset, volumes := range accountVolumes {
account.Volumes[asset] = volumes
account.Balances[asset] = volumes.Balance()
storage.OnTransactionCommitted(ctx, func() {
for address, accountVolumes := range volumes {
entry, ok := s.cache.Get(address)
if ok {
account := entry.(*core.AccountWithVolumes)
for asset, volumes := range accountVolumes {
account.Volumes[asset] = volumes
account.Balances[asset] = volumes.Balance()
}
}
}
})

for address, accountVolumes := range volumes {

accountBy, err := json.Marshal(strings.Split(address, ":"))
if err != nil {
Expand Down
19 changes: 18 additions & 1 deletion pkg/storage/transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type contextHolder struct {
transaction any
commit func(ctx context.Context) error
rollback func(ctx context.Context) error
onCommit []func()
}

type contextHolderKeyStruct struct{}
Expand Down Expand Up @@ -77,7 +78,15 @@ func CommitTransaction(ctx context.Context) error {
if holder.transaction == nil {
return errors.New("transaction not initialized")
}
return holder.commit(ctx)
err := holder.commit(ctx)
if err != nil {
return err
}

for _, onCommit := range holder.onCommit {
onCommit()
}
return nil
}

func RollbackTransaction(ctx context.Context) error {
Expand All @@ -90,3 +99,11 @@ func RollbackTransaction(ctx context.Context) error {
}
return holder.rollback(ctx)
}

func OnTransactionCommitted(ctx context.Context, callback func()) {
holder := getContextHolder(ctx)
if holder == nil {
panic("context holder is nil")
}
holder.onCommit = append(holder.onCommit, callback)
}

0 comments on commit 2c4ccbf

Please sign in to comment.