From 1a424ddc5d689329bfa35faf3fdf8826cfd9ad0d Mon Sep 17 00:00:00 2001
From: Ildar Nurislamov
Date: Tue, 28 Nov 2023 14:53:16 +0400
Subject: [PATCH] bulker: bigquery: return data processing stats. bulker: fixes and cleanups

---
 bulkerapp/app/router.go                       |  2 +-
 bulkerlib/bulker.go                           | 15 +++++
 .../implementations/file_storage/abstract.go  |  2 +-
 .../file_storage/bulker_test.go               |  2 +-
 bulkerlib/implementations/sql/abstract.go     |  4 +-
 .../sql/abstract_transactional.go             | 34 +++++-----
 bulkerlib/implementations/sql/bigdata_test.go |  2 +-
 bulkerlib/implementations/sql/bigquery.go     | 64 +++++++++++--------
 bulkerlib/implementations/sql/bulker_test.go  |  4 +-
 bulkerlib/implementations/sql/clickhouse.go   | 22 +++----
 .../implementations/sql/mergewindow_test.go   |  6 +-
 bulkerlib/implementations/sql/mysql.go        | 30 ++++-----
 bulkerlib/implementations/sql/naming_test.go  |  2 +-
 bulkerlib/implementations/sql/options.go      | 10 +--
 bulkerlib/implementations/sql/postgres.go     | 26 ++++----
 bulkerlib/implementations/sql/redshift.go     | 16 ++---
 .../sql/replacepartition_stream.go            |  7 +-
 .../sql/replacetable_stream.go                |  4 +-
 bulkerlib/implementations/sql/snowflake.go    | 18 +++---
 bulkerlib/implementations/sql/sql_adapter.go  |  9 +--
 .../sql/transactional_stream.go               |  9 ++-
 .../sql/transactional_stream_test.go          | 16 ++--
 bulkerlib/options.go                          | 15 ++---
 23 files changed, 173 insertions(+), 146 deletions(-)

diff --git a/bulkerapp/app/router.go b/bulkerapp/app/router.go
index a2f0134..ebf381e 100644
--- a/bulkerapp/app/router.go
+++ b/bulkerapp/app/router.go
@@ -185,7 +185,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
 	}
 	var streamOptions []bulker.StreamOption
 	if len(pkeys) > 0 {
-		streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithMergeRows())
+		streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithDeduplicate())
 	}
 	destination.InitBulkerInstance()
 	bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
diff --git a/bulkerlib/bulker.go b/bulkerlib/bulker.go
index 16b2819..7ef0716 100644
--- a/bulkerlib/bulker.go
+++ b/bulkerlib/bulker.go
@@ -140,6 +140,21 @@ type State struct {
 	ProcessedRows  int `json:"processedRows"`
 	SuccessfulRows int `json:"successfulRows"`
 	ErrorRowIndex  int `json:"errorRowIndex,omitempty"`
+	WarehouseState `json:",inline,omitempty"`
+}
+
+type WarehouseState struct {
+	BytesProcessed int            `json:"bytesProcessed,omitempty"`
+	EstimatedCost  float64        `json:"estimatedCost,omitempty"`
+	AdditionalInfo map[string]any `json:",inline,omitempty"`
+}
+
+func (ws *WarehouseState) Merge(second WarehouseState) {
+	ws.BytesProcessed += second.BytesProcessed
+	ws.EstimatedCost += second.EstimatedCost
+	for k, v := range second.AdditionalInfo {
+		ws.AdditionalInfo[k] = v
+	}
 }
 
 // SetError sets error to the state
diff --git a/bulkerlib/implementations/file_storage/abstract.go b/bulkerlib/implementations/file_storage/abstract.go
index 8ea3494..2004027 100644
--- a/bulkerlib/implementations/file_storage/abstract.go
+++ b/bulkerlib/implementations/file_storage/abstract.go
@@ -53,7 +53,7 @@ func newAbstractFileStorageStream(id string, p implementations2.FileAdapter, fil
 	for _, option := range streamOptions {
 		ps.options.Add(option)
 	}
-	ps.merge = bulker.MergeRowsOption.Get(&ps.options)
+	ps.merge = bulker.DeduplicateOption.Get(&ps.options)
 	pkColumns := bulker.PrimaryKeyOption.Get(&ps.options)
 	if ps.merge && len(pkColumns) == 0 {
 		return AbstractFileStorageStream{}, fmt.Errorf("MergeRows option requires primary key option. Please provide WithPrimaryKey option")
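A note on how the new WarehouseState in the bulkerlib/bulker.go hunk above is meant to be used: it is embedded into bulker.State, and the stream implementations later in this patch merge one WarehouseState per warehouse job (batch load, copy, merge query) into the stream state. A minimal, self-contained sketch of that accumulation — illustrative only, not part of the patch; unlike the patch's Merge, this sketch also initializes AdditionalInfo lazily so merging into a zero-value state never writes to a nil map:

```go
package main

import "fmt"

// WarehouseState mirrors the struct added in bulkerlib/bulker.go.
type WarehouseState struct {
	BytesProcessed int
	EstimatedCost  float64
	AdditionalInfo map[string]any
}

// Merge accumulates stats from another state. This sketch additionally guards
// against a nil AdditionalInfo map before copying keys into it.
func (ws *WarehouseState) Merge(second WarehouseState) {
	ws.BytesProcessed += second.BytesProcessed
	ws.EstimatedCost += second.EstimatedCost
	if len(second.AdditionalInfo) > 0 && ws.AdditionalInfo == nil {
		ws.AdditionalInfo = map[string]any{}
	}
	for k, v := range second.AdditionalInfo {
		ws.AdditionalInfo[k] = v
	}
}

func main() {
	// One state per warehouse job, e.g. a batch load followed by a merge query.
	total := WarehouseState{}
	total.Merge(WarehouseState{BytesProcessed: 1_000_000, EstimatedCost: 0.0000063})
	total.Merge(WarehouseState{BytesProcessed: 4_000_000, EstimatedCost: 0.000025})
	fmt.Printf("bytes=%d cost=$%.7f\n", total.BytesProcessed, total.EstimatedCost)
}
```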
Please provide WithPrimaryKey option") diff --git a/bulkerlib/implementations/file_storage/bulker_test.go b/bulkerlib/implementations/file_storage/bulker_test.go index bafc5b6..de907e0 100644 --- a/bulkerlib/implementations/file_storage/bulker_test.go +++ b/bulkerlib/implementations/file_storage/bulker_test.go @@ -197,7 +197,7 @@ func TestBasics(t *testing.T) { {"_timestamp": constantTimeStr, "id": 1, "name": "test7"}, }, configIds: allBulkerConfigs, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, }, } for _, tt := range tests { diff --git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go index ca0b77f..b805b43 100644 --- a/bulkerlib/implementations/sql/abstract.go +++ b/bulkerlib/implementations/sql/abstract.go @@ -38,13 +38,13 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu for _, option := range streamOptions { ps.options.Add(option) } - ps.merge = bulker.MergeRowsOption.Get(&ps.options) + ps.merge = bulker.DeduplicateOption.Get(&ps.options) pkColumns := bulker.PrimaryKeyOption.Get(&ps.options) if ps.merge && len(pkColumns) == 0 { return nil, fmt.Errorf("MergeRows option requires primary key in the destination table. Please provide WithPrimaryKey option") } if ps.merge { - ps.mergeWindow = MergeWindow.Get(&ps.options) + ps.mergeWindow = DeduplicateWindow.Get(&ps.options) } var customFields = ColumnTypesOption.Get(&ps.options) diff --git a/bulkerlib/implementations/sql/abstract_transactional.go b/bulkerlib/implementations/sql/abstract_transactional.go index 89fe3cb..78e6b56 100644 --- a/bulkerlib/implementations/sql/abstract_transactional.go +++ b/bulkerlib/implementations/sql/abstract_transactional.go @@ -118,11 +118,11 @@ func (ps *AbstractTransactionalSQLStream) postComplete(ctx context.Context, err return ps.AbstractSQLStream.postComplete(err) } -func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (err error) { +func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (state bulker.WarehouseState, err error) { table := ps.tmpTable err = ps.tx.CreateTable(ctx, table) if err != nil { - return errorj.Decorate(err, "failed to create table") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to create table") } columns := table.SortedColumnNames() defer func() { @@ -135,11 +135,11 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e }() if ps.eventsInBatch > 0 { if err != nil { - return errorj.Decorate(err, "failed to flush marshaller") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to flush marshaller") } err = ps.batchFile.Sync() if err != nil { - return errorj.Decorate(err, "failed to sync batch file") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to sync batch file") } workingFile := ps.batchFile needToConvert := false @@ -150,7 +150,7 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e if len(ps.batchFileSkipLines) > 0 || needToConvert { workingFile, err = os.CreateTemp("", path.Base(ps.batchFile.Name())+"_2") if err != nil { - return errorj.Decorate(err, "failed to create tmp file for deduplication") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to create tmp file for deduplication") } defer func() { _ = workingFile.Close() @@ -159,12 +159,12 @@ func (ps *AbstractTransactionalSQLStream) 
flushBatchFile(ctx context.Context) (e if needToConvert { err = ps.targetMarshaller.Init(workingFile, columns) if err != nil { - return errorj.Decorate(err, "failed to write header for converted batch file") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to write header for converted batch file") } } file, err := os.Open(ps.batchFile.Name()) if err != nil { - return errorj.Decorate(err, "failed to open tmp file") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to open tmp file") } scanner := bufio.NewScanner(file) scanner.Buffer(make([]byte, 1024*100), 1024*1024*10) @@ -177,13 +177,13 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e obj := make(map[string]any) err = dec.Decode(&obj) if err != nil { - return errorj.Decorate(err, "failed to decode json object from batch filer") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to decode json object from batch filer") } ps.targetMarshaller.Marshal(obj) } else { _, err = workingFile.Write(scanner.Bytes()) if err != nil { - return errorj.Decorate(err, "failed write to deduplication file") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed write to deduplication file") } _, _ = workingFile.Write([]byte("\n")) } @@ -191,7 +191,7 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e i++ } if err = scanner.Err(); err != nil { - return errorj.Decorate(err, "failed to read batch file") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to read batch file") } ps.targetMarshaller.Flush() workingFile.Sync() @@ -203,7 +203,7 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e s3Config := s3BatchFileOption.Get(&ps.options) rFile, err := os.Open(workingFile.Name()) if err != nil { - return errorj.Decorate(err, "failed to open tmp file") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to open tmp file") } s3FileName := path.Base(workingFile.Name()) if s3Config.Folder != "" { @@ -211,21 +211,21 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e } err = ps.s3.Upload(s3FileName, rFile) if err != nil { - return errorj.Decorate(err, "failed to upload file to s3") + return bulker.WarehouseState{}, errorj.Decorate(err, "failed to upload file to s3") } defer ps.s3.DeleteObject(s3FileName) - err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: AmazonS3, Path: s3FileName, Format: ps.sqlAdapter.GetBatchFileFormat(), S3Config: s3Config}) + state, err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: AmazonS3, Path: s3FileName, Format: ps.sqlAdapter.GetBatchFileFormat(), S3Config: s3Config}) if err != nil { - return errorj.Decorate(err, "failed to flush tmp file to the warehouse") + return state, errorj.Decorate(err, "failed to flush tmp file to the warehouse") } } else { - err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: LocalFile, Path: workingFile.Name(), Format: ps.sqlAdapter.GetBatchFileFormat()}) + state, err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: LocalFile, Path: workingFile.Name(), Format: ps.sqlAdapter.GetBatchFileFormat()}) if err != nil { - return errorj.Decorate(err, "failed to flush tmp file to the warehouse") + return state, errorj.Decorate(err, "failed to flush tmp file to the warehouse") } } } - return nil + return } //func (ps *AbstractTransactionalSQLStream) ensureSchema(ctx context.Context, targetTable **Table, tableForObject *Table, initTable func(ctx context.Context) (*Table, error)) (err error) { diff --git 
a/bulkerlib/implementations/sql/bigdata_test.go b/bulkerlib/implementations/sql/bigdata_test.go index a28f3b0..6ee48e3 100644 --- a/bulkerlib/implementations/sql/bigdata_test.go +++ b/bulkerlib/implementations/sql/bigdata_test.go @@ -69,7 +69,7 @@ func TestMillionRowsBatched(t *testing.T) { expectedRowsCount: eventsCount, leaveResultingTable: false, configIds: configIds, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, }, } for _, tt := range tests { diff --git a/bulkerlib/implementations/sql/bigquery.go b/bulkerlib/implementations/sql/bigquery.go index d90efc3..8e0f961 100644 --- a/bulkerlib/implementations/sql/bigquery.go +++ b/bulkerlib/implementations/sql/bigquery.go @@ -35,7 +35,7 @@ const ( BigQueryAutocommitUnsupported = "BigQuery bulker doesn't support auto commit mode as not efficient" BigqueryBulkerTypeId = "bigquery" - bigqueryInsertFromSelectTemplate = "INSERT %s SELECT * FROM %s" + bigqueryInsertFromSelectTemplate = "INSERT INTO %s(%s) SELECT %s FROM %s" bigqueryMergeTemplate = "MERGE INTO %s T USING %s S ON %s WHEN MATCHED THEN UPDATE SET %s WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)" bigqueryDeleteTemplate = "DELETE FROM %s WHERE %s" bigqueryUpdateTemplate = "UPDATE %s SET %s WHERE %s" @@ -160,11 +160,11 @@ func (bq *BigQuery) validateOptions(streamOptions []bulker.StreamOption) error { return nil } -func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (err error) { +func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error) { if mergeWindow <= 0 { defer func() { if err != nil { - err = errorj.CopyError.Wrap(err, "failed to run BQ copier"). + err = errorj.CopyError.Wrap(err, "failed to run CopyTables"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ Dataset: bq.config.Dataset, Bucket: bq.config.Bucket, @@ -177,15 +177,22 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa copier := dataset.Table(targetTable.Name).CopierFrom(dataset.Table(sourceTable.Name)) copier.WriteDisposition = bigquery.WriteAppend copier.CreateDisposition = bigquery.CreateIfNeeded - _, err = bq.RunJob(ctx, copier, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name)) + _, state, err = bq.RunJob(ctx, copier, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name)) if err != nil { // try to insert from select as a fallback - insertFromSelectStatement := fmt.Sprintf(bigqueryInsertFromSelectTemplate, bq.fullTableName(targetTable.Name), bq.fullTableName(sourceTable.Name)) + columns := sourceTable.SortedColumnNames() + quotedColumns := make([]string, len(columns)) + for i, name := range columns { + quotedColumns[i] = bq.quotedColumnName(name) + } + columnsString := strings.Join(quotedColumns, ",") + insertFromSelectStatement := fmt.Sprintf(bigqueryInsertFromSelectTemplate, bq.fullTableName(targetTable.Name), columnsString, columnsString, bq.fullTableName(sourceTable.Name)) query := bq.client.Query(insertFromSelectStatement) - _, err = bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name)) - return err + _, state2, err := bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name)) + state.Merge(state2) + return state, err } else { - return nil + return state, nil } } else { defer func() { @@ -199,8 +206,12 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa }) } }() - columns := sourceTable.SortedColumnNames() + quotedColumns := make([]string, len(columns)) + for i, name := range columns { + quotedColumns[i] = bq.quotedColumnName(name) + } + columnsString := strings.Join(quotedColumns, ",") updateSet := make([]string, len(columns)) for i, name := range columns { updateSet[i] = fmt.Sprintf("T.%s = S.%s", bq.quotedColumnName(name), bq.quotedColumnName(name)) @@ -214,17 +225,12 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa monthBefore := timestamp.Now().Add(time.Duration(mergeWindow) * -24 * time.Hour).Format("2006-01-02") joinConditions = append(joinConditions, fmt.Sprintf("T.%s >= '%s'", bq.quotedColumnName(targetTable.TimestampColumn), monthBefore)) } - quotedColumns := make([]string, len(columns)) - for i, name := range columns { - quotedColumns[i] = bq.quotedColumnName(name) - } - columnsString := strings.Join(quotedColumns, ",") insertFromSelectStatement := fmt.Sprintf(bigqueryMergeTemplate, bq.fullTableName(targetTable.Name), bq.fullTableName(sourceTable.Name), strings.Join(joinConditions, " AND "), strings.Join(updateSet, ", "), columnsString, columnsString) query := bq.client.Query(insertFromSelectStatement) - _, err = bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name)) - return err + _, state, err = bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name)) + return state, err } } @@ -538,7 +544,7 @@ func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, object return nil } -func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error) { +func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource 
*LoadSource) (state bulker.WarehouseState, err error) { tableName := bq.TableName(targetTable.Name) defer func() { if err != nil { @@ -556,7 +562,7 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc file, err := os.Open(loadSource.Path) if err != nil { - return err + return state, err } bqTable := bq.client.Dataset(bq.config.Dataset).Table(tableName) meta, err := bqTable.Metadata(ctx) @@ -568,7 +574,7 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc } for i, field := range targetTable.SortedColumnNames() { if _, ok := mp[field]; !ok { - return fmt.Errorf("field %s is not in table schema", field) + return state, fmt.Errorf("field %s is not in table schema", field) } meta.Schema[i] = mp[field] } @@ -587,8 +593,8 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc loader := bq.client.Dataset(bq.config.Dataset).Table(tableName).LoaderFrom(source) loader.CreateDisposition = bigquery.CreateIfNeeded loader.WriteDisposition = bigquery.WriteAppend - _, err = bq.RunJob(ctx, loader, fmt.Sprintf("load into table '%s'", tableName)) - return err + _, state, err = bq.RunJob(ctx, loader, fmt.Sprintf("load into table '%s'", tableName)) + return state, err } // DropTable drops table from BigQuery @@ -637,7 +643,7 @@ func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, re copier := dataset.Table(targetTableName).CopierFrom(dataset.Table(replacementTableName)) copier.WriteDisposition = bigquery.WriteTruncate copier.CreateDisposition = bigquery.CreateIfNeeded - _, err = bq.RunJob(ctx, copier, fmt.Sprintf("replace table '%s' with '%s'", targetTableName, replacementTableName)) + _, _, err = bq.RunJob(ctx, copier, fmt.Sprintf("replace table '%s' with '%s'", targetTableName, replacementTableName)) if err != nil { return err } @@ -777,7 +783,7 @@ func (bq *BigQuery) Update(ctx context.Context, tableName string, object types2. 
query := bq.client.Query(updateQuery) query.Parameters = values - _, err = bq.RunJob(ctx, query, fmt.Sprintf("update table '%s'", tableName)) + _, _, err = bq.RunJob(ctx, query, fmt.Sprintf("update table '%s'", tableName)) return err } @@ -818,7 +824,7 @@ func (bq *BigQuery) selectFrom(ctx context.Context, tableName string, selectExpr query := bq.client.Query(selectQuery) query.Parameters = values - job, err := bq.RunJob(ctx, query, fmt.Sprintf("select from table '%s'", tableName)) + job, _, err := bq.RunJob(ctx, query, fmt.Sprintf("select from table '%s'", tableName)) if err != nil { return nil, err } @@ -906,7 +912,7 @@ func (bq *BigQuery) Delete(ctx context.Context, tableName string, deleteConditio }() query := bq.client.Query(deleteQuery) query.Parameters = values - _, err = bq.RunJob(ctx, query, fmt.Sprintf("delete from table '%s'", tableName)) + _, _, err = bq.RunJob(ctx, query, fmt.Sprintf("delete from table '%s'", tableName)) return err } func (bq *BigQuery) Type() string { @@ -978,7 +984,7 @@ type JobRunner interface { Run(ctx context.Context) (*bigquery.Job, error) } -func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription string) (job *bigquery.Job, err error) { +func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription string) (job *bigquery.Job, state bulker.WarehouseState, err error) { defer func() { bq.logQuery(jobDescription, runner, err) }() @@ -994,6 +1000,8 @@ func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription } bytesProcessed := "" if status != nil && status.Statistics != nil { + state.BytesProcessed = int(status.Statistics.TotalBytesProcessed) + state.EstimatedCost = float64(status.Statistics.TotalBytesProcessed) * 6.25 / 1_000_000_000_000 bytesProcessed = fmt.Sprintf(" Bytes processed: %d", status.Statistics.TotalBytesProcessed) } if err != nil { @@ -1004,9 +1012,9 @@ func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription } err = errors.New(builder.String()) } - return job, fmt.Errorf("Failed to %s.%s Completed with error: %v%s", jobDescription, jobID, err, bytesProcessed) + return job, state, fmt.Errorf("Failed to %s.%s Completed with error: %v%s", jobDescription, jobID, err, bytesProcessed) } else { bq.Infof("Successfully %s.%s%s", jobDescription, jobID, bytesProcessed) - return job, nil + return job, state, nil } } diff --git a/bulkerlib/implementations/sql/bulker_test.go b/bulkerlib/implementations/sql/bulker_test.go index 2ee3cff..129883e 100644 --- a/bulkerlib/implementations/sql/bulker_test.go +++ b/bulkerlib/implementations/sql/bulker_test.go @@ -325,7 +325,7 @@ func TestBasics(t *testing.T) { }, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, configIds: allBulkerConfigs, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, }, { name: "multi_pk", @@ -346,7 +346,7 @@ func TestBasics(t *testing.T) { expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, configIds: allBulkerConfigs, orderBy: []string{"id", "id2"}, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithDeduplicate()}, }, { name: "timestamp_option", diff --git a/bulkerlib/implementations/sql/clickhouse.go 
b/bulkerlib/implementations/sql/clickhouse.go index d57eb06..083e489 100644 --- a/bulkerlib/implementations/sql/clickhouse.go +++ b/bulkerlib/implementations/sql/clickhouse.go @@ -540,12 +540,12 @@ func (ch *ClickHouse) Insert(ctx context.Context, table *Table, _ bool, objects } // LoadTable transfer data from local file to ClickHouse table -func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error) { +func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulkerlib.WarehouseState, err error) { if loadSource.Type != LocalFile { - return fmt.Errorf("LoadTable: only local file is supported") + return state, fmt.Errorf("LoadTable: only local file is supported") } if loadSource.Format != ch.batchFileFormat { - return fmt.Errorf("LoadTable: only %s format is supported", ch.batchFileFormat) + return state, fmt.Errorf("LoadTable: only %s format is supported", ch.batchFileFormat) } tableName := ch.quotedTableName(targetTable.Name) @@ -572,7 +572,7 @@ func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSou file, err := os.Open(loadSource.Path) if err != nil { - return err + return state, err } scanner := bufio.NewScanner(file) scanner.Buffer(make([]byte, 1024*100), 1024*1024*10) @@ -582,14 +582,14 @@ func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSou decoder.UseNumber() err = decoder.Decode(&object) if err != nil { - return err + return state, err } placeholdersBuilder.WriteString(",(") for i, v := range columns { column := targetTable.Columns[v] l, err := convertType(object[v], column) if err != nil { - return err + return state, err } //ch.Infof("%s: %v (%T) was %v", v, l, l, object[v]) if i > 0 { @@ -601,19 +601,19 @@ func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSou placeholdersBuilder.WriteString(")") } if err = scanner.Err(); err != nil { - return fmt.Errorf("LoadTable: failed to read file: %v", err) + return state, fmt.Errorf("LoadTable: failed to read file: %v", err) } if len(args) > 0 { copyStatement = fmt.Sprintf(chLoadStatement, tableName, strings.Join(columnNames, ", "), placeholdersBuilder.String()[1:]) if _, err := ch.txOrDb(ctx).ExecContext(ctx, copyStatement, args...); err != nil { - return checkErr(err) + return state, checkErr(err) } } - return nil + return state, nil } -func (ch *ClickHouse) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (err error) { - return ch.copy(ctx, targetTable, sourceTable) +func (ch *ClickHouse) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulkerlib.WarehouseState, err error) { + return state, ch.copy(ctx, targetTable, sourceTable) } func (ch *ClickHouse) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error { diff --git a/bulkerlib/implementations/sql/mergewindow_test.go b/bulkerlib/implementations/sql/mergewindow_test.go index 48cfe61..7119557 100644 --- a/bulkerlib/implementations/sql/mergewindow_test.go +++ b/bulkerlib/implementations/sql/mergewindow_test.go @@ -31,7 +31,7 @@ func TestMergeWindow(t *testing.T) { expectedRowsCount: 10, configIds: []string{BigqueryBulkerTypeId}, frozenTime: mergeWindowTestTime, - streamOptions: []bulker.StreamOption{bulker.WithTimestamp("_timestamp"), bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithTimestamp("_timestamp"), bulker.WithPrimaryKey("id"), 
bulker.WithDeduplicate()}, }, { name: "merge_window_default", @@ -56,7 +56,7 @@ func TestMergeWindow(t *testing.T) { {"_timestamp": timestamp.MustParseTime(time.RFC3339Nano, "2023-02-02T00:00:00.000Z"), "id": 9, "name": "test9B"}, {"_timestamp": timestamp.MustParseTime(time.RFC3339Nano, "2023-02-07T00:00:00.000Z"), "id": 10, "name": "test10B"}, }, - streamOptions: []bulker.StreamOption{bulker.WithTimestamp("_timestamp"), bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithTimestamp("_timestamp"), bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, }, { name: "merge_window_6_days", @@ -83,7 +83,7 @@ func TestMergeWindow(t *testing.T) { {"_timestamp": timestamp.MustParseTime(time.RFC3339Nano, "2023-02-02T00:00:00.000Z"), "id": 9, "name": "test9C"}, {"_timestamp": timestamp.MustParseTime(time.RFC3339Nano, "2023-02-07T00:00:00.000Z"), "id": 10, "name": "test10C"}, }, - streamOptions: []bulker.StreamOption{bulker.WithTimestamp("_timestamp"), bulker.WithPrimaryKey("id"), bulker.WithMergeRows(), WithMergeWindow(5)}, + streamOptions: []bulker.StreamOption{bulker.WithTimestamp("_timestamp"), bulker.WithPrimaryKey("id"), bulker.WithDeduplicate(), WithDeduplicateWindow(5)}, }, { name: "dummy_test_table_cleanup", diff --git a/bulkerlib/implementations/sql/mysql.go b/bulkerlib/implementations/sql/mysql.go index f95123c..2a639bf 100644 --- a/bulkerlib/implementations/sql/mysql.go +++ b/bulkerlib/implementations/sql/mysql.go @@ -195,26 +195,26 @@ func (m *MySQL) Insert(ctx context.Context, table *Table, merge bool, objects .. } } -func (m *MySQL) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) error { +func (m *MySQL) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error) { if mergeWindow <= 0 { - return m.copy(ctx, targetTable, sourceTable) + return bulker.WarehouseState{}, m.copy(ctx, targetTable, sourceTable) } else { - return m.copyOrMerge(ctx, targetTable, sourceTable, mySQLBulkMergeQueryTemplate, "S") + return bulker.WarehouseState{}, m.copyOrMerge(ctx, targetTable, sourceTable, mySQLBulkMergeQueryTemplate, "S") } } -func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error) { +func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) { quotedTableName := m.quotedTableName(targetTable.Name) if loadSource.Type != LocalFile { - return fmt.Errorf("LoadTable: only local file is supported") + return state, fmt.Errorf("LoadTable: only local file is supported") } if loadSource.Format != m.batchFileFormat { mode := "LOCAL INFILE" if !m.infileEnabled { mode = "prepared statement" } - return fmt.Errorf("LoadTable: only %s format is supported in %s mode", m.batchFileFormat, mode) + return state, fmt.Errorf("LoadTable: only %s format is supported in %s mode", m.batchFileFormat, mode) } if m.infileEnabled { mysql.RegisterLocalFile(loadSource.Path) @@ -227,14 +227,14 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L } loadStatement := fmt.Sprintf(mySQLLoadTemplate, loadSource.Path, quotedTableName, strings.Join(header, ", ")) if _, err := m.txOrDb(ctx).ExecContext(ctx, loadStatement); err != nil { - return errorj.LoadError.Wrap(err, "failed to load data from local file system"). + return state, errorj.LoadError.Wrap(err, "failed to load data from local file system"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ Database: m.config.Db, Table: quotedTableName, Statement: loadStatement, }) } - return nil + return state, nil } else { columns := targetTable.SortedColumnNames() columnNames := make([]string, len(columns)) @@ -252,7 +252,7 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L buf := strings.Builder{} err := insertQueryTemplate.Execute(&buf, insertPayload) if err != nil { - return errorj.ExecuteInsertError.Wrap(err, "failed to build query from template") + return state, errorj.ExecuteInsertError.Wrap(err, "failed to build query from template") } statement := buf.String() defer func() { @@ -268,7 +268,7 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L stmt, err := m.txOrDb(ctx).PrepareContext(ctx, statement) if err != nil { - return err + return state, err } defer func() { _ = stmt.Close() @@ -278,7 +278,7 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L file, err := os.Open(loadSource.Path) if err != nil { - return err + return state, err } scanner := bufio.NewScanner(file) scanner.Buffer(make([]byte, 1024*100), 1024*1024*10) @@ -288,7 +288,7 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L decoder.UseNumber() err = decoder.Decode(&object) if err != nil { - return err + return state, err } args := make([]any, len(columns)) for i, v := range columns { @@ -296,13 +296,13 @@ func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *L args[i] = l } if _, err := stmt.ExecContext(ctx, args...); err != nil { - return checkErr(err) + return state, checkErr(err) } } if err = scanner.Err(); err != nil { - return fmt.Errorf("LoadTable: failed to read file: %v", err) + return state, fmt.Errorf("LoadTable: failed to read file: %v", err) } - return nil + return state, nil } } diff --git a/bulkerlib/implementations/sql/naming_test.go b/bulkerlib/implementations/sql/naming_test.go index 9405c03..18321a5 100644 --- a/bulkerlib/implementations/sql/naming_test.go +++ b/bulkerlib/implementations/sql/naming_test.go @@ -66,7 +66,7 @@ func TestNaming(t *testing.T) { Columns: justColumns("id", "name", "column_12b241e808ae6c964a5bb9f1c012e63d", "_1test_name", "_2", "column_c16da609b86c01f16a2c609eac4ccb0c", "Test_Name", "Test_Name__DROP_DATABASE_public__SELECT_1_from_DUAL_", "Universit_Franais", "_timestamp", "_unnamed", "lorem_ipsum_dolor_sit_amet_consectetur_adipiscing_elit_sed_do_eiusmod_tempor_incididunt_ut_labore_et_dolore_magna_aliqua_ut_enim_ad_minim_veniam_quis_nostrud_exercitation_ullamco_laboris_nisi_ut_aliquip_ex_ea_commodo_consequat", "column_c41d0d6c9ff6db34c6df393bdd283e19", "column_b4de5a5c8f92f77af9904705b3f08253", "camelCase", "int", "user", "select", "___ROOT__", "hash"), }, configIds: []string{BigqueryBulkerTypeId}, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, }, { name: "naming_test2", diff --git a/bulkerlib/implementations/sql/options.go b/bulkerlib/implementations/sql/options.go index 55d7789..c67dbca 100644 --- a/bulkerlib/implementations/sql/options.go +++ b/bulkerlib/implementations/sql/options.go @@ -36,8 +36,8 @@ var ( }, } - MergeWindow = bulker.ImplementationOption[int]{ - Key: "mergeWindow", + DeduplicateWindow = bulker.ImplementationOption[int]{ + Key: "deduplicateWindow", DefaultValue: 31, ParseFunc: utils.ParseInt, } @@ -48,7 +48,7 @@ var 
( ) func init() { - bulker.RegisterOption(&MergeWindow) + bulker.RegisterOption(&DeduplicateWindow) bulker.RegisterOption(&ColumnTypesOption) } @@ -60,8 +60,8 @@ type S3OptionConfig struct { Folder string `mapstructure:"folder,omitempty" json:"folder,omitempty" yaml:"folder,omitempty"` } -func WithMergeWindow(mergeWindow int) bulker.StreamOption { - return bulker.WithOption(&MergeWindow, mergeWindow) +func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption { + return bulker.WithOption(&DeduplicateWindow, deduplicateWindow) } func withColumnTypes(o *bulker.ImplementationOption[types.SQLTypes], fields types.SQLTypes) bulker.StreamOption { diff --git a/bulkerlib/implementations/sql/postgres.go b/bulkerlib/implementations/sql/postgres.go index 0f2dd4f..86e44e6 100644 --- a/bulkerlib/implementations/sql/postgres.go +++ b/bulkerlib/implementations/sql/postgres.go @@ -293,21 +293,21 @@ func (p *Postgres) Insert(ctx context.Context, table *Table, merge bool, objects } } -func (p *Postgres) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) error { +func (p *Postgres) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error) { if mergeWindow <= 0 { - return p.copy(ctx, targetTable, sourceTable) + return bulker.WarehouseState{}, p.copy(ctx, targetTable, sourceTable) } else { - return p.copyOrMerge(ctx, targetTable, sourceTable, pgBulkMergeQueryTemplate, pgBulkMergeSourceAlias) + return bulker.WarehouseState{}, p.copyOrMerge(ctx, targetTable, sourceTable, pgBulkMergeQueryTemplate, pgBulkMergeSourceAlias) } } -func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error) { +func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) { quotedTableName := p.quotedTableName(targetTable.Name) if loadSource.Type != LocalFile { - return fmt.Errorf("LoadTable: only local file is supported") + return state, fmt.Errorf("LoadTable: only local file is supported") } if loadSource.Format != p.batchFileFormat { - return fmt.Errorf("LoadTable: only %s format is supported", p.batchFileFormat) + return state, fmt.Errorf("LoadTable: only %s format is supported", p.batchFileFormat) } columns := targetTable.SortedColumnNames() columnNames := make([]string, len(columns)) @@ -329,7 +329,7 @@ func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource stmt, err := p.txOrDb(ctx).PrepareContext(ctx, copyStatement) if err != nil { - return err + return state, err } defer func() { _ = stmt.Close() @@ -339,7 +339,7 @@ func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource file, err := os.Open(loadSource.Path) if err != nil { - return err + return state, err } scanner := bufio.NewScanner(file) scanner.Buffer(make([]byte, 1024*100), 1024*1024*10) @@ -349,7 +349,7 @@ func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource decoder.UseNumber() err = decoder.Decode(&object) if err != nil { - return err + return state, err } args := make([]any, len(columns)) for i, v := range columns { @@ -357,18 +357,18 @@ func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource args[i] = l } if _, err := stmt.ExecContext(ctx, args...); err != nil { - return checkErr(err) + return state, checkErr(err) } } if err = scanner.Err(); err != nil { - return fmt.Errorf("LoadTable: failed to read file: %v", err) + return state, 
fmt.Errorf("LoadTable: failed to read file: %v", err) } _, err = stmt.ExecContext(ctx) if err != nil { - return checkErr(err) + return state, checkErr(err) } - return nil + return state, nil } // pgColumnDDL returns column DDL (quoted column name, mapped sql type and 'not null' if pk field) diff --git a/bulkerlib/implementations/sql/redshift.go b/bulkerlib/implementations/sql/redshift.go index cfac266..0d1a010 100644 --- a/bulkerlib/implementations/sql/redshift.go +++ b/bulkerlib/implementations/sql/redshift.go @@ -169,13 +169,13 @@ func (p *Redshift) Insert(ctx context.Context, table *Table, merge bool, objects } // LoadTable copy transfer data from s3 to redshift by passing COPY request to redshift -func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error) { +func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) { quotedTableName := p.quotedTableName(targetTable.Name) if loadSource.Type != AmazonS3 { - return fmt.Errorf("LoadTable: only Amazon S3 file is supported") + return state, fmt.Errorf("LoadTable: only Amazon S3 file is supported") } if loadSource.Format != p.batchFileFormat { - return fmt.Errorf("LoadTable: only %s format is supported", p.batchFileFormat) + return state, fmt.Errorf("LoadTable: only %s format is supported", p.batchFileFormat) } columns := targetTable.SortedColumnNames() columnNames := make([]string, len(columns)) @@ -190,7 +190,7 @@ func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource } statement := fmt.Sprintf(redshiftCopyTemplate, quotedTableName, strings.Join(columnNames, ","), s3Config.Bucket, fileKey, s3Config.AccessKeyID, s3Config.SecretKey, s3Config.Region) if _, err := p.txOrDb(ctx).ExecContext(ctx, statement); err != nil { - return errorj.CopyError.Wrap(err, "failed to copy data from s3"). + return state, errorj.CopyError.Wrap(err, "failed to copy data from s3"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ Schema: p.config.Schema, Table: quotedTableName, @@ -198,10 +198,10 @@ func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource }) } - return nil + return state, nil } -func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (err error) { +func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error) { quotedTargetTableName := p.quotedTableName(targetTable.Name) quotedSourceTableName := p.quotedTableName(sourceTable.Name) @@ -218,7 +218,7 @@ func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTab if _, err = p.txOrDb(ctx).ExecContext(ctx, deleteStatement); err != nil { - return errorj.BulkMergeError.Wrap(err, "failed to delete duplicated rows"). + return state, errorj.BulkMergeError.Wrap(err, "failed to delete duplicated rows"). 
WithProperty(errorj.DBInfo, &types2.ErrorPayload{ Schema: p.config.Schema, Table: quotedTargetTableName, @@ -227,7 +227,7 @@ func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTab }) } } - return p.copy(ctx, targetTable, sourceTable) + return state, p.copy(ctx, targetTable, sourceTable) } func (p *Redshift) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error) { diff --git a/bulkerlib/implementations/sql/replacepartition_stream.go b/bulkerlib/implementations/sql/replacepartition_stream.go index 4beaa1d..3b4e83a 100644 --- a/bulkerlib/implementations/sql/replacepartition_stream.go +++ b/bulkerlib/implementations/sql/replacepartition_stream.go @@ -73,7 +73,9 @@ func (ps *ReplacePartitionStream) Complete(ctx context.Context) (state bulker.St err = ps.clearPartition(ctx, ps.tx) if err == nil && ps.state.SuccessfulRows > 0 { if ps.batchFile != nil { - if err = ps.flushBatchFile(ctx); err != nil { + ws, err := ps.flushBatchFile(ctx) + ps.state.WarehouseState.Merge(ws) + if err != nil { return ps.state, err } } @@ -86,7 +88,8 @@ func (ps *ReplacePartitionStream) Complete(ctx context.Context) (state bulker.St ps.dstTable = dstTable ps.updateRepresentationTable(ps.dstTable) //copy data from tmp table to destination table - err = ps.tx.CopyTables(ctx, ps.dstTable, ps.tmpTable, ps.mergeWindow) + ws, err := ps.tx.CopyTables(ctx, ps.dstTable, ps.tmpTable, ps.mergeWindow) + ps.state.WarehouseState.Merge(ws) if err != nil { return ps.state, err } diff --git a/bulkerlib/implementations/sql/replacetable_stream.go b/bulkerlib/implementations/sql/replacetable_stream.go index 84f16ce..a9ebf30 100644 --- a/bulkerlib/implementations/sql/replacetable_stream.go +++ b/bulkerlib/implementations/sql/replacetable_stream.go @@ -48,7 +48,9 @@ func (ps *ReplaceTableStream) Complete(ctx context.Context) (state bulker.State, //if at least one object was inserted if ps.state.SuccessfulRows > 0 { if ps.batchFile != nil { - if err = ps.flushBatchFile(ctx); err != nil { + ws, err := ps.flushBatchFile(ctx) + ps.state.WarehouseState.Merge(ws) + if err != nil { return ps.state, err } } diff --git a/bulkerlib/implementations/sql/snowflake.go b/bulkerlib/implementations/sql/snowflake.go index 6dd0baf..f1a2840 100644 --- a/bulkerlib/implementations/sql/snowflake.go +++ b/bulkerlib/implementations/sql/snowflake.go @@ -350,18 +350,18 @@ func (s *Snowflake) getPrimaryKey(ctx context.Context, tableName string) (string } // LoadTable transfer data from local file to Snowflake by passing COPY request to Snowflake -func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error) { +func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) { quotedTableName := s.quotedTableName(targetTable.Name) if loadSource.Type != LocalFile { - return fmt.Errorf("LoadTable: only local file is supported") + return state, fmt.Errorf("LoadTable: only local file is supported") } if loadSource.Format != s.batchFileFormat { - return fmt.Errorf("LoadTable: only %s format is supported", s.batchFileFormat) + return state, fmt.Errorf("LoadTable: only %s format is supported", s.batchFileFormat) } putStatement := fmt.Sprintf("PUT file://%s @~", loadSource.Path) if _, err = s.txOrDb(ctx).ExecContext(ctx, putStatement); err != nil { - return errorj.LoadError.Wrap(err, "failed to put file to stage"). 
+ return state, errorj.LoadError.Wrap(err, "failed to put file to stage"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ Schema: s.config.Schema, Table: quotedTableName, @@ -389,7 +389,7 @@ func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSourc statement := fmt.Sprintf(sfCopyStatement, quotedTableName, strings.Join(columnNames, ","), path.Base(loadSource.Path)) if _, err := s.txOrDb(ctx).ExecContext(ctx, statement); err != nil { - return errorj.CopyError.Wrap(err, "failed to copy data from stage"). + return state, errorj.CopyError.Wrap(err, "failed to copy data from stage"). WithProperty(errorj.DBInfo, &types2.ErrorPayload{ Schema: s.config.Schema, Table: quotedTableName, @@ -397,7 +397,7 @@ func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSourc }) } - return nil + return state, nil } // Insert inserts data with InsertContext as a single object or a batch into Snowflake @@ -433,11 +433,11 @@ func (s *Snowflake) Insert(ctx context.Context, table *Table, merge bool, object return nil } -func (s *Snowflake) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) error { +func (s *Snowflake) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error) { if mergeWindow <= 0 { - return s.copy(ctx, targetTable, sourceTable) + return bulker.WarehouseState{}, s.copy(ctx, targetTable, sourceTable) } else { - return s.copyOrMerge(ctx, targetTable, sourceTable, sfMergeQueryTemplate, "S") + return bulker.WarehouseState{}, s.copyOrMerge(ctx, targetTable, sourceTable, sfMergeQueryTemplate, "S") } } diff --git a/bulkerlib/implementations/sql/sql_adapter.go b/bulkerlib/implementations/sql/sql_adapter.go index 0bb7f0a..6db4bc5 100644 --- a/bulkerlib/implementations/sql/sql_adapter.go +++ b/bulkerlib/implementations/sql/sql_adapter.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + bulker "github.com/jitsucom/bulker/bulkerlib" types2 "github.com/jitsucom/bulker/bulkerlib/types" "regexp" ) @@ -35,8 +36,8 @@ type SQLAdapter interface { TableHelper() *TableHelper GetTableSchema(ctx context.Context, tableName string) (*Table, error) CreateTable(ctx context.Context, schemaToCreate *Table) error - CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) error - LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) error + CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error) + LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) PatchTableSchema(ctx context.Context, patchTable *Table) error TruncateTable(ctx context.Context, tableName string) error //(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error @@ -123,11 +124,11 @@ func (tx *TxSQLAdapter) CreateTable(ctx context.Context, schemaToCreate *Table) ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx) return tx.sqlAdapter.CreateTable(ctx, schemaToCreate) } -func (tx *TxSQLAdapter) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) error { +func (tx *TxSQLAdapter) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error) { ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx) return tx.sqlAdapter.CopyTables(ctx, targetTable, sourceTable, mergeWindow) } 
-func (tx *TxSQLAdapter) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) error { +func (tx *TxSQLAdapter) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (bulker.WarehouseState, error) { ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx) return tx.sqlAdapter.LoadTable(ctx, targetTable, loadSource) } diff --git a/bulkerlib/implementations/sql/transactional_stream.go b/bulkerlib/implementations/sql/transactional_stream.go index c5cd15f..a6e6d62 100644 --- a/bulkerlib/implementations/sql/transactional_stream.go +++ b/bulkerlib/implementations/sql/transactional_stream.go @@ -55,7 +55,9 @@ func (ps *TransactionalStream) Complete(ctx context.Context) (state bulker.State //if at least one object was inserted if ps.state.SuccessfulRows > 0 { if ps.batchFile != nil { - if err = ps.flushBatchFile(ctx); err != nil { + ws, err := ps.flushBatchFile(ctx) + ps.state.WarehouseState.Merge(ws) + if err != nil { return ps.state, err } } @@ -68,11 +70,12 @@ func (ps *TransactionalStream) Complete(ctx context.Context) (state bulker.State ps.dstTable = dstTable ps.updateRepresentationTable(ps.dstTable) //copy data from tmp table to destination table - err = ps.tx.CopyTables(ctx, ps.dstTable, ps.tmpTable, ps.mergeWindow) + ws, err := ps.tx.CopyTables(ctx, ps.dstTable, ps.tmpTable, ps.mergeWindow) + ps.state.WarehouseState.Merge(ws) if err != nil { return ps.state, err } - return + return ps.state, nil } else { //if was any error - it will trigger transaction rollback in defer func err = ps.state.LastError diff --git a/bulkerlib/implementations/sql/transactional_stream_test.go b/bulkerlib/implementations/sql/transactional_stream_test.go index 5d3e7ba..16bf5f6 100644 --- a/bulkerlib/implementations/sql/transactional_stream_test.go +++ b/bulkerlib/implementations/sql/transactional_stream_test.go @@ -100,7 +100,7 @@ func TestTransactionalSequentialRepeatPK(t *testing.T) { tableName: "transactional_test_pk", modes: []bulker.BulkMode{bulker.Batch, bulker.Stream}, dataFile: "test_data/empty.ndjson", - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, configIds: allBulkerConfigs, }, @@ -118,7 +118,7 @@ func TestTransactionalSequentialRepeatPK(t *testing.T) { }, configIds: allBulkerConfigs, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, }, { name: "second_run_batch", @@ -133,7 +133,7 @@ func TestTransactionalSequentialRepeatPK(t *testing.T) { {"_timestamp": constantTime, "id": 4, "name": "test14"}, {"_timestamp": constantTime, "id": 5, "name": "test15"}, }, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, configIds: allBulkerConfigs, }, @@ -142,7 +142,7 @@ func TestTransactionalSequentialRepeatPK(t *testing.T) { tableName: "transactional_test_pk", modes: []bulker.BulkMode{bulker.Batch, bulker.Stream}, dataFile: "test_data/empty.ndjson", - streamOptions: 
[]bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()}, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, configIds: allBulkerConfigs, }, @@ -168,7 +168,7 @@ func TestTransactionalSequentialRepeatPKMulti(t *testing.T) { tableName: "transactional_test_pk_multi", modes: []bulker.BulkMode{bulker.Batch, bulker.Stream}, dataFile: "test_data/empty.ndjson", - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithDeduplicate()}, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, configIds: allBulkerConfigs, }, @@ -188,7 +188,7 @@ func TestTransactionalSequentialRepeatPKMulti(t *testing.T) { configIds: allBulkerConfigs, orderBy: []string{"id", "id2"}, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithDeduplicate()}, }, { name: "second_run_batch", @@ -205,7 +205,7 @@ func TestTransactionalSequentialRepeatPKMulti(t *testing.T) { {"_timestamp": constantTime, "id": 5, "id2": "d", "name": "test14"}, }, orderBy: []string{"id", "id2"}, - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithDeduplicate()}, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, configIds: allBulkerConfigs, }, @@ -214,7 +214,7 @@ func TestTransactionalSequentialRepeatPKMulti(t *testing.T) { tableName: "transactional_test_pk_multi", modes: []bulker.BulkMode{bulker.Batch, bulker.Stream}, dataFile: "test_data/empty.ndjson", - streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithMergeRows()}, + streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithDeduplicate()}, expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported}, configIds: allBulkerConfigs, }, diff --git a/bulkerlib/options.go b/bulkerlib/options.go index 4c758f3..fbd9687 100644 --- a/bulkerlib/options.go +++ b/bulkerlib/options.go @@ -72,7 +72,7 @@ var ( }, } - MergeRowsOption = ImplementationOption[bool]{ + DeduplicateOption = ImplementationOption[bool]{ Key: "deduplicate", DefaultValue: false, ParseFunc: utils.ParseBool, @@ -97,7 +97,7 @@ func init() { RegisterOption(&RetryFrequencyOption) RegisterOption(&RetryBatchSizeOption) RegisterOption(&PrimaryKeyOption) - RegisterOption(&MergeRowsOption) + RegisterOption(&DeduplicateOption) RegisterOption(&PartitionIdOption) RegisterOption(&TimestampOption) @@ -196,14 +196,9 @@ func WithPrimaryKey(pkFields ...string) StreamOption { return withPrimaryKey(&PrimaryKeyOption, pkFields...) } -// WithMergeRows - when true merge rows on primary keys collision. -func WithMergeRows() StreamOption { - return WithOption(&MergeRowsOption, true) -} - -func WithoutMergeRows() StreamOption { - return WithOption(&MergeRowsOption, false) - +// WithDeduplicate - when true merge rows on primary keys collision. 
+func WithDeduplicate() StreamOption {
+	return WithOption(&DeduplicateOption, true)
 }
 
 // WithPartition settings for bulker.ReplacePartition mode only
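How a caller opts into deduplication after the MergeRows → Deduplicate rename above — a sketch, not code from this change: the surrounding function and import aliases are illustrative, CreateStream, WithPrimaryKey, WithDeduplicate and WithDeduplicateWindow all appear in the diff, while the Bulker and BulkerStream interface names are assumed from bulkerlib:

```go
package example

import (
	bulker "github.com/jitsucom/bulker/bulkerlib"
	sql "github.com/jitsucom/bulker/bulkerlib/implementations/sql"
)

// openDedupStream opens a batch stream that deduplicates rows by primary key.
// WithDeduplicate replaces the removed WithMergeRows, and WithDeduplicateWindow
// replaces WithMergeWindow (the default window stays 31 days).
func openDedupStream(b bulker.Bulker, jobId, tableName string) (bulker.BulkerStream, error) {
	return b.CreateStream(jobId, tableName, bulker.Batch,
		bulker.WithPrimaryKey("id"),
		bulker.WithDeduplicate(),
		sql.WithDeduplicateWindow(7),
	)
}
```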
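For BigQuery, the deduplicate window (DeduplicateWindow, still 31 days by default) bounds the MERGE that CopyTables issues: the join condition gains T.<timestamp column> >= <now minus window>, which, for tables partitioned on that column, lets BigQuery prune partitions outside the window instead of scanning the whole target table. A sketch of the statement shape built from bigqueryMergeTemplate — the table and column names here are invented:

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

func main() {
	mergeWindow := 31 // days, from DeduplicateWindow
	pk, tsCol := "`id`", "`_timestamp`"
	cutoff := time.Now().Add(time.Duration(mergeWindow) * -24 * time.Hour).Format("2006-01-02")

	joinConditions := []string{
		fmt.Sprintf("T.%s = S.%s", pk, pk),
		fmt.Sprintf("T.%s >= '%s'", tsCol, cutoff), // limits the scan to the merge window
	}
	columns := "`id`,`name`,`_timestamp`"
	stmt := fmt.Sprintf(
		"MERGE INTO %s T USING %s S ON %s WHEN MATCHED THEN UPDATE SET %s WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)",
		"`dataset.events`", "`dataset.events_tmp`",
		strings.Join(joinConditions, " AND "),
		"T.`name` = S.`name`, T.`_timestamp` = S.`_timestamp`",
		columns, columns)
	fmt.Println(stmt)
}
```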
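The new WarehouseState numbers for BigQuery come from RunJob: BytesProcessed is copied from the job statistics (TotalBytesProcessed) and EstimatedCost applies the constant from the diff, bytes * 6.25 / 1_000_000_000_000, i.e. $6.25 per decimal terabyte processed. Whether that matches the current on-demand price (which Google quotes per TiB) is an assumption baked into the constant rather than something the patch asserts. A worked example of the arithmetic:

```go
package main

import "fmt"

// estimatedCost mirrors the formula used in RunJob in the diff above.
func estimatedCost(totalBytesProcessed int64) float64 {
	return float64(totalBytesProcessed) * 6.25 / 1_000_000_000_000
}

func main() {
	// A query that processes 10 GB is estimated at 10e9 * 6.25 / 1e12 = $0.0625.
	fmt.Printf("$%.4f\n", estimatedCost(10_000_000_000))
}
```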
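The copy-job fallback in bigquery.go now spells out the column list on both sides of the statement (bigqueryInsertFromSelectTemplate changed from "INSERT %s SELECT * FROM %s" to "INSERT INTO %s(%s) SELECT %s FROM %s"), presumably so the insert stays correct when the source and target tables do not share the same physical column order. A sketch of the generated statement, with invented names:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	quotedColumns := []string{"`id`", "`name`", "`_timestamp`"} // as bq.quotedColumnName would produce
	cols := strings.Join(quotedColumns, ",")
	stmt := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s FROM %s",
		"`dataset.events`", cols, cols, "`dataset.events_tmp`")
	fmt.Println(stmt)
	// INSERT INTO `dataset.events`(`id`,`name`,`_timestamp`) SELECT `id`,`name`,`_timestamp` FROM `dataset.events_tmp`
}
```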