Skip to content

Commit

Permalink
feat: rework storage errors (#183)
Browse files Browse the repository at this point in the history
  • Loading branch information
paul-nicolas authored and flemzord committed May 12, 2023
1 parent a099980 commit 2f444ea
Show file tree
Hide file tree
Showing 18 changed files with 156 additions and 212 deletions.
4 changes: 2 additions & 2 deletions pkg/analytics/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ func FromStorageAppIdProvider(driver storage.Driver) AppIdProvider {
var err error
if appId == "" {
appId, err = driver.GetSystemStore().GetConfiguration(ctx, "appId")
if err != nil && err != storage.ErrConfigurationNotFound {
if err != nil && !storage.IsNotFound(err) {
return "", err
}
if err == storage.ErrConfigurationNotFound {
if storage.IsNotFound(err) {
appId = uuid.New()
if err := driver.GetSystemStore().InsertConfiguration(ctx, "appId", appId); err != nil {
return "", err
Expand Down
10 changes: 5 additions & 5 deletions pkg/ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,23 @@ func (l *Ledger) CountTransactions(ctx context.Context, q storage.TransactionsQu

func (l *Ledger) GetTransaction(ctx context.Context, id uint64) (*core.ExpandedTransaction, error) {
tx, err := l.store.GetTransaction(ctx, id)
if err != nil {
if err != nil && !storage.IsNotFound(err) {
return nil, err
}
if tx == nil {

if storage.IsNotFound(err) {
return nil, runner.NewNotFoundError("transaction not found")
}

return tx, nil
}

func (l *Ledger) RevertTransaction(ctx context.Context, id uint64) (*core.ExpandedTransaction, error) {

revertedTx, err := l.store.GetTransaction(ctx, id)
if err != nil {
if err != nil && !storage.IsNotFound(err) {
return nil, errors.Wrap(err, fmt.Sprintf("getting transaction %d", id))
}
if revertedTx == nil {
if storage.IsNotFound(err) {
return nil, runner.NewNotFoundError(fmt.Sprintf("transaction %d not found", id))
}
if revertedTx.IsReverted() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ledger/query/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (w *Worker) initLedger(ctx context.Context, ledger string) error {
}

lastReadLogID, err := store.GetNextLogID(ctx)
if err != nil {
if err != nil && !storage.IsNotFound(err) {
return errors.Wrap(err, "reading last log")
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/ledger/query/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ func TestWorker(t *testing.T) {
"category": "gold",
}

nextLogID, err := ledgerStore.GetNextLogID(context.Background())
require.True(t, storage.IsNotFound(err))
require.Equal(t, uint64(0), nextLogID)

logs := []core.Log{
core.NewTransactionLog(tx0, nil),
core.NewTransactionLog(tx1, nil),
Expand Down
6 changes: 2 additions & 4 deletions pkg/ledger/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package runner

import (
"context"
"database/sql"
"fmt"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -50,7 +49,6 @@ func (r *Runner) Execute(
dryRun bool,
logComputer logComputer,
) (*core.ExpandedTransaction, *core.LogHolder, error) {

inFlight, err := r.acquireInflight(ctx, script)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -105,7 +103,7 @@ func (r *Runner) checkConstraints(ctx context.Context, script core.RunScript) er
// Log found
return NewConflictError("reference found in storage")
}
if err != sql.ErrNoRows {
if !storage.IsNotFound(err) {
return err
}
}
Expand Down Expand Up @@ -253,7 +251,7 @@ func (r *Runner) releaseInFlightWithTransaction(inFlight *inFlight, transaction

func New(store storage.LedgerStore, locker lock.Locker, cache *cache.Cache, compiler *numscript.Compiler, allowPastTimestamps bool) (*Runner, error) {
log, err := store.ReadLastLogWithType(context.Background(), core.NewTransactionLogType, core.RevertedTransactionLogType)
if err != nil && err != sql.ErrNoRows {
if err != nil && !storage.IsNotFound(err) {
return nil, err
}
var (
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ type LedgerStore interface {
GetAccountWithVolumes(ctx context.Context, addr string) (*core.AccountWithVolumes, error)
UpdateVolumes(ctx context.Context, volumes core.AccountsAssetsVolumes) error
EnsureAccountExists(ctx context.Context, account string) error
GetLastTransaction(ctx context.Context) (*core.ExpandedTransaction, error)
CountTransactions(context.Context, TransactionsQuery) (uint64, error)
GetTransactions(context.Context, TransactionsQuery) (api.Cursor[core.ExpandedTransaction], error)
GetTransaction(ctx context.Context, txid uint64) (*core.ExpandedTransaction, error)
Expand Down
53 changes: 21 additions & 32 deletions pkg/storage/errors.go
Original file line number Diff line number Diff line change
@@ -1,54 +1,43 @@
package storage

import (
"errors"
"fmt"

"github.com/lib/pq"
"github.com/pkg/errors"
)

var (
ErrConfigurationNotFound = errors.New("configuration not found")
ErrNotFound = errors.New("not found")
ErrStoreNotInitialized = errors.New("store not initialized")
)

type Code string

const (
ConstraintFailed Code = "CONSTRAINT_FAILED"
TooManyClient Code = "TOO_MANY_CLIENT"
var (
// Specific pq sql errors
ErrConstraintFailed = pq.ErrorCode("23505")
ErrTooManyClient = pq.ErrorCode("53300")
)

type Error struct {
Code Code
OriginalError error
func IsNotFound(err error) bool {
return errors.Is(err, ErrNotFound)
}

func (e Error) Is(err error) bool {
storageErr, ok := err.(*Error)
if !ok {
return false
}
if storageErr.Code == "" {
return true
}
return storageErr.Code == e.Code
}

func (e Error) Error() string {
return fmt.Sprintf("%s [%s]", e.OriginalError, e.Code)
type Error struct {
code pq.ErrorCode
err error
}

func NewError(code Code, originalError error) *Error {
func NewError(code pq.ErrorCode, err error) *Error {
return &Error{
Code: code,
OriginalError: originalError,
code: code,
err: err,
}
}

func IsError(err error) bool {
return IsErrorCode(err, "")
func (e *Error) Error() string {
return fmt.Sprintf("[%s] %s", e.code, e.err)
}

func IsErrorCode(err error, code Code) bool {
return errors.Is(err, &Error{
Code: code,
})
func IsError(err error) bool {
return errors.Is(err, &Error{})
}
11 changes: 11 additions & 0 deletions pkg/storage/sqlstorage/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"

"github.com/formancehq/ledger/pkg/ledgertesting"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/ledger/pkg/storage/sqlstorage"
ledgerstore "github.com/formancehq/ledger/pkg/storage/sqlstorage/ledger"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -44,3 +45,13 @@ func TestConfiguration(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "bar", bar)
}

func TestConfigurationError(t *testing.T) {
d := ledgertesting.StorageDriver(t)

require.NoError(t, d.Initialize(context.Background()))

_, err := d.GetSystemStore().GetConfiguration(context.Background(), "not_existing")
require.Error(t, err)
require.True(t, storage.IsNotFound(err))
}
11 changes: 9 additions & 2 deletions pkg/storage/sqlstorage/errors/errors.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package errors

import (
"database/sql"

"github.com/formancehq/ledger/pkg/storage"
"github.com/lib/pq"
"github.com/pkg/errors"
)

// postgresError is an helper to wrap postgres errors into storage errors
func PostgresError(err error) error {
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return storage.ErrNotFound
}

switch pge := err.(type) {
case *pq.Error:
switch pge.Code {
case "23505":
return storage.NewError(storage.ConstraintFailed, err)
return storage.NewError(storage.ErrConstraintFailed, err)
case "53300":
return storage.NewError(storage.TooManyClient, err)
return storage.NewError(storage.ErrTooManyClient, err)
}
}
}
Expand Down
44 changes: 19 additions & 25 deletions pkg/storage/sqlstorage/ledger/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/storage"
sqlerrors "github.com/formancehq/ledger/pkg/storage/sqlstorage/errors"
"github.com/formancehq/stack/libs/go-libs/api"
"github.com/pkg/errors"
"github.com/uptrace/bun"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (s *Store) buildAccountsQuery(ctx context.Context, p storage.AccountsQuery)

func (s *Store) GetAccounts(ctx context.Context, q storage.AccountsQuery) (api.Cursor[core.Account], error) {
if !s.isInitialized {
return api.Cursor[core.Account]{}, ErrStoreNotInitialized
return api.Cursor[core.Account]{}, storage.ErrStoreNotInitialized
}

accounts := make([]core.Account, 0)
Expand All @@ -129,7 +130,7 @@ func (s *Store) GetAccounts(ctx context.Context, q storage.AccountsQuery) (api.C

rows, err := s.schema.QueryContext(ctx, sb.String())
if err != nil {
return api.Cursor[core.Account]{}, s.error(err)
return api.Cursor[core.Account]{}, sqlerrors.PostgresError(err)
}
defer rows.Close()

Expand Down Expand Up @@ -157,7 +158,7 @@ func (s *Store) GetAccounts(ctx context.Context, q storage.AccountsQuery) (api.C
}
raw, err := json.Marshal(t)
if err != nil {
return api.Cursor[core.Account]{}, s.error(err)
return api.Cursor[core.Account]{}, sqlerrors.PostgresError(err)
}
previous = base64.RawURLEncoding.EncodeToString(raw)
}
Expand All @@ -167,7 +168,7 @@ func (s *Store) GetAccounts(ctx context.Context, q storage.AccountsQuery) (api.C
t.Offset = q.Offset + q.PageSize
raw, err := json.Marshal(t)
if err != nil {
return api.Cursor[core.Account]{}, s.error(err)
return api.Cursor[core.Account]{}, sqlerrors.PostgresError(err)
}
next = base64.RawURLEncoding.EncodeToString(raw)
}
Expand All @@ -184,31 +185,24 @@ func (s *Store) GetAccounts(ctx context.Context, q storage.AccountsQuery) (api.C

func (s *Store) GetAccount(ctx context.Context, addr string) (*core.Account, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
return nil, storage.ErrStoreNotInitialized
}

query := s.schema.NewSelect(accountsTableName).
Model((*Accounts)(nil)).
Where("address = ?", addr).
String()

account := core.Account{
Address: addr,
Metadata: core.Metadata{},
}

row := s.schema.QueryRowContext(ctx, query)
if err := row.Err(); err != nil {
return nil, err
}

if err := row.Scan(&account.Address, &account.Metadata); err != nil {
if err == sql.ErrNoRows {
return &account, nil
}
return nil, err
var account core.Account
err := row.Scan(&account.Address, &account.Metadata)
if err != nil {
return nil, sqlerrors.PostgresError(err)
}

return &account, nil
}

Expand All @@ -227,7 +221,7 @@ func (s *Store) getAccountWithVolumes(ctx context.Context, exec interface {

rows, err := exec.QueryContext(ctx, query)
if err != nil {
return nil, s.error(err)
return nil, sqlerrors.PostgresError(err)
}
defer rows.Close()

Expand All @@ -240,7 +234,7 @@ func (s *Store) getAccountWithVolumes(ctx context.Context, exec interface {
for rows.Next() {
var asset, inputStr, outputStr sql.NullString
if err := rows.Scan(&acc.Metadata, &asset, &inputStr, &outputStr); err != nil {
return nil, s.error(err)
return nil, sqlerrors.PostgresError(err)
}

if asset.Valid {
Expand Down Expand Up @@ -273,7 +267,7 @@ func (s *Store) getAccountWithVolumes(ctx context.Context, exec interface {
}
}
if err := rows.Err(); err != nil {
return nil, s.error(err)
return nil, sqlerrors.PostgresError(err)
}

return &core.AccountWithVolumes{
Expand All @@ -284,25 +278,25 @@ func (s *Store) getAccountWithVolumes(ctx context.Context, exec interface {

func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*core.AccountWithVolumes, error) {
if !s.isInitialized {
return nil, ErrStoreNotInitialized
return nil, storage.ErrStoreNotInitialized
}

return s.getAccountWithVolumes(ctx, s.schema, account)
}

func (s *Store) CountAccounts(ctx context.Context, q storage.AccountsQuery) (uint64, error) {
if !s.isInitialized {
return 0, ErrStoreNotInitialized
return 0, storage.ErrStoreNotInitialized
}

sb, _ := s.buildAccountsQuery(ctx, q)
count, err := sb.Count(ctx)
return uint64(count), s.error(err)
return uint64(count), sqlerrors.PostgresError(err)
}

func (s *Store) EnsureAccountExists(ctx context.Context, account string) error {
if !s.isInitialized {
return ErrStoreNotInitialized
return storage.ErrStoreNotInitialized
}

a := &Accounts{
Expand All @@ -315,12 +309,12 @@ func (s *Store) EnsureAccountExists(ctx context.Context, account string) error {
Ignore().
Exec(ctx)

return s.error(err)
return sqlerrors.PostgresError(err)
}

func (s *Store) UpdateAccountMetadata(ctx context.Context, address string, metadata core.Metadata) error {
if !s.isInitialized {
return ErrStoreNotInitialized
return storage.ErrStoreNotInitialized
}

a := &Accounts{
Expand Down
Loading

0 comments on commit 2f444ea

Please sign in to comment.