Skip to content

Commit

Permalink
bulker: autocommit: run Ping before every message
Browse files Browse the repository at this point in the history
bulker: postgres: set search_path on ping
  • Loading branch information
absorbb committed Dec 30, 2023
1 parent 9705fca commit 1dfa68c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
6 changes: 3 additions & 3 deletions bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ func (ps *AbstractSQLStream) postComplete(err error) (bulker.State, error) {
}

func (ps *AbstractSQLStream) init(ctx context.Context) error {
if ps.inited {
return nil
}
if err := ps.sqlAdapter.Ping(ctx); err != nil {
return err
}
if ps.inited {
return nil
}
//setup required db object like 'schema' or 'dataset' if doesn't exist
err := ps.sqlAdapter.InitDatabase(ctx)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions bulkerlib/implementations/sql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ FROM information_schema.table_constraints tco
WHERE tco.constraint_type = 'PRIMARY KEY' AND
kcu.table_schema = $1 AND
kcu.table_name = $2`
pgSetSearchPath = `SET search_path TO "%s";`
pgCreateDbSchemaIfNotExistsTemplate = `CREATE SCHEMA IF NOT EXISTS "%s"; SET search_path TO "%s";`
pgCreateIndexTemplate = `CREATE INDEX ON %s (%s);`

Expand Down Expand Up @@ -512,6 +513,16 @@ func (p *Postgres) createIndex(ctx context.Context, table *Table) error {

return nil
}
func (p *Postgres) Ping(ctx context.Context) error {
err := p.SQLAdapterBase.Ping(ctx)
if err != nil {
return err
}
if _, err = p.txOrDb(ctx).ExecContext(ctx, fmt.Sprintf(pgSetSearchPath, p.config.Schema)); err != nil {
return err
}
return nil
}

// Close underlying sql.DB
func (p *Postgres) Close() error {
Expand Down

0 comments on commit 1dfa68c

Please sign in to comment.