Skip to content

Commit

Permalink
Retry when sql busy
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Feb 5, 2025
1 parent 9a925ca commit 4320ed0
Show file tree
Hide file tree
Showing 26 changed files with 143 additions and 78 deletions.
23 changes: 18 additions & 5 deletions platform/fabric/services/endorser/endorsement.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,34 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error
vProviders = append(vProviders, &verifierProviderWrapper{m: mspManager})

// Get results to send
span.AddEvent("Get results to send")
res, err := c.tx.Results()
if err != nil {
return nil, errors.Wrapf(err, "failed getting tx results")
}

// Contact sequantially all parties.
logger.Debugf("Collect Endorsements from [%d] parties [%v]", len(c.parties), c.parties)
span.AddEvent("Start collecting endorsement")
for _, party := range c.parties {
span.AddEvent("start_collect_endorsement")
span.AddEvent("Collect endorsement iteration")
logger.Debugf("Collect Endorsements On Simulation from [%s]", party)

var err error
if context.IsMe(party) {
span.AddEvent("Start endorsing locally")
logger.Debugf("This is me %s, endorse locally.", party)
// Endorse it
err = c.tx.EndorseWithIdentity(party)
if err != nil {
return nil, errors.Wrap(err, "failed endorsing transaction")
}
span.AddEvent("Finish endorsing locally")
continue
}

var txRaw []byte
span.AddEvent("Get transaction bytes")
if c.deleteTransient {
txRaw, err = c.tx.BytesNoTransient()
if err != nil {
Expand All @@ -75,24 +80,26 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error
}
}

span.AddEvent("Get session with party")
session, err := context.GetSession(context.Initiator(), party)
if err != nil {
return nil, errors.Wrap(err, "failed getting session")
}

// Get a channel to receive the answer
span.AddEvent("Start receiving from party")
ch := session.Receive()

// Send transaction
span.AddEvent("send_tx")
span.AddEvent("Send raw transaction to party")
err = session.SendWithContext(context.Context(), txRaw)
if err != nil {
return nil, errors.Wrap(err, "failed sending transaction content")
}

timeout := time.NewTimer(time.Minute)

span.AddEvent("wait_tx")
span.AddEvent("Wait for transaction")
// Wait for the answer
var msg *view.Message
select {
Expand All @@ -102,7 +109,7 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error
timeout.Stop()
return nil, errors.Errorf("Timeout from party %s", party)
}
span.AddEvent("receive_tx")
span.AddEvent("Received transaction from party")
if msg.Status == view.ERROR {
return nil, errors.New(string(msg.Payload))
}
Expand All @@ -122,6 +129,7 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error
}
tm := fns.TransactionManager()
for _, response := range responses {
span.AddEvent("Parse proposal response")
proposalResponse, err := tm.NewProposalResponseFromBytes(response)
if err != nil {
return nil, errors.Wrap(err, "failed unmarshalling received proposal response")
Expand All @@ -130,15 +138,18 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error
endorser := view.Identity(proposalResponse.Endorser())

// Check the validity of the response
span.AddEvent("Check response validity")
if view2.GetEndpointService(context).IsBoundTo(endorser, party) {
found = true
}

// TODO: check the verifier providers, if any
verified := false
span.AddEvent("Start verification with providers")
for _, provider := range vProviders {
span.AddEvent("verify_endorsement")
span.AddEvent("Verify endorsement")
err := proposalResponse.VerifyEndorsement(provider)
span.AddEvent("Verified endorsement")
if err == nil {
logger.Debugf("endorsement [%s] is valid", endorser)
verified = true
Expand All @@ -156,7 +167,9 @@ func (c *collectEndorsementsView) Call(context view.Context) (interface{}, error
}

logger.Debugf("append response from party [%s]", party)
span.AddEvent("Append proposal response")
err = c.tx.AppendProposalResponse(proposalResponse)
span.AddEvent("Appended proposal response")
if err != nil {
return nil, errors.Wrap(err, "failed appending received proposal response")
}
Expand Down
2 changes: 2 additions & 0 deletions platform/view/services/db/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var (
UniqueKeyViolation = errors.New("unique key violation")
// DeadlockDetected happens when two transactions are taking place at the same time and interact with the same rows
DeadlockDetected = errors.New("deadlock detected")
// SqlBusy happens when two transactions are trying to write at the same time. Can be avoided by opening the database in exclusive mode
SqlBusy = errors.New("sql is busy")
)

type SQLError = error
Expand Down
4 changes: 2 additions & 2 deletions platform/view/services/db/driver/sql/common/auditinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/pkg/errors"
)

func NewAuditInfoPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *AuditInfoPersistence {
func NewAuditInfoPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *AuditInfoPersistence {
return &AuditInfoPersistence{
table: table,
errorWrapper: errorWrapper,
Expand All @@ -29,7 +29,7 @@ type AuditInfoPersistence struct {
table string
errorWrapper driver.SQLErrorWrapper
readDB *sql.DB
writeDB *sql.DB
writeDB WriteDB
ci Interpreter
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package dbtest
package common

import (
"fmt"
Expand Down
4 changes: 2 additions & 2 deletions platform/view/services/db/driver/sql/common/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/pkg/errors"
)

func NewBindingPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *BindingPersistence {
func NewBindingPersistence(readDB *sql.DB, writeDB WriteDB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *BindingPersistence {
return &BindingPersistence{
table: table,
errorWrapper: errorWrapper,
Expand All @@ -29,7 +29,7 @@ type BindingPersistence struct {
table string
errorWrapper driver.SQLErrorWrapper
readDB *sql.DB
writeDB *sql.DB
writeDB WriteDB
ci Interpreter
}

Expand Down
2 changes: 1 addition & 1 deletion platform/view/services/db/driver/sql/common/endorsetx.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
)

func NewEndorseTxPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *EndorseTxPersistence {
func NewEndorseTxPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *EndorseTxPersistence {
return &EndorseTxPersistence{p: newSimpleKeyDataPersistence(writeDB, readDB, table, errorWrapper, ci)}
}

Expand Down
2 changes: 1 addition & 1 deletion platform/view/services/db/driver/sql/common/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
)

func NewEnvelopePersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *EnvelopePersistence {
func NewEnvelopePersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *EnvelopePersistence {
return &EnvelopePersistence{p: newSimpleKeyDataPersistence(writeDB, readDB, table, errorWrapper, ci)}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package dbtest
package common

import (
"database/sql"
Expand Down Expand Up @@ -59,12 +59,12 @@ var UnversionedNotifierCases = []struct {

var ErrorCases = []struct {
Name string
Fn func(t *testing.T, readDB *sql.DB, writeDB *sql.DB, errorWrapper driver.SQLErrorWrapper, table string)
Fn func(t *testing.T, readDB *sql.DB, writeDB WriteDB, errorWrapper driver.SQLErrorWrapper, table string)
}{
{"Duplicate", TTestDuplicate},
}

func TTestDuplicate(t *testing.T, _ *sql.DB, writeDB *sql.DB, errorWrapper driver.SQLErrorWrapper, table string) {
func TTestDuplicate(t *testing.T, _ *sql.DB, writeDB WriteDB, errorWrapper driver.SQLErrorWrapper, table string) {
ns := "namespace"

tx, err := writeDB.Begin()
Expand Down Expand Up @@ -201,7 +201,7 @@ func TTestSimpleReadWrite(t *testing.T, db driver.UnversionedPersistence) {
assert.NoError(t, err)
assert.Equal(t, driver.UnversionedValue("val1"), vv)

// delete state
// deleteOp state
err = db.BeginUpdate()
assert.NoError(t, err)
err = db.DeleteState(ns, key)
Expand Down Expand Up @@ -857,24 +857,24 @@ func TTestUnversionedNotifierSimple(t *testing.T, db driver.UnversionedNotifier)

results, err := waitForResults(ch, 3, 1*time.Second)
assert.NoError(t, err)
assert.Equal(t, []notifyEvent{{upsert, "ns", "key"}, {upsert, "ns", "key"}, {delete, "ns", "key"}}, results)
assert.Equal(t, []notifyEvent{{upsertOp, "ns", "key"}, {upsertOp, "ns", "key"}, {deleteOp, "ns", "key"}}, results)
}

type opType int

const (
unknown opType = iota
delete
upsert
unknownOp opType = iota
deleteOp
upsertOp
)

// We treat update/inserts as the same, because we don't need the operation type.
// Distinguishing the two cases for sqlite would require more logic.
var opTypeMap = map[driver.Operation]opType{
driver.Unknown: unknown,
driver.Update: upsert,
driver.Insert: upsert,
driver.Delete: delete,
driver.Unknown: unknownOp,
driver.Update: upsertOp,
driver.Insert: upsertOp,
driver.Delete: deleteOp,
}

type notifier interface {
Expand Down
2 changes: 1 addition & 1 deletion platform/view/services/db/driver/sql/common/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
)

func NewMetadataPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *MetadataPersistence {
func NewMetadataPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *MetadataPersistence {
return &MetadataPersistence{p: newSimpleKeyDataPersistence(writeDB, readDB, table, errorWrapper, ci)}
}

Expand Down
19 changes: 10 additions & 9 deletions platform/view/services/db/driver/sql/common/signerinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/pkg/errors"
)

func NewSignerInfoPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *SignerInfoPersistence {
func NewSignerInfoPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *SignerInfoPersistence {
return &SignerInfoPersistence{
table: table,
errorWrapper: errorWrapper,
Expand All @@ -29,7 +29,7 @@ type SignerInfoPersistence struct {
table string
errorWrapper driver.SQLErrorWrapper
readDB *sql.DB
writeDB *sql.DB
writeDB WriteDB
ci Interpreter
}

Expand All @@ -49,7 +49,7 @@ func (db *SignerInfoPersistence) FilterExistingSigners(ids ...view.Identity) ([]
if err != nil {
return nil, errors.Wrapf(err, "error querying db")
}
defer func() { _ = rows.Close() }()
defer rows.Close()

existingSigners := make([]view.Identity, 0)
for rows.Next() {
Expand All @@ -67,15 +67,16 @@ func (db *SignerInfoPersistence) PutSigner(id view.Identity) error {
query := fmt.Sprintf("INSERT INTO %s (id) VALUES ($1)", db.table)
logger.Debug(query, id)
_, err := db.writeDB.Exec(query, id.UniqueID())
if err != nil && errors.Is(db.errorWrapper.WrapError(err), driver.UniqueKeyViolation) {
logger.Warnf("Signer [%s] already in db. Skipping...", id)
if err == nil {
logger.Debugf("Signer [%s] registered", id)
return nil
}
if err != nil {
return errors.Wrapf(err, "failed executing query [%s]", query)
if errors.Is(db.errorWrapper.WrapError(err), driver.UniqueKeyViolation) {
logger.Warnf("Signer [%s] already in db. Skipping...", id)
return nil
}
logger.Debugf("Signer [%s] registered", id)
return nil

return errors.Wrapf(err, "failed executing query [%s]", query)
}

func (db *SignerInfoPersistence) CreateSchema() error {
Expand Down
4 changes: 2 additions & 2 deletions platform/view/services/db/driver/sql/common/simplekeydata.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/pkg/errors"
)

func newSimpleKeyDataPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *simpleKeyDataPersistence {
func newSimpleKeyDataPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *simpleKeyDataPersistence {
return &simpleKeyDataPersistence{
table: table,
errorWrapper: errorWrapper,
Expand All @@ -28,7 +28,7 @@ type simpleKeyDataPersistence struct {
table string
errorWrapper driver.SQLErrorWrapper
readDB *sql.DB
writeDB *sql.DB
writeDB WriteDB
ci Interpreter
}

Expand Down
7 changes: 3 additions & 4 deletions platform/view/services/db/driver/sql/common/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package common
import (
"testing"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/dbtest"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/db/driver"
_ "modernc.org/sqlite"
)
Expand All @@ -20,7 +19,7 @@ func TestCases(t *testing.T,
unversionedProvider provider[driver.UnversionedPersistence],
unversionedNotifierProvider provider[driver.UnversionedNotifier],
baseUnpacker func(p driver.UnversionedPersistence) *UnversionedPersistence) {
for _, c := range dbtest.UnversionedCases {
for _, c := range UnversionedCases {
un, err := unversionedProvider(c.Name)
if err != nil {
t.Fatal(err)
Expand All @@ -30,7 +29,7 @@ func TestCases(t *testing.T,
c.Fn(xt, un)
})
}
for _, c := range dbtest.ErrorCases {
for _, c := range ErrorCases {
un, err := unversionedProvider(c.Name)
if err != nil {
t.Fatal(err)
Expand All @@ -41,7 +40,7 @@ func TestCases(t *testing.T,
c.Fn(xt, b.readDB, b.writeDB, b.errorWrapper, b.table)
})
}
for _, c := range dbtest.UnversionedNotifierCases {
for _, c := range UnversionedNotifierCases {
un, err := unversionedNotifierProvider(c.Name)
if err != nil {
t.Fatal(err)
Expand Down
8 changes: 4 additions & 4 deletions platform/view/services/db/driver/sql/common/unversioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ var logger = logging.MustGetLogger("view-sdk.db.driver.sql")

type UnversionedPersistence struct {
*common.BaseDB[*sql.Tx]
writeDB *sql.DB
writeDB WriteDB
readDB *sql.DB
table string

errorWrapper driver.SQLErrorWrapper
ci Interpreter
}

func NewUnversionedPersistence(writeDB *sql.DB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *UnversionedPersistence {
func NewUnversionedPersistence(writeDB WriteDB, readDB *sql.DB, table string, errorWrapper driver.SQLErrorWrapper, ci Interpreter) *UnversionedPersistence {
return &UnversionedPersistence{
BaseDB: common.NewBaseDB(func() (*sql.Tx, error) { return writeDB.Begin() }),
readDB: readDB,
Expand Down Expand Up @@ -142,7 +142,7 @@ func (db *UnversionedPersistence) DeleteStatesWithTx(tx *sql.Tx, namespace drive
if err != nil {
errs := make(map[driver2.PKey]error)
for _, key := range keys {
errs[key] = errors.Wrapf(db.errorWrapper.WrapError(err), "could not delete val for key [%s]", key)
errs[key] = errors.Wrapf(db.errorWrapper.WrapError(err), "could not deleteOp val for key [%s]", key)
}
return errs
}
Expand Down Expand Up @@ -170,7 +170,7 @@ func (db *UnversionedPersistence) SetStateWithTx(tx *sql.Tx, ns driver2.Namespac

val = append([]byte(nil), val...)

// Portable upsert
// Portable upsertOp
exists, err := db.exists(tx, ns, pkey)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 4320ed0

Please sign in to comment.