Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: 'database is locked' error after some time without calling the API #71

Merged
merged 4 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package api

import (
"database/sql"
"encoding/json"

"github.com/vocdoni/census3/census"
queries "github.com/vocdoni/census3/db/sqlc"
"github.com/vocdoni/census3/db"
"go.vocdoni.io/dvote/httprouter"
api "go.vocdoni.io/dvote/httprouter/apirest"
"go.vocdoni.io/dvote/log"
Expand All @@ -21,18 +20,16 @@ type Census3APIConf struct {

type census3API struct {
conf Census3APIConf
db *sql.DB
sqlc *queries.Queries
db *db.DB
endpoint *api.API
censusDB *census.CensusDB
w3p map[int64]string
}

func Init(db *sql.DB, q *queries.Queries, conf Census3APIConf) error {
func Init(db *db.DB, conf Census3APIConf) error {
newAPI := &census3API{
conf: conf,
db: db,
sqlc: q,
w3p: conf.Web3Providers,
}
// get the current chainID
Expand Down
19 changes: 4 additions & 15 deletions api/censuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,7 @@ func (capi *census3API) getCensus(msg *api.APIdata, ctx *httprouter.HTTPContext)
}
internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// begin a transaction for group sql queries
tx, err := capi.db.BeginTx(internalCtx, nil)
if err != nil {
return ErrCantGetCensus
}
defer func() {
if err := tx.Rollback(); err != nil {
log.Errorw(err, "holders transaction rollback failed")
}
}()
qtx := capi.sqlc.WithTx(tx)
currentCensus, err := qtx.CensusByID(internalCtx, int64(censusID))
currentCensus, err := capi.db.QueriesRO.CensusByID(internalCtx, int64(censusID))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundCensus
Expand Down Expand Up @@ -90,7 +79,7 @@ func (capi *census3API) createAndPublishCensus(msg *api.APIdata, ctx *httprouter
defer cancel()

// begin a transaction for group sql queries
tx, err := capi.db.BeginTx(internalCtx, nil)
tx, err := capi.db.RW.BeginTx(internalCtx, nil)
if err != nil {
return ErrCantCreateCensus
}
Expand All @@ -99,7 +88,7 @@ func (capi *census3API) createAndPublishCensus(msg *api.APIdata, ctx *httprouter
log.Errorw(err, "holders transaction rollback failed")
}
}()
qtx := capi.sqlc.WithTx(tx)
qtx := capi.db.QueriesRW.WithTx(tx)

strategyTokens, err := qtx.TokensByStrategyID(internalCtx, int64(req.StrategyID))
if err != nil {
Expand Down Expand Up @@ -210,7 +199,7 @@ func (capi *census3API) getStrategyCensuses(msg *api.APIdata, ctx *httprouter.HT
// get censuses by this strategy ID
internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
rows, err := capi.sqlc.CensusByStrategyID(internalCtx, int64(strategyID))
rows, err := capi.db.QueriesRO.CensusByStrategyID(internalCtx, int64(strategyID))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundCensus
Expand Down
4 changes: 2 additions & 2 deletions api/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (capi *census3API) getTokenHolders(msg *api.APIdata, ctx *httprouter.HTTPCo

// get token holders from the database
addr := common.HexToAddress(ctx.URLParam("address"))
dbHolders, err := capi.sqlc.TokenHoldersByTokenID(ctx2, addr.Bytes())
dbHolders, err := capi.db.QueriesRO.TokenHoldersByTokenID(ctx2, addr.Bytes())
if err != nil {
// if database does not contain any token holder for this token, return
// no content, else return generic error.
Expand Down Expand Up @@ -77,7 +77,7 @@ func (capi *census3API) countHolders(msg *api.APIdata, ctx *httprouter.HTTPConte
defer cancel()

addr := common.HexToAddress(ctx.URLParam("address"))
numberOfHolders, err := capi.sqlc.CountTokenHoldersByTokenID(ctx2, addr.Bytes())
numberOfHolders, err := capi.db.QueriesRO.CountTokenHoldersByTokenID(ctx2, addr.Bytes())
if err != nil {
if errors.Is(sql.ErrNoRows, err) {
log.Errorf("no holders found for address %s: %s", addr, err.Error())
Expand Down
12 changes: 6 additions & 6 deletions api/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ func (capi *census3API) initStrategiesHandlers() error {
func (capi *census3API) createDummyStrategy(tokenID []byte) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
res, err := capi.sqlc.CreateStategy(ctx, "test")
res, err := capi.db.QueriesRW.CreateStategy(ctx, "test")
if err != nil {
return err
}
strategyID, err := res.LastInsertId()
if err != nil {
return err
}
_, err = capi.sqlc.CreateStrategyToken(ctx, queries.CreateStrategyTokenParams{
_, err = capi.db.QueriesRW.CreateStrategyToken(ctx, queries.CreateStrategyTokenParams{
StrategyID: strategyID,
TokenID: tokenID,
MinBalance: big.NewInt(0).Bytes(),
Expand All @@ -62,7 +62,7 @@ func (capi *census3API) getStrategies(msg *api.APIdata, ctx *httprouter.HTTPCont
defer cancel()
// TODO: Support for pagination
// get strategies from the database
rows, err := capi.sqlc.ListStrategies(internalCtx)
rows, err := capi.db.QueriesRO.ListStrategies(internalCtx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNoStrategies
Expand Down Expand Up @@ -100,7 +100,7 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex
// get strategy from the database
internalCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
strategyData, err := capi.sqlc.StrategyByID(internalCtx, int64(strategyID))
strategyData, err := capi.db.QueriesRO.StrategyByID(internalCtx, int64(strategyID))
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNotFoundStrategy
Expand All @@ -115,7 +115,7 @@ func (capi *census3API) getStrategy(msg *api.APIdata, ctx *httprouter.HTTPContex
Tokens: []GetStrategyToken{},
}
// get information of the strategy related tokens
tokensData, err := capi.sqlc.TokensByStrategyID(internalCtx, strategyData.ID)
tokensData, err := capi.db.QueriesRO.TokensByStrategyID(internalCtx, strategyData.ID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Errorw(ErrCantGetTokens, err.Error())
return ErrCantGetTokens
Expand Down Expand Up @@ -147,7 +147,7 @@ func (capi *census3API) getTokenStrategies(msg *api.APIdata, ctx *httprouter.HTT
internalCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
// get strategies associated to the token provided
rows, err := capi.sqlc.StrategiesByTokenID(internalCtx, common.HexToAddress(tokenID).Bytes())
rows, err := capi.db.QueriesRO.StrategiesByTokenID(internalCtx, common.HexToAddress(tokenID).Bytes())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNoStrategies
Expand Down
12 changes: 6 additions & 6 deletions api/tokens.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (capi *census3API) getTokens(msg *api.APIdata, ctx *httprouter.HTTPContext)
defer cancel()
// TODO: Support for pagination
// get tokens from the database
rows, err := capi.sqlc.ListTokens(internalCtx)
rows, err := capi.db.QueriesRO.ListTokens(internalCtx)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return ErrNoTokens
Expand Down Expand Up @@ -134,7 +134,7 @@ func (capi *census3API) createToken(msg *api.APIdata, ctx *httprouter.HTTPContex
return ErrCantGetToken
}
}
_, err = capi.sqlc.CreateToken(internalCtx, queries.CreateTokenParams{
_, err = capi.db.QueriesRW.CreateToken(internalCtx, queries.CreateTokenParams{
ID: info.Address.Bytes(),
Name: *name,
Symbol: *symbol,
Expand Down Expand Up @@ -168,7 +168,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
address := common.HexToAddress(ctx.URLParam("tokenID"))
internalCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
tokenData, err := capi.sqlc.TokenByID(internalCtx, address.Bytes())
tokenData, err := capi.db.QueriesRO.TokenByID(internalCtx, address.Bytes())
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
log.Errorw(ErrNotFoundToken, err.Error())
Expand All @@ -178,7 +178,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
return ErrCantGetToken
}
// TODO: Only for the MVP, consider to remove it
tokenStrategies, err := capi.sqlc.StrategiesByTokenID(internalCtx, tokenData.ID)
tokenStrategies, err := capi.db.QueriesRO.StrategiesByTokenID(internalCtx, tokenData.ID)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
log.Errorw(ErrCantGetToken, err.Error())
return ErrCantGetToken
Expand All @@ -188,7 +188,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
defaultStrategyID = uint64(tokenStrategies[0].ID)
}
// get last block with token information
atBlock, err := capi.sqlc.LastBlockByTokenID(internalCtx, address.Bytes())
atBlock, err := capi.db.QueriesRO.LastBlockByTokenID(internalCtx, address.Bytes())
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
log.Errorw(ErrCantGetToken, err.Error())
Expand Down Expand Up @@ -221,7 +221,7 @@ func (capi *census3API) getToken(msg *api.APIdata, ctx *httprouter.HTTPContext)
// get token holders count
countHoldersCtx, cancel2 := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel2()
holders, err := capi.sqlc.CountTokenHoldersByTokenID(countHoldersCtx, address.Bytes())
holders, err := capi.db.QueriesRO.CountTokenHoldersByTokenID(countHoldersCtx, address.Bytes())
if err != nil {
return ErrCantGetTokenCount
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/census3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func main() {
flag.Parse()
log.Init(*logLevel, "stdout", nil)

db, q, err := db.Init(*dataDir)
database, err := db.Init(*dataDir)
if err != nil {
log.Fatal(err)
}
Expand All @@ -43,13 +43,13 @@ func main() {
log.Info(w3p)

// Start the holder scanner
hc, err := service.NewHoldersScanner(db, q, w3p)
hc, err := service.NewHoldersScanner(database, w3p)
if err != nil {
log.Fatal(err)
}

// Start the API
err = api.Init(db, q, api.Census3APIConf{
err = api.Init(database, api.Census3APIConf{
Hostname: "0.0.0.0",
Port: *port,
DataDir: *dataDir,
Expand All @@ -69,6 +69,12 @@ func main() {
log.Warnf("received SIGTERM, exiting at %s", time.Now().Format(time.RFC850))
cancel()
log.Infof("waiting for routines to end gracefully...")
// closing database
go func() {
if err := database.Close(); err != nil {
log.Fatal(err)
}
}()
time.Sleep(5 * time.Second)
os.Exit(0)
}
65 changes: 56 additions & 9 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"time"

_ "github.com/mattn/go-sqlite3"
"github.com/pressly/goose/v3"
Expand All @@ -15,27 +16,73 @@ import (
//go:embed migrations/*.sql
var migrationsFS embed.FS

func Init(dataDir string) (*sql.DB, *queries.Queries, error) {
// DB struct abstact a safe connection with the database using sqlc queries,
// sqlite as a database engine and go-sqlite3 as a driver.
type DB struct {
RW *sql.DB
RO *sql.DB

QueriesRW *queries.Queries
QueriesRO *queries.Queries
}

// Close function stops all internal connections to the database
func (db *DB) Close() error {
if err := db.RW.Close(); err != nil {
return err
}
return db.RO.Close()
}

// Init function starts a database using the data path provided as argument. It
// opens two different connections, one for read only, and another for read and
// write, with different configurations, optimized for each use case.
func Init(dataDir string) (*DB, error) {
dbFile := filepath.Join(dataDir, "census3.sql")
if _, err := os.Stat(dbFile); os.IsNotExist(err) {
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
return nil, nil, fmt.Errorf("error creating a new database file: %w", err)
return nil, fmt.Errorf("error creating a new database file: %w", err)
}
}
// open database file
database, err := sql.Open("sqlite3", dbFile)
// sqlite doesn't support multiple concurrent writers.
// For that reason, rwDB is limited to one open connection.
// Per https://github.com/mattn/go-sqlite3/issues/1022#issuecomment-1067353980,
// we use WAL to allow multiple concurrent readers at the same time.
rwDB, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=rwc&_journal_mode=wal&_txlock=immediate&_synchronous=normal", dbFile))
if err != nil {
return nil, nil, fmt.Errorf("error opening database: %w", err)
return nil, fmt.Errorf("error opening database: %w", err)
}
rwDB.SetMaxOpenConns(1)
rwDB.SetMaxIdleConns(2)
rwDB.SetConnMaxIdleTime(10 * time.Minute)
rwDB.SetConnMaxLifetime(time.Hour)

roDB, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?mode=ro&_journal_mode=wal", dbFile))
if err != nil {
return nil, fmt.Errorf("error opening database: %w", err)
}
// Increasing these numbers can allow for more queries to run concurrently,
// but it also increases the memory used by sqlite and our connection pool.
// Most read-only queries we run are quick enough, so a small number seems OK.
roDB.SetMaxOpenConns(10)
roDB.SetMaxIdleConns(20)
roDB.SetConnMaxIdleTime(5 * time.Minute)
roDB.SetConnMaxLifetime(time.Hour)

// get census3 goose migrations and setup for sqlite3
if err := goose.SetDialect("sqlite3"); err != nil {
return nil, nil, fmt.Errorf("error setting up driver for sqlite: %w", err)
return nil, fmt.Errorf("error setting up driver for sqlite: %w", err)
}
goose.SetBaseFS(migrationsFS)
// perform goose up
if err := goose.Up(database, "migrations"); err != nil {
return nil, nil, fmt.Errorf("error during goose up: %w", err)
if err := goose.Up(rwDB, "migrations"); err != nil {
return nil, fmt.Errorf("error during goose up: %w", err)
}
// init sqlc
return database, queries.New(database), nil
return &DB{
RW: rwDB,
RO: roDB,
QueriesRW: queries.New(rwDB),
QueriesRO: queries.New(roDB),
}, nil
}
12 changes: 6 additions & 6 deletions service/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ var (
)

type TestDB struct {
dir string
db *sql.DB
queries *queries.Queries
dir string
db *db.DB
}

func StartTestDB(t *testing.T) *TestDB {
c := qt.New(t)

dir := t.TempDir()
db, q, err := db.Init(dir)
db, err := db.Init(dir)
c.Assert(err, qt.IsNil)
return &TestDB{dir, db, q}
return &TestDB{dir, db}
}

func (testdb *TestDB) Close(t *testing.T) {
c := qt.New(t)
c.Assert(testdb.db.Close(), qt.IsNil)
c.Assert(testdb.db.RW.Close(), qt.IsNil)
c.Assert(testdb.db.RO.Close(), qt.IsNil)
c.Assert(os.RemoveAll(testdb.dir), qt.IsNil)
}

Expand Down
Loading