From 4320ed033dde11e4eb910b87c654668cb38c334d Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Tue, 4 Feb 2025 15:25:08 +0100 Subject: [PATCH] Retry when sql busy Signed-off-by: Alexandros Filios --- .../fabric/services/endorser/endorsement.go | 23 +++++++--- platform/view/services/db/driver/driver.go | 2 + .../db/driver/sql/common/auditinfo.go | 4 +- .../db/{dbtest => driver/sql/common}/bench.go | 2 +- .../services/db/driver/sql/common/binding.go | 4 +- .../db/driver/sql/common/endorsetx.go | 2 +- .../services/db/driver/sql/common/envelope.go | 2 +- .../{dbtest => driver/sql/common}/helpers.go | 24 +++++------ .../services/db/driver/sql/common/metadata.go | 2 +- .../db/driver/sql/common/signerinfo.go | 19 +++++---- .../db/driver/sql/common/simplekeydata.go | 4 +- .../db/driver/sql/common/test_utils.go | 7 ++-- .../db/driver/sql/common/unversioned.go | 8 ++-- .../services/db/driver/sql/common/utils.go | 8 +++- .../services/db/driver/sql/common/vault.go | 4 +- .../db/driver/sql/postgres/bench_test.go | 12 +++--- .../db/driver/sql/sqlite/bench_test.go | 10 ++--- .../services/db/driver/sql/sqlite/binding.go | 6 +-- .../db/driver/sql/sqlite/endorsetx.go | 6 +-- .../services/db/driver/sql/sqlite/envelope.go | 6 +-- .../db/driver/sql/sqlite/errormapper.go | 2 + .../services/db/driver/sql/sqlite/metadata.go | 6 +-- .../db/driver/sql/sqlite/retrywritedb.go | 42 +++++++++++++++++++ .../db/driver/sql/sqlite/signerinfo.go | 6 +-- .../db/driver/sql/sqlite/unversioned.go | 2 +- .../services/db/driver/sql/sqlite/vault.go | 8 ++-- 26 files changed, 143 insertions(+), 78 deletions(-) rename platform/view/services/db/{dbtest => driver/sql/common}/bench.go (99%) rename platform/view/services/db/{dbtest => driver/sql/common}/helpers.go (98%) create mode 100644 platform/view/services/db/driver/sql/sqlite/retrywritedb.go diff --git a/platform/fabric/services/endorser/endorsement.go b/platform/fabric/services/endorser/endorsement.go index b4b929dbc..7d8a84d6a 100644 --- a/platform/fabric/services/endorser/endorsement.go +++ b/platform/fabric/services/endorser/endorsement.go @@ -40,6 +40,7 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error vProviders = append(vProviders, &verifierProviderWrapper{m: mspManager}) // Get results to send + span.AddEvent("Get results to send") res, err := c.tx.Results() if err != nil { return nil, errors.Wrapf(err, "failed getting tx results") @@ -47,22 +48,26 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error // Contact sequantially all parties. logger.Debugf("Collect Endorsements from [%d] parties [%v]", len(c.parties), c.parties) + span.AddEvent("Start collecting endorsement") for _, party := range c.parties { - span.AddEvent("start_collect_endorsement") + span.AddEvent("Collect endorsement iteration") logger.Debugf("Collect Endorsements On Simulation from [%s]", party) var err error if context.IsMe(party) { + span.AddEvent("Start endorsing locally") logger.Debugf("This is me %s, endorse locally.", party) // Endorse it err = c.tx.EndorseWithIdentity(party) if err != nil { return nil, errors.Wrap(err, "failed endorsing transaction") } + span.AddEvent("Finish endorsing locally") continue } var txRaw []byte + span.AddEvent("Get transaction bytes") if c.deleteTransient { txRaw, err = c.tx.BytesNoTransient() if err != nil { @@ -75,16 +80,18 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error } } + span.AddEvent("Get session with party") session, err := context.GetSession(context.Initiator(), party) if err != nil { return nil, errors.Wrap(err, "failed getting session") } // Get a channel to receive the answer + span.AddEvent("Start receiving from party") ch := session.Receive() // Send transaction - span.AddEvent("send_tx") + span.AddEvent("Send raw transaction to party") err = session.SendWithContext(context.Context(), txRaw) if err != nil { return nil, errors.Wrap(err, "failed sending transaction content") @@ -92,7 +99,7 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error timeout := time.NewTimer(time.Minute) - span.AddEvent("wait_tx") + span.AddEvent("Wait for transaction") // Wait for the answer var msg *view.Message select { @@ -102,7 +109,7 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error timeout.Stop() return nil, errors.Errorf("Timeout from party %s", party) } - span.AddEvent("receive_tx") + span.AddEvent("Received transaction from party") if msg.Status == view.ERROR { return nil, errors.New(string(msg.Payload)) } @@ -122,6 +129,7 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error } tm := fns.TransactionManager() for _, response := range responses { + span.AddEvent("Parse proposal response") proposalResponse, err := tm.NewProposalResponseFromBytes(response) if err != nil { return nil, errors.Wrap(err, "failed unmarshalling received proposal response") @@ -130,15 +138,18 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error endorser := view.Identity(proposalResponse.Endorser()) // Check the validity of the response + span.AddEvent("Check response validity") if view2.GetEndpointService(context).IsBoundTo(endorser, party) { found = true } // TODO: check the verifier providers, if any verified := false + span.AddEvent("Start verification with providers") for _, provider := range vProviders { - span.AddEvent("verify_endorsement") + span.AddEvent("Verify endorsement") err := proposalResponse.VerifyEndorsement(provider) + span.AddEvent("Verified endorsement") if err == nil { logger.Debugf("endorsement [%s] is valid", endorser) verified = true @@ -156,7 +167,9 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error } logger.Debugf("append response from party [%s]", party) + span.AddEvent("Append proposal response") err = c.tx.AppendProposalResponse(proposalResponse) + span.AddEvent("Appended proposal response") if err != nil { return nil, errors.Wrap(err, "failed appending received proposal response") } diff --git a/platform/view/services/db/driver/driver.go b/platform/view/services/db/driver/driver.go index 23affa4b5..2e36bac4d 100644 --- a/platform/view/services/db/driver/driver.go +++ b/platform/view/services/db/driver/driver.go @@ -17,6 +17,8 @@ var ( UniqueKeyViolation = errors.New("unique key violation") // DeadlockDetected happens when two transactions are taking place at the same time and interact with the same rows DeadlockDetected = errors.New("deadlock detected") + // SqlBusy happens when two transactions are trying to write at the same time. Can be avoided by opening the database in exclusive mode + SqlBusy = errors.New("sql is busy") ) type SQLError = error diff --git a/platform/view/services/db/driver/sql/common/auditinfo.go b/platform/view/services/db/driver/sql/common/auditinfo.go index cb0b19dd3..9ee7f8aed 100644 --- a/platform/view/services/db/driver/sql/common/auditinfo.go +++ b/platform/view/services/db/driver/sql/common/auditinfo.go @@ -15,7 +15,7 @@ import ( "github.com/pkg/errors" ) -func NewAuditInfoPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *AuditInfoPersistence { +func NewAuditInfoPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *AuditInfoPersistence { return &AuditInfoPersistence{ table: table, errorWrapper: errorWrapper, @@ -29,7 +29,7 @@ type AuditInfoPersistence struct { table string errorWrapper driver.SQLErrorWrapper readDB *sql.DB - writeDB *sql.DB + writeDB WriteDB ci Interpreter } diff --git a/platform/view/services/db/dbtest/bench.go b/platform/view/services/db/driver/sql/common/bench.go similarity index 99% rename from platform/view/services/db/dbtest/bench.go rename to platform/view/services/db/driver/sql/common/bench.go index fdda6e256..143a77392 100644 --- a/platform/view/services/db/dbtest/bench.go +++ b/platform/view/services/db/driver/sql/common/bench.go @@ -4,7 +4,7 @@ Copyright IBM Corp. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ -package dbtest +package common import ( "fmt" diff --git a/platform/view/services/db/driver/sql/common/binding.go b/platform/view/services/db/driver/sql/common/binding.go index ddc8c4cdf..a0e47838f 100644 --- a/platform/view/services/db/driver/sql/common/binding.go +++ b/platform/view/services/db/driver/sql/common/binding.go @@ -15,7 +15,7 @@ import ( "github.com/pkg/errors" ) -func NewBindingPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *BindingPersistence { +func NewBindingPersistence(readDB *sql.DB, writeDB WriteDB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *BindingPersistence { return &BindingPersistence{ table: table, errorWrapper: errorWrapper, @@ -29,7 +29,7 @@ type BindingPersistence struct { table string errorWrapper driver.SQLErrorWrapper readDB *sql.DB - writeDB *sql.DB + writeDB WriteDB ci Interpreter } diff --git a/platform/view/services/db/driver/sql/common/endorsetx.go b/platform/view/services/db/driver/sql/common/endorsetx.go index 6b5b7ca7c..da5e59b30 100644 --- a/platform/view/services/db/driver/sql/common/endorsetx.go +++ b/platform/view/services/db/driver/sql/common/endorsetx.go @@ -12,7 +12,7 @@ import ( "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" ) -func NewEndorseTxPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *EndorseTxPersistence { +func NewEndorseTxPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *EndorseTxPersistence { return &EndorseTxPersistence{p: newSimpleKeyDataPersistence(writeDB, readDB, table, errorWrapper, ci)} } diff --git a/platform/view/services/db/driver/sql/common/envelope.go b/platform/view/services/db/driver/sql/common/envelope.go index 1dda073f5..135d80171 100644 --- a/platform/view/services/db/driver/sql/common/envelope.go +++ b/platform/view/services/db/driver/sql/common/envelope.go @@ -12,7 +12,7 @@ import ( "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" ) -func NewEnvelopePersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *EnvelopePersistence { +func NewEnvelopePersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *EnvelopePersistence { return &EnvelopePersistence{p: newSimpleKeyDataPersistence(writeDB, readDB, table, errorWrapper, ci)} } diff --git a/platform/view/services/db/dbtest/helpers.go b/platform/view/services/db/driver/sql/common/helpers.go similarity index 98% rename from platform/view/services/db/dbtest/helpers.go rename to platform/view/services/db/driver/sql/common/helpers.go index f36e38ab2..6e5602cb7 100644 --- a/platform/view/services/db/dbtest/helpers.go +++ b/platform/view/services/db/driver/sql/common/helpers.go @@ -4,7 +4,7 @@ Copyright IBM Corp. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ -package dbtest +package common import ( "database/sql" @@ -59,12 +59,12 @@ var UnversionedNotifierCases = []struct { var ErrorCases = []struct { Name string - Fn func(t *testing.T, readDB *sql.DB, writeDB *sql.DB, errorWrapper driver.SQLErrorWrapper, table string) + Fn func(t *testing.T, readDB *sql.DB, writeDB WriteDB, errorWrapper driver.SQLErrorWrapper, table string) }{ {"Duplicate", TTestDuplicate}, } -func TTestDuplicate(t *testing.T, _ *sql.DB, writeDB *sql.DB, errorWrapper driver.SQLErrorWrapper, table string) { +func TTestDuplicate(t *testing.T, _ *sql.DB, writeDB WriteDB, errorWrapper driver.SQLErrorWrapper, table string) { ns := "namespace" tx, err := writeDB.Begin() @@ -201,7 +201,7 @@ func TTestSimpleReadWrite(t *testing.T, db driver.UnversionedPersistence) { assert.NoError(t, err) assert.Equal(t, driver.UnversionedValue("val1"), vv) - // delete state + // deleteOp state err = db.BeginUpdate() assert.NoError(t, err) err = db.DeleteState(ns, key) @@ -857,24 +857,24 @@ func TTestUnversionedNotifierSimple(t *testing.T, db driver.UnversionedNotifier) results, err := waitForResults(ch, 3, 1*time.Second) assert.NoError(t, err) - assert.Equal(t, []notifyEvent{{upsert, "ns", "key"}, {upsert, "ns", "key"}, {delete, "ns", "key"}}, results) + assert.Equal(t, []notifyEvent{{upsertOp, "ns", "key"}, {upsertOp, "ns", "key"}, {deleteOp, "ns", "key"}}, results) } type opType int const ( - unknown opType = iota - delete - upsert + unknownOp opType = iota + deleteOp + upsertOp ) // We treat update/inserts as the same, because we don't need the operation type. // Distinguishing the two cases for sqlite would require more logic. var opTypeMap = map[driver.Operation]opType{ - driver.Unknown: unknown, - driver.Update: upsert, - driver.Insert: upsert, - driver.Delete: delete, + driver.Unknown: unknownOp, + driver.Update: upsertOp, + driver.Insert: upsertOp, + driver.Delete: deleteOp, } type notifier interface { diff --git a/platform/view/services/db/driver/sql/common/metadata.go b/platform/view/services/db/driver/sql/common/metadata.go index 7eb6ec1ba..1ea7bb76f 100644 --- a/platform/view/services/db/driver/sql/common/metadata.go +++ b/platform/view/services/db/driver/sql/common/metadata.go @@ -12,7 +12,7 @@ import ( "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" ) -func NewMetadataPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *MetadataPersistence { +func NewMetadataPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *MetadataPersistence { return &MetadataPersistence{p: newSimpleKeyDataPersistence(writeDB, readDB, table, errorWrapper, ci)} } diff --git a/platform/view/services/db/driver/sql/common/signerinfo.go b/platform/view/services/db/driver/sql/common/signerinfo.go index 67949a114..b5e8c31b7 100644 --- a/platform/view/services/db/driver/sql/common/signerinfo.go +++ b/platform/view/services/db/driver/sql/common/signerinfo.go @@ -15,7 +15,7 @@ import ( "github.com/pkg/errors" ) -func NewSignerInfoPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *SignerInfoPersistence { +func NewSignerInfoPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *SignerInfoPersistence { return &SignerInfoPersistence{ table: table, errorWrapper: errorWrapper, @@ -29,7 +29,7 @@ type SignerInfoPersistence struct { table string errorWrapper driver.SQLErrorWrapper readDB *sql.DB - writeDB *sql.DB + writeDB WriteDB ci Interpreter } @@ -49,7 +49,7 @@ func (db *SignerInfoPersistence) FilterExistingSigners(ids ...view.Identity) ([] if err != nil { return nil, errors.Wrapf(err, "error querying db") } - defer func() { _ = rows.Close() }() + defer rows.Close() existingSigners := make([]view.Identity, 0) for rows.Next() { @@ -67,15 +67,16 @@ func (db *SignerInfoPersistence) PutSigner(id view.Identity) error { query := fmt.Sprintf("INSERT INTO %s (id) VALUES ($1)", db.table) logger.Debug(query, id) _, err := db.writeDB.Exec(query, id.UniqueID()) - if err != nil && errors.Is(db.errorWrapper.WrapError(err), driver.UniqueKeyViolation) { - logger.Warnf("Signer [%s] already in db. Skipping...", id) + if err == nil { + logger.Debugf("Signer [%s] registered", id) return nil } - if err != nil { - return errors.Wrapf(err, "failed executing query [%s]", query) + if errors.Is(db.errorWrapper.WrapError(err), driver.UniqueKeyViolation) { + logger.Warnf("Signer [%s] already in db. Skipping...", id) + return nil } - logger.Debugf("Signer [%s] registered", id) - return nil + + return errors.Wrapf(err, "failed executing query [%s]", query) } func (db *SignerInfoPersistence) CreateSchema() error { diff --git a/platform/view/services/db/driver/sql/common/simplekeydata.go b/platform/view/services/db/driver/sql/common/simplekeydata.go index 915b9b51b..890d5d881 100644 --- a/platform/view/services/db/driver/sql/common/simplekeydata.go +++ b/platform/view/services/db/driver/sql/common/simplekeydata.go @@ -14,7 +14,7 @@ import ( "github.com/pkg/errors" ) -func newSimpleKeyDataPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *simpleKeyDataPersistence { +func newSimpleKeyDataPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *simpleKeyDataPersistence { return &simpleKeyDataPersistence{ table: table, errorWrapper: errorWrapper, @@ -28,7 +28,7 @@ type simpleKeyDataPersistence struct { table string errorWrapper driver.SQLErrorWrapper readDB *sql.DB - writeDB *sql.DB + writeDB WriteDB ci Interpreter } diff --git a/platform/view/services/db/driver/sql/common/test_utils.go b/platform/view/services/db/driver/sql/common/test_utils.go index ef1193fca..fdd009e0a 100644 --- a/platform/view/services/db/driver/sql/common/test_utils.go +++ b/platform/view/services/db/driver/sql/common/test_utils.go @@ -9,7 +9,6 @@ package common import ( "testing" - "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/dbtest" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" _ "modernc.org/sqlite" ) @@ -20,7 +19,7 @@ func TestCases(t *testing.T, unversionedProvider provider[driver.UnversionedPersistence], unversionedNotifierProvider provider[driver.UnversionedNotifier], baseUnpacker func(p driver.UnversionedPersistence) *UnversionedPersistence) { - for _, c := range dbtest.UnversionedCases { + for _, c := range UnversionedCases { un, err := unversionedProvider(c.Name) if err != nil { t.Fatal(err) @@ -30,7 +29,7 @@ func TestCases(t *testing.T, c.Fn(xt, un) }) } - for _, c := range dbtest.ErrorCases { + for _, c := range ErrorCases { un, err := unversionedProvider(c.Name) if err != nil { t.Fatal(err) @@ -41,7 +40,7 @@ func TestCases(t *testing.T, c.Fn(xt, b.readDB, b.writeDB, b.errorWrapper, b.table) }) } - for _, c := range dbtest.UnversionedNotifierCases { + for _, c := range UnversionedNotifierCases { un, err := unversionedNotifierProvider(c.Name) if err != nil { t.Fatal(err) diff --git a/platform/view/services/db/driver/sql/common/unversioned.go b/platform/view/services/db/driver/sql/common/unversioned.go index 982435d33..833fb888d 100644 --- a/platform/view/services/db/driver/sql/common/unversioned.go +++ b/platform/view/services/db/driver/sql/common/unversioned.go @@ -24,7 +24,7 @@ var logger = logging.MustGetLogger("view-sdk.db.driver.sql") type UnversionedPersistence struct { *common.BaseDB[*sql.Tx] - writeDB *sql.DB + writeDB WriteDB readDB *sql.DB table string @@ -32,7 +32,7 @@ type UnversionedPersistence struct { ci Interpreter } -func NewUnversionedPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *UnversionedPersistence { +func NewUnversionedPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *UnversionedPersistence { return &UnversionedPersistence{ BaseDB: common.NewBaseDB(func() (*sql.Tx, error) { return writeDB.Begin() }), readDB: readDB, @@ -142,7 +142,7 @@ func (db *UnversionedPersistence) DeleteStatesWithTx(tx *sql.Tx, namespace drive if err != nil { errs := make(map[driver2.PKey]error) for _, key := range keys { - errs[key] = errors.Wrapf(db.errorWrapper.WrapError(err), "could not delete val for key [%s]", key) + errs[key] = errors.Wrapf(db.errorWrapper.WrapError(err), "could not deleteOp val for key [%s]", key) } return errs } @@ -170,7 +170,7 @@ func (db *UnversionedPersistence) SetStateWithTx(tx *sql.Tx, ns driver2.Namespac val = append([]byte(nil), val...) - // Portable upsert + // Portable upsertOp exists, err := db.exists(tx, ns, pkey) if err != nil { return err diff --git a/platform/view/services/db/driver/sql/common/utils.go b/platform/view/services/db/driver/sql/common/utils.go index 90fe86d82..094006496 100644 --- a/platform/view/services/db/driver/sql/common/utils.go +++ b/platform/view/services/db/driver/sql/common/utils.go @@ -25,6 +25,12 @@ const ( DefaultMaxIdleTime = time.Minute ) +type WriteDB interface { + Begin() (*sql.Tx, error) + Exec(query string, args ...any) (sql.Result, error) + Close() error +} + type Sanitizer interface { Encode(string) (string, error) Decode(string) (string, error) @@ -99,7 +105,7 @@ func (c *TableNameCreator) MustGetTableName(name string) string { return fmt.Sprintf("%s%s", c.prefix, name) } -func InitSchema(db *sql.DB, schemas ...string) (err error) { +func InitSchema(db WriteDB, schemas ...string) (err error) { logger.Info("creating tables") tx, err := db.Begin() if err != nil { diff --git a/platform/view/services/db/driver/sql/common/vault.go b/platform/view/services/db/driver/sql/common/vault.go index 3b5edc229..bf4604772 100644 --- a/platform/view/services/db/driver/sql/common/vault.go +++ b/platform/view/services/db/driver/sql/common/vault.go @@ -28,7 +28,7 @@ type VaultTables struct { type stateRow = [5]any -func NewVaultPersistence(writeDB *sql.DB, readDB *sql.DB, tables VaultTables, errorWrapper driver2.SQLErrorWrapper, ci Interpreter, sanitizer Sanitizer) *VaultPersistence { +func NewVaultPersistence(writeDB WriteDB, readDB *sql.DB, tables VaultTables, errorWrapper driver2.SQLErrorWrapper, ci Interpreter, sanitizer Sanitizer) *VaultPersistence { return &VaultPersistence{ tables: tables, errorWrapper: errorWrapper, @@ -44,7 +44,7 @@ type VaultPersistence struct { tables VaultTables errorWrapper driver2.SQLErrorWrapper readDB *sql.DB - writeDB *sql.DB + writeDB WriteDB ci Interpreter lockManager *vaultLockManager sanitizer *sanitizer diff --git a/platform/view/services/db/driver/sql/postgres/bench_test.go b/platform/view/services/db/driver/sql/postgres/bench_test.go index 69cdd816c..339391b05 100644 --- a/platform/view/services/db/driver/sql/postgres/bench_test.go +++ b/platform/view/services/db/driver/sql/postgres/bench_test.go @@ -10,7 +10,7 @@ import ( "testing" "time" - "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/dbtest" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver/sql/common" ) func BenchmarkReadExistingPostgres(b *testing.B) { @@ -25,7 +25,7 @@ func BenchmarkReadExistingPostgres(b *testing.B) { } defer db.Close() - dbtest.ReadExisting(b, db) + common.ReadExisting(b, db) } func BenchmarkReadNonExistingPostgres(b *testing.B) { @@ -40,7 +40,7 @@ func BenchmarkReadNonExistingPostgres(b *testing.B) { } defer db.Close() - dbtest.ReadNonExisting(b, db) + common.ReadNonExisting(b, db) } func BenchmarkWriteOnePostgres(b *testing.B) { @@ -55,7 +55,7 @@ func BenchmarkWriteOnePostgres(b *testing.B) { } defer db.Close() - dbtest.WriteOne(b, db) + common.WriteOne(b, db) } func BenchmarkWriteManyPostgres(b *testing.B) { @@ -70,7 +70,7 @@ func BenchmarkWriteManyPostgres(b *testing.B) { } defer db.Close() - dbtest.WriteMany(b, db) + common.WriteMany(b, db) } func BenchmarkWriteManyPostgresWithIdle(b *testing.B) { @@ -85,5 +85,5 @@ func BenchmarkWriteManyPostgresWithIdle(b *testing.B) { } defer db.Close() - dbtest.WriteParallel(b, db) + common.WriteParallel(b, db) } diff --git a/platform/view/services/db/driver/sql/sqlite/bench_test.go b/platform/view/services/db/driver/sql/sqlite/bench_test.go index 6737a7dd1..8ae9a713a 100644 --- a/platform/view/services/db/driver/sql/sqlite/bench_test.go +++ b/platform/view/services/db/driver/sql/sqlite/bench_test.go @@ -9,8 +9,8 @@ package sqlite import ( "testing" - "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/dbtest" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver/sql/common" "github.com/stretchr/testify/assert" ) @@ -19,7 +19,7 @@ func BenchmarkReadExistingSqlite(b *testing.B) { assert.NoError(b, err) defer db.Close() - dbtest.ReadExisting(b, db) + common.ReadExisting(b, db) } func BenchmarkReadNonExistingSqlite(b *testing.B) { @@ -27,7 +27,7 @@ func BenchmarkReadNonExistingSqlite(b *testing.B) { assert.NoError(b, err) defer db.Close() - dbtest.ReadNonExisting(b, db) + common.ReadNonExisting(b, db) } func BenchmarkWriteOneSqlite(b *testing.B) { @@ -35,7 +35,7 @@ func BenchmarkWriteOneSqlite(b *testing.B) { assert.NoError(b, err) defer db.Close() - dbtest.WriteOne(b, db) + common.WriteOne(b, db) } func BenchmarkWriteManySqlite(b *testing.B) { @@ -43,7 +43,7 @@ func BenchmarkWriteManySqlite(b *testing.B) { assert.NoError(b, err) defer db.Close() - dbtest.WriteMany(b, db) + common.WriteMany(b, db) } func newVersionedPersistence(dir string) (driver.UnversionedPersistence, error) { diff --git a/platform/view/services/db/driver/sql/sqlite/binding.go b/platform/view/services/db/driver/sql/sqlite/binding.go index 60f8c1d73..70aa22cd5 100644 --- a/platform/view/services/db/driver/sql/sqlite/binding.go +++ b/platform/view/services/db/driver/sql/sqlite/binding.go @@ -19,7 +19,7 @@ import ( type BindingPersistence struct { *common.BindingPersistence table string - writeDB *sql.DB + writeDB common.WriteDB errorWrapper driver.SQLErrorWrapper } @@ -28,10 +28,10 @@ func NewBindingPersistence(opts common.Opts, table string) (*BindingPersistence, if err != nil { return nil, fmt.Errorf("error opening db: %w", err) } - return newBindingPersistence(readDB, writeDB, table), nil + return newBindingPersistence(readDB, newRetryWriteDB(writeDB), table), nil } -func newBindingPersistence(readDB, writeDB *sql.DB, table string) *BindingPersistence { +func newBindingPersistence(readDB *sql.DB, writeDB common.WriteDB, table string) *BindingPersistence { errorWrapper := &errorMapper{} return &BindingPersistence{ BindingPersistence: common.NewBindingPersistence(readDB, writeDB, table, errorWrapper, NewInterpreter()), diff --git a/platform/view/services/db/driver/sql/sqlite/endorsetx.go b/platform/view/services/db/driver/sql/sqlite/endorsetx.go index 058a61c2f..7c865b4ce 100644 --- a/platform/view/services/db/driver/sql/sqlite/endorsetx.go +++ b/platform/view/services/db/driver/sql/sqlite/endorsetx.go @@ -22,9 +22,9 @@ func NewEndorseTxPersistence(opts common.Opts, table string) (*EndorseTxPersiste if err != nil { return nil, fmt.Errorf("error opening db: %w", err) } - return newEndorseTxPersistence(readDB, writeDB, table), nil + return newEndorseTxPersistence(readDB, newRetryWriteDB(writeDB), table), nil } -func newEndorseTxPersistence(readDB, writeDB *sql.DB, table string) *EndorseTxPersistence { - return &EndorseTxPersistence{EndorseTxPersistence: common.NewEndorseTxPersistence(readDB, writeDB, table, &errorMapper{}, NewInterpreter())} +func newEndorseTxPersistence(readDB *sql.DB, writeDB common.WriteDB, table string) *EndorseTxPersistence { + return &EndorseTxPersistence{EndorseTxPersistence: common.NewEndorseTxPersistence(writeDB, readDB, table, &errorMapper{}, NewInterpreter())} } diff --git a/platform/view/services/db/driver/sql/sqlite/envelope.go b/platform/view/services/db/driver/sql/sqlite/envelope.go index d17e52cd9..de1b156eb 100644 --- a/platform/view/services/db/driver/sql/sqlite/envelope.go +++ b/platform/view/services/db/driver/sql/sqlite/envelope.go @@ -22,9 +22,9 @@ func NewEnvelopePersistence(opts common.Opts, table string) (*EnvelopePersistenc if err != nil { return nil, fmt.Errorf("error opening db: %w", err) } - return newEnvelopePersistence(readDB, writeDB, table), nil + return newEnvelopePersistence(readDB, newRetryWriteDB(writeDB), table), nil } -func newEnvelopePersistence(readDB, writeDB *sql.DB, table string) *EnvelopePersistence { - return &EnvelopePersistence{EnvelopePersistence: common.NewEnvelopePersistence(readDB, writeDB, table, &errorMapper{}, NewInterpreter())} +func newEnvelopePersistence(readDB *sql.DB, writeDB common.WriteDB, table string) *EnvelopePersistence { + return &EnvelopePersistence{EnvelopePersistence: common.NewEnvelopePersistence(writeDB, readDB, table, &errorMapper{}, NewInterpreter())} } diff --git a/platform/view/services/db/driver/sql/sqlite/errormapper.go b/platform/view/services/db/driver/sql/sqlite/errormapper.go index c8013f0a7..b87cb02cd 100644 --- a/platform/view/services/db/driver/sql/sqlite/errormapper.go +++ b/platform/view/services/db/driver/sql/sqlite/errormapper.go @@ -19,6 +19,8 @@ func (m *errorMapper) WrapError(err error) error { switch err.Code() { case 1555: return errors.Wrapf(driver.UniqueKeyViolation, "%s", err) + case 5: + return errors.Wrapf(driver.SqlBusy, "%s", err) default: logger.Warnf("Unmapped sqlite error with code [%d]", err.Code()) } diff --git a/platform/view/services/db/driver/sql/sqlite/metadata.go b/platform/view/services/db/driver/sql/sqlite/metadata.go index f08fe4312..fdaa16c4b 100644 --- a/platform/view/services/db/driver/sql/sqlite/metadata.go +++ b/platform/view/services/db/driver/sql/sqlite/metadata.go @@ -22,9 +22,9 @@ func NewMetadataPersistence(opts common.Opts, table string) (*MetadataPersistenc if err != nil { return nil, fmt.Errorf("error opening db: %w", err) } - return newMetadataPersistence(readDB, writeDB, table), nil + return newMetadataPersistence(readDB, newRetryWriteDB(writeDB), table), nil } -func newMetadataPersistence(readDB, writeDB *sql.DB, table string) *MetadataPersistence { - return &MetadataPersistence{MetadataPersistence: common.NewMetadataPersistence(readDB, writeDB, table, &errorMapper{}, NewInterpreter())} +func newMetadataPersistence(readDB *sql.DB, writeDB common.WriteDB, table string) *MetadataPersistence { + return &MetadataPersistence{MetadataPersistence: common.NewMetadataPersistence(writeDB, readDB, table, &errorMapper{}, NewInterpreter())} } diff --git a/platform/view/services/db/driver/sql/sqlite/retrywritedb.go b/platform/view/services/db/driver/sql/sqlite/retrywritedb.go new file mode 100644 index 000000000..2c47910e0 --- /dev/null +++ b/platform/view/services/db/driver/sql/sqlite/retrywritedb.go @@ -0,0 +1,42 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package sqlite + +import ( + "context" + "database/sql" + + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver" + "github.com/pkg/errors" +) + +func newRetryWriteDB(db *sql.DB) *retryWriteDB { + return &retryWriteDB{ + DB: db, + errorWrapper: &errorMapper{}, + } +} + +type retryWriteDB struct { + *sql.DB + + errorWrapper driver.SQLErrorWrapper +} + +func (db *retryWriteDB) Exec(query string, args ...any) (sql.Result, error) { + return db.ExecContext(context.Background(), query, args...) +} + +func (db *retryWriteDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { + res, err := db.DB.ExecContext(ctx, query, args...) + if err != nil && errors.Is(db.errorWrapper.WrapError(err), driver.SqlBusy) { + // TODO: AF Maybe limit the amount of retries + logger.Warnf("Sql busy. Retrying query [%s]...", query) + return db.ExecContext(ctx, query, args...) + } + return res, err +} diff --git a/platform/view/services/db/driver/sql/sqlite/signerinfo.go b/platform/view/services/db/driver/sql/sqlite/signerinfo.go index d2a924032..a1c2359e3 100644 --- a/platform/view/services/db/driver/sql/sqlite/signerinfo.go +++ b/platform/view/services/db/driver/sql/sqlite/signerinfo.go @@ -22,9 +22,9 @@ func NewSignerInfoPersistence(opts common.Opts, table string) (*SignerInfoPersis if err != nil { return nil, fmt.Errorf("error opening db: %w", err) } - return newSignerInfoPersistence(readDB, writeDB, table), nil + return newSignerInfoPersistence(readDB, newRetryWriteDB(writeDB), table), nil } -func newSignerInfoPersistence(readDB, writeDB *sql.DB, table string) *SignerInfoPersistence { - return &SignerInfoPersistence{SignerInfoPersistence: common.NewSignerInfoPersistence(readDB, writeDB, table, &errorMapper{}, NewInterpreter())} +func newSignerInfoPersistence(readDB *sql.DB, writeDB common.WriteDB, table string) *SignerInfoPersistence { + return &SignerInfoPersistence{SignerInfoPersistence: common.NewSignerInfoPersistence(writeDB, readDB, table, &errorMapper{}, NewInterpreter())} } diff --git a/platform/view/services/db/driver/sql/sqlite/unversioned.go b/platform/view/services/db/driver/sql/sqlite/unversioned.go index 200d41a97..1fe861758 100644 --- a/platform/view/services/db/driver/sql/sqlite/unversioned.go +++ b/platform/view/services/db/driver/sql/sqlite/unversioned.go @@ -36,7 +36,7 @@ func NewUnversionedNotifier(opts common.Opts, table string) (*notifier.Unversion return notifier.NewUnversioned(newUnversioned(readDB, writeDB, table)), nil } -func newUnversioned(readDB, writeDB *sql.DB, table string) *UnversionedPersistence { +func newUnversioned(readDB *sql.DB, writeDB common.WriteDB, table string) *UnversionedPersistence { var wrapper driver.SQLErrorWrapper = &errorMapper{} return &UnversionedPersistence{ UnversionedPersistence: common.NewUnversionedPersistence(writeDB, readDB, table, wrapper, NewInterpreter()), diff --git a/platform/view/services/db/driver/sql/sqlite/vault.go b/platform/view/services/db/driver/sql/sqlite/vault.go index f6e183966..b612318f4 100644 --- a/platform/view/services/db/driver/sql/sqlite/vault.go +++ b/platform/view/services/db/driver/sql/sqlite/vault.go @@ -23,7 +23,7 @@ type VaultPersistence struct { *common.VaultPersistence tables common.VaultTables - writeDB *sql.DB + writeDB common.WriteDB ci common.Interpreter } @@ -33,16 +33,16 @@ func NewVaultPersistence(opts common.Opts, tablePrefix string) (*VaultPersistenc if err != nil { return nil, fmt.Errorf("error opening db: %w", err) } - return newTxCodePersistence(readDB, writeDB, common.VaultTables{ + return newTxCodePersistence(readDB, newRetryWriteDB(writeDB), common.VaultTables{ StateTable: fmt.Sprintf("%s_state", tablePrefix), StatusTable: fmt.Sprintf("%s_status", tablePrefix), }), nil } -func newTxCodePersistence(readDB, writeDB *sql.DB, tables common.VaultTables) *VaultPersistence { +func newTxCodePersistence(readDB *sql.DB, writeDB common.WriteDB, tables common.VaultTables) *VaultPersistence { ci := NewInterpreter() return &VaultPersistence{ - VaultPersistence: common.NewVaultPersistence(readDB, writeDB, tables, &errorMapper{}, ci, newSanitizer()), + VaultPersistence: common.NewVaultPersistence(writeDB, readDB, tables, &errorMapper{}, ci, newSanitizer()), tables: tables, writeDB: writeDB, ci: ci,