Skip to content

Commit

Permalink
feat: add connection pooling configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed May 12, 2023
1 parent eb77e84 commit d3001af
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cmd/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func resolveOptions(output io.Writer, userOptions ...fx.Option) []fx.Option {
api.Module(api.Config{
Version: Version,
}),
sqlstorage.CLIDriverModule(v, output),
sqlstorage.CLIDriverModule(v, output, debug),
internal.NewAnalyticsModule(v, Version),
ledger.Module(ledger.Configuration{
AllowPastTimestamp: v.GetString(commitPolicyFlag) == "allow-past-timestamps",
Expand Down
45 changes: 25 additions & 20 deletions pkg/storage/sqlstorage/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package sqlstorage

import (
"io"
"time"

"github.com/formancehq/ledger/pkg/storage"
ledgerstore "github.com/formancehq/ledger/pkg/storage/sqlstorage/ledger"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/schema"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/utils"
"github.com/formancehq/ledger/pkg/storage/sqlstorage/worker"
"github.com/formancehq/stack/libs/go-libs/health"
"github.com/formancehq/stack/libs/go-libs/service"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/uptrace/bun"
Expand All @@ -20,6 +20,9 @@ const (
StoreWorkerMaxPendingSize = "store-worker-max-pending-size"
StoreWorkerMaxWriteChanSize = "store-worker-max-write-chan-size"
StoragePostgresConnectionStringFlag = "storage-postgres-conn-string"
StoragePostgresMaxIdleConnsFlag = "storage-postgres-max-idle-conns"
StoragePostgresConnMaxIdleTimeFlag = "storage-postgres-conn-max-idle-time"
StoragePostgresMaxOpenConns = "storage-postgres-max-open-conns"
)

// TODO(gfyrag): maybe move flag handling inside cmd/internal (as telemetry flags)
Expand All @@ -29,42 +32,44 @@ func InitCLIFlags(cmd *cobra.Command) {
cmd.PersistentFlags().Int(StoreWorkerMaxPendingSize, 0, "Max pending size for store worker")
cmd.PersistentFlags().Int(StoreWorkerMaxWriteChanSize, 1024, "Max write channel size for store worker")
cmd.PersistentFlags().String(StoragePostgresConnectionStringFlag, "postgresql://localhost/postgres", "Postgres connection string")
cmd.PersistentFlags().Int(StoragePostgresMaxIdleConnsFlag, 20, "Max idle connections to database")
cmd.PersistentFlags().Duration(StoragePostgresConnMaxIdleTimeFlag, time.Minute, "Max idle time of idle connections")
cmd.PersistentFlags().Int(StoragePostgresMaxOpenConns, 20, "Max open connections")
}

type PostgresConfig struct {
ConnString string
}

type ModuleConfig struct {
PostgresConfig *PostgresConfig
StoreConfig ledgerstore.StoreConfig
Debug bool
PostgresConnectionOptions utils.ConnectionOptions
StoreConfig ledgerstore.StoreConfig
Debug bool
}

func CLIDriverModule(v *viper.Viper, output io.Writer) fx.Option {
cfg := ModuleConfig{
PostgresConfig: &PostgresConfig{
ConnString: v.GetString(StoragePostgresConnectionStringFlag),
},
StoreConfig: ledgerstore.StoreConfig{
StoreWorkerConfig: worker.Config{
MaxPendingSize: v.GetInt(StoreWorkerMaxPendingSize),
MaxWriteChanSize: v.GetInt(StoreWorkerMaxWriteChanSize),
},
},
Debug: viper.GetBool(service.DebugFlag),
}
func CLIDriverModule(v *viper.Viper, output io.Writer, debug bool) fx.Option {

options := make([]fx.Option, 0)

options = append(options, fx.Provide(func() (*bun.DB, error) {
return utils.OpenSQLDB(cfg.PostgresConfig.ConnString, cfg.Debug, output)
return utils.OpenSQLDB(utils.ConnectionOptions{
DatabaseSourceName: v.GetString(StoragePostgresConnectionStringFlag),
Debug: debug,
Writer: output,
MaxIdleConns: v.GetInt(StoragePostgresMaxIdleConnsFlag),
ConnMaxIdleTime: v.GetDuration(StoragePostgresConnMaxIdleTimeFlag),
MaxOpenConns: v.GetInt(StoragePostgresMaxOpenConns),
})
}))
options = append(options, fx.Provide(func(db *bun.DB) schema.DB {
return schema.NewPostgresDB(db)
}))
options = append(options, fx.Provide(func(db schema.DB) (*Driver, error) {
return NewDriver("postgres", db, cfg.StoreConfig), nil
return NewDriver("postgres", db, ledgerstore.StoreConfig{
StoreWorkerConfig: worker.Config{
MaxPendingSize: v.GetInt(StoreWorkerMaxPendingSize),
MaxWriteChanSize: v.GetInt(StoreWorkerMaxWriteChanSize),
},
}), nil
}))
options = append(options, health.ProvideHealthCheck(func(db *bun.DB) health.NamedCheck {
return health.NewNamedCheck("postgres", health.CheckFn(db.PingContext))
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/sqlstorage/ledger/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ func newLedgerStore(t *testing.T) *ledgerstore.Store {
t.Helper()

pgServer := pgtesting.NewPostgresDatabase(t)
db, err := utils.OpenSQLDB(pgServer.ConnString(), testing.Verbose(), os.Stdout)
db, err := utils.OpenSQLDB(utils.ConnectionOptions{
DatabaseSourceName: pgServer.ConnString(),
Debug: testing.Verbose(),
})
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, db.Close())
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/sqlstorage/migrations/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func TestMigrationsOrders(t *testing.T) {

func TestMigrates(t *testing.T) {
pgServer := pgtesting.NewPostgresDatabase(t)
sqlDB, err := utils.OpenSQLDB(pgServer.ConnString(), testing.Verbose(), os.Stdout)
sqlDB, err := utils.OpenSQLDB(utils.ConnectionOptions{
DatabaseSourceName: pgServer.ConnString(),
Debug: testing.Verbose(),
})
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/sqlstorage/pagination/pagination_column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pagination_test

import (
"context"
"os"
"testing"

"github.com/formancehq/ledger/pkg/storage"
Expand All @@ -20,7 +19,10 @@ func ptr[T any](t T) *T {
func TestColumnPagination(t *testing.T) {

pgServer := pgtesting.NewPostgresDatabase(t)
db, err := utils.OpenSQLDB(pgServer.ConnString(), testing.Verbose(), os.Stdout)
db, err := utils.OpenSQLDB(utils.ConnectionOptions{
DatabaseSourceName: pgServer.ConnString(),
Debug: testing.Verbose(),
})
require.NoError(t, err)

_, err = db.Exec(`
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/sqlstorage/pagination/pagination_offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pagination_test

import (
"context"
"os"
"testing"

"github.com/formancehq/ledger/pkg/storage"
Expand All @@ -15,7 +14,10 @@ import (
func TestOffsetPagination(t *testing.T) {

pgServer := pgtesting.NewPostgresDatabase(t)
db, err := utils.OpenSQLDB(pgServer.ConnString(), testing.Verbose(), os.Stdout)
db, err := utils.OpenSQLDB(utils.ConnectionOptions{
DatabaseSourceName: pgServer.ConnString(),
Debug: testing.Verbose(),
})
require.NoError(t, err)

_, err = db.Exec(`
Expand Down
10 changes: 8 additions & 2 deletions pkg/storage/sqlstorage/sqlstoragetesting/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package sqlstoragetesting

import (
"context"
"os"
"testing"
"time"

"github.com/formancehq/ledger/pkg/storage/sqlstorage"
ledgerstore "github.com/formancehq/ledger/pkg/storage/sqlstorage/ledger"
Expand All @@ -16,7 +16,13 @@ import (
func StorageDriver(t pgtesting.TestingT) *sqlstorage.Driver {
pgServer := pgtesting.NewPostgresDatabase(t)

db, err := utils.OpenSQLDB(pgServer.ConnString(), testing.Verbose(), os.Stdout)
db, err := utils.OpenSQLDB(utils.ConnectionOptions{
DatabaseSourceName: pgServer.ConnString(),
Debug: testing.Verbose(),
MaxIdleConns: 40,
MaxOpenConns: 40,
ConnMaxIdleTime: time.Minute,
})
require.NoError(t, err)

t.Cleanup(func() {
Expand Down
32 changes: 28 additions & 4 deletions pkg/storage/sqlstorage/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,47 @@ package utils
import (
"database/sql"
"io"
"os"
"time"

"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"github.com/uptrace/bun/extra/bundebug"
)

func OpenSQLDB(dataSourceName string, debug bool, w io.Writer) (*bun.DB, error) {
sqldb, err := sql.Open("postgres", dataSourceName)
type ConnectionOptions struct {
DatabaseSourceName string
Debug bool
Writer io.Writer
MaxIdleConns int
MaxOpenConns int
ConnMaxIdleTime time.Duration
}

func OpenSQLDB(options ConnectionOptions) (*bun.DB, error) {
sqldb, err := sql.Open("postgres", options.DatabaseSourceName)
if err != nil {
return nil, err
}
if options.MaxIdleConns != 0 {
sqldb.SetMaxIdleConns(options.MaxIdleConns)
}
if options.ConnMaxIdleTime != 0 {
sqldb.SetConnMaxIdleTime(options.ConnMaxIdleTime)
}
if options.MaxOpenConns != 0 {
sqldb.SetMaxOpenConns(options.MaxOpenConns)
}

db := bun.NewDB(sqldb, pgdialect.New())
if debug {
if options.Debug {
writer := options.Writer
if writer == nil {
writer = os.Stdout
}
db.AddQueryHook(bundebug.NewQueryHook(
bundebug.WithVerbose(true),
bundebug.WithWriter(w),
bundebug.WithWriter(writer),
))
}

Expand Down

0 comments on commit d3001af

Please sign in to comment.