From 91ec7b2dab529a463e399888c83f1356c1377033 Mon Sep 17 00:00:00 2001 From: Mitja T Date: Mon, 30 Jan 2023 16:56:47 +0100 Subject: [PATCH] genesis-test: Create DB snapshot using full tx isolation --- storage/api.go | 6 ++++++ storage/postgres/client.go | 6 +++++- tests/genesis/genesis_test.go | 6 +++++- 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/storage/api.go b/storage/api.go index 8565c930e..7760213d0 100644 --- a/storage/api.go +++ b/storage/api.go @@ -40,6 +40,9 @@ type QueryResults = pgx.Rows // QueryResult represents the result from a read query. type QueryResult = pgx.Row +// TxOptions encodes the way DB transactions are executed. +type TxOptions = pgx.TxOptions + // Queue adds query to a batch. func (b *QueryBatch) Queue(cmd string, args ...interface{}) { b.items = append(b.items, &batchItem{ @@ -283,6 +286,9 @@ type TargetStorage interface { // SendBatch sends a batch of queries to be applied to target storage. SendBatch(ctx context.Context, batch *QueryBatch) error + // SendBatchWithOptions is like SendBatch, with custom DB options (e.g. level of tx isolation). + SendBatchWithOptions(ctx context.Context, batch *QueryBatch, opts TxOptions) error + // Query submits a query to fetch data from target storage. Query(ctx context.Context, sql string, args ...interface{}) (QueryResults, error) diff --git a/storage/postgres/client.go b/storage/postgres/client.go index afb52d520..07e91e93d 100644 --- a/storage/postgres/client.go +++ b/storage/postgres/client.go @@ -89,8 +89,12 @@ func NewClient(connString string, l *log.Logger) (*Client, error) { // by any indexer. We only care about atomic success or failure of the batch of queries // corresponding to a new block. func (c *Client) SendBatch(ctx context.Context, batch *storage.QueryBatch) error { + return c.SendBatchWithOptions(ctx, batch, pgx.TxOptions{}) +} + +func (c *Client) SendBatchWithOptions(ctx context.Context, batch *storage.QueryBatch, opts pgx.TxOptions) error { pgxBatch := batch.AsPgxBatch() - if err := c.pool.BeginTxFunc(ctx, pgx.TxOptions{}, func(tx pgx.Tx) error { + if err := c.pool.BeginTxFunc(ctx, opts, func(tx pgx.Tx) error { batchResults := tx.SendBatch(ctx, &pgxBatch) defer common.CloseOrLog(batchResults, c.logger) for i := 0; i < pgxBatch.Len(); i++ { diff --git a/tests/genesis/genesis_test.go b/tests/genesis/genesis_test.go index 289a0d11c..1ea01c534 100644 --- a/tests/genesis/genesis_test.go +++ b/tests/genesis/genesis_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/iancoleman/strcase" + "github.com/jackc/pgx/v4" "github.com/oasisprotocol/oasis-core/go/common/entity" "github.com/oasisprotocol/oasis-core/go/common/node" genesisAPI "github.com/oasisprotocol/oasis-core/go/genesis/api" @@ -147,7 +148,10 @@ func checkpointBackends(t *testing.T, source *oasis.ConsensusClient, target *pos ON CONFLICT DO NOTHING; `, chainID, chainID)) - if err := target.SendBatch(ctx, batch); err != nil { + // Create the snapshot using a high level of isolation; we don't want another + // tx to be able to modify the tables while this is running, creating a snapshot that + // represents indexer state at two (or more) blockchain heights. + if err := target.SendBatchWithOptions(ctx, batch, pgx.TxOptions{IsoLevel: pgx.Serializable}); err != nil { return 0, err }