diff --git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go index bf634d8..de855fc 100644 --- a/bulkerlib/implementations/sql/abstract.go +++ b/bulkerlib/implementations/sql/abstract.go @@ -94,12 +94,12 @@ func (ps *AbstractSQLStream) postComplete(err error) (bulker.State, error) { } func (ps *AbstractSQLStream) init(ctx context.Context) error { - if ps.inited { - return nil - } if err := ps.sqlAdapter.Ping(ctx); err != nil { return err } + if ps.inited { + return nil + } //setup required db object like 'schema' or 'dataset' if doesn't exist err := ps.sqlAdapter.InitDatabase(ctx) if err != nil { diff --git a/bulkerlib/implementations/sql/postgres.go b/bulkerlib/implementations/sql/postgres.go index ad11900..ec07b18 100644 --- a/bulkerlib/implementations/sql/postgres.go +++ b/bulkerlib/implementations/sql/postgres.go @@ -50,6 +50,7 @@ FROM information_schema.table_constraints tco WHERE tco.constraint_type = 'PRIMARY KEY' AND kcu.table_schema = $1 AND kcu.table_name = $2` + pgSetSearchPath = `SET search_path TO "%s";` pgCreateDbSchemaIfNotExistsTemplate = `CREATE SCHEMA IF NOT EXISTS "%s"; SET search_path TO "%s";` pgCreateIndexTemplate = `CREATE INDEX ON %s (%s);` @@ -512,6 +513,16 @@ func (p *Postgres) createIndex(ctx context.Context, table *Table) error { return nil } +func (p *Postgres) Ping(ctx context.Context) error { + err := p.SQLAdapterBase.Ping(ctx) + if err != nil { + return err + } + if _, err = p.txOrDb(ctx).ExecContext(ctx, fmt.Sprintf(pgSetSearchPath, p.config.Schema)); err != nil { + return err + } + return nil +} // Close underlying sql.DB func (p *Postgres) Close() error {