Skip to content

Commit

Permalink
bulker: open transaction right before working with db instead of 1st …
Browse files Browse the repository at this point in the history
…message consumed.
  • Loading branch information
absorbb committed Dec 13, 2024
1 parent 9c6bb53 commit 6981f07
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 10 deletions.
20 changes: 19 additions & 1 deletion bulkerlib/implementations/sql/abstract_transactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,23 @@ func (ps *AbstractTransactionalSQLStream) init(ctx context.Context) (err error)
if err != nil {
return err
}
return nil
}

func (ps *AbstractTransactionalSQLStream) initTx(ctx context.Context) (err error) {
err = ps.init(ctx)
if err != nil {
return err
}
if ps.tx == nil {
if err = ps.sqlAdapter.Ping(ctx); err != nil {
return err
}
ps.tx, err = ps.sqlAdapter.OpenTx(ctx)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -171,6 +181,10 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (s
ps.eventsInBatch = 0
}()
if ps.batchFile != nil && ps.eventsInBatch > 0 {
err = ps.initTx(ctx)
if err != nil {
return state, errorj.Decorate(err, "failed to init transaction")
}
_, err = ps.sqlAdapter.TableHelper().EnsureTableWithoutCaching(ctx, ps.tx, ps.id, table)
if err != nil {
return state, errorj.Decorate(err, "failed to create table")
Expand Down Expand Up @@ -424,6 +438,10 @@ func (ps *AbstractTransactionalSQLStream) writeToBatchFile(ctx context.Context,
func (ps *AbstractTransactionalSQLStream) insert(ctx context.Context, targetTable *Table, processedObject types.Object) (err error) {
ps.adjustTables(ctx, targetTable, processedObject)
ps.updateRepresentationTable(ps.tmpTable)
err = ps.initTx(ctx)
if err != nil {
return errorj.Decorate(err, "failed to init transaction")
}
ps.tmpTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithoutCaching(ctx, ps.tx, ps.id, ps.tmpTable)
if err != nil {
return errorj.Decorate(err, "failed to ensure table")
Expand Down
1 change: 1 addition & 0 deletions bulkerlib/implementations/sql/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func NewClickHouse(bulkerConfig bulkerlib.Config) (bulkerlib.Bulker, error) {
return nil, err
}
dataSource.SetMaxIdleConns(10)
dataSource.SetConnMaxLifetime(time.Minute * 10)
dataSource.SetConnMaxIdleTime(time.Minute * 3)
}
return dataSource, nil
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/replacepartition_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (ps *ReplacePartitionStream) Complete(ctx context.Context) (state bulker.St
if ps.state.LastError == nil {
//we have to clear all previous data even if no objects was consumed
//if stream was empty we need to open transaction.
if err = ps.init(ctx); err != nil {
if err = ps.initTx(ctx); err != nil {
return
}
err = ps.clearPartition(ctx, ps.tx)
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/replacetable_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (ps *ReplaceTableStream) Complete(ctx context.Context) (state bulker.State,
//truncation seems like a more straightforward approach.
if ps.sqlAdapter.Type() == PostgresBulkerTypeId {
// workaround for neon search_path issue
err = ps.init(ctx)
err = ps.initTx(ctx)
if err == nil {
var table *Table
table, err = ps.tx.GetTableSchema(ctx, ps.namespace, ps.tableName)
Expand Down
7 changes: 0 additions & 7 deletions bulkerlib/implementations/sql/transactional_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
return &ps, nil
}

func (ps *TransactionalStream) init(ctx context.Context) (err error) {
if ps.inited {
return nil
}
return ps.AbstractTransactionalSQLStream.init(ctx)
}

func (ps *TransactionalStream) Complete(ctx context.Context) (state bulker.State, err error) {
if ps.state.Status != bulker.Active {
return ps.state, errors.New("stream is not active")
Expand Down

0 comments on commit 6981f07

Please sign in to comment.