bulker: bigquery: return data processing stats.
bulker: fixes and cleanups
absorbb committed Nov 28, 2023
1 parent dcc6070 commit 1a424dd
Showing 23 changed files with 173 additions and 146 deletions.
2 changes: 1 addition & 1 deletion bulkerapp/app/router.go
@@ -185,7 +185,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
}
var streamOptions []bulker.StreamOption
if len(pkeys) > 0 {
-streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithMergeRows())
+streamOptions = append(streamOptions, bulker.WithPrimaryKey(pkeys...), bulker.WithDeduplicate())
}
destination.InitBulkerInstance()
bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)
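Note: WithDeduplicate is the renamed WithMergeRows option and still has to be paired with WithPrimaryKey (the checks in abstract.go below enforce this). A minimal sketch of the resulting call, reusing the identifiers from BulkHandler above:

streamOptions := []bulker.StreamOption{bulker.WithPrimaryKey(pkeys...), bulker.WithDeduplicate()}
bulkerStream, err := destination.bulker.CreateStream(jobId, tableName, bulkMode, streamOptions...)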
15 changes: 15 additions & 0 deletions bulkerlib/bulker.go
@@ -140,6 +140,21 @@ type State struct {
ProcessedRows int `json:"processedRows"`
SuccessfulRows int `json:"successfulRows"`
ErrorRowIndex int `json:"errorRowIndex,omitempty"`
+WarehouseState `json:",inline,omitempty"`
}
+
+type WarehouseState struct {
+BytesProcessed int `json:"bytesProcessed,omitempty"`
+EstimatedCost float64 `json:"estimatedCost,omitempty"`
+AdditionalInfo map[string]any `json:",inline,omitempty"`
+}
+
+func (ws *WarehouseState) Merge(second WarehouseState) {
+ws.BytesProcessed += second.BytesProcessed
+ws.EstimatedCost += second.EstimatedCost
+for k, v := range second.AdditionalInfo {
+ws.AdditionalInfo[k] = v
+}
+}

// SetError sets error to the state
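A rough sketch (not part of this commit) of how the new WarehouseState is meant to accumulate across jobs. Note that Merge writes into the receiver's AdditionalInfo map, so a caller should make sure it is non-nil before merging a state that carries extra info:

// Hypothetical usage: accumulate stats from two warehouse jobs.
total := bulker.WarehouseState{AdditionalInfo: map[string]any{}}
total.Merge(bulker.WarehouseState{BytesProcessed: 1_000_000, EstimatedCost: 0.01})
total.Merge(bulker.WarehouseState{BytesProcessed: 2_000_000, EstimatedCost: 0.02, AdditionalInfo: map[string]any{"jobs": 2}})
// total: BytesProcessed=3_000_000, EstimatedCost=0.03, AdditionalInfo={"jobs": 2}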
2 changes: 1 addition & 1 deletion bulkerlib/implementations/file_storage/abstract.go
@@ -53,7 +53,7 @@ func newAbstractFileStorageStream(id string, p implementations2.FileAdapter, fil
for _, option := range streamOptions {
ps.options.Add(option)
}
-ps.merge = bulker.MergeRowsOption.Get(&ps.options)
+ps.merge = bulker.DeduplicateOption.Get(&ps.options)
pkColumns := bulker.PrimaryKeyOption.Get(&ps.options)
if ps.merge && len(pkColumns) == 0 {
return AbstractFileStorageStream{}, fmt.Errorf("MergeRows option requires primary key option. Please provide WithPrimaryKey option")
2 changes: 1 addition & 1 deletion bulkerlib/implementations/file_storage/bulker_test.go
@@ -197,7 +197,7 @@ func TestBasics(t *testing.T) {
{"_timestamp": constantTimeStr, "id": 1, "name": "test7"},
},
configIds: allBulkerConfigs,
-streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()},
+streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()},
},
}
for _, tt := range tests {
4 changes: 2 additions & 2 deletions bulkerlib/implementations/sql/abstract.go
@@ -38,13 +38,13 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
for _, option := range streamOptions {
ps.options.Add(option)
}
-ps.merge = bulker.MergeRowsOption.Get(&ps.options)
+ps.merge = bulker.DeduplicateOption.Get(&ps.options)
pkColumns := bulker.PrimaryKeyOption.Get(&ps.options)
if ps.merge && len(pkColumns) == 0 {
return nil, fmt.Errorf("MergeRows option requires primary key in the destination table. Please provide WithPrimaryKey option")
}
if ps.merge {
-ps.mergeWindow = MergeWindow.Get(&ps.options)
+ps.mergeWindow = DeduplicateWindow.Get(&ps.options)
}

var customFields = ColumnTypesOption.Get(&ps.options)
34 changes: 17 additions & 17 deletions bulkerlib/implementations/sql/abstract_transactional.go
@@ -118,11 +118,11 @@ func (ps *AbstractTransactionalSQLStream) postComplete(ctx context.Context, err
return ps.AbstractSQLStream.postComplete(err)
}

-func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (err error) {
+func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (state bulker.WarehouseState, err error) {
table := ps.tmpTable
err = ps.tx.CreateTable(ctx, table)
if err != nil {
-return errorj.Decorate(err, "failed to create table")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to create table")
}
columns := table.SortedColumnNames()
defer func() {
@@ -135,11 +135,11 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e
}()
if ps.eventsInBatch > 0 {
if err != nil {
-return errorj.Decorate(err, "failed to flush marshaller")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to flush marshaller")
}
err = ps.batchFile.Sync()
if err != nil {
-return errorj.Decorate(err, "failed to sync batch file")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to sync batch file")
}
workingFile := ps.batchFile
needToConvert := false
@@ -150,7 +150,7 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e
if len(ps.batchFileSkipLines) > 0 || needToConvert {
workingFile, err = os.CreateTemp("", path.Base(ps.batchFile.Name())+"_2")
if err != nil {
-return errorj.Decorate(err, "failed to create tmp file for deduplication")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to create tmp file for deduplication")
}
defer func() {
_ = workingFile.Close()
@@ -159,12 +159,12 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e
if needToConvert {
err = ps.targetMarshaller.Init(workingFile, columns)
if err != nil {
-return errorj.Decorate(err, "failed to write header for converted batch file")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to write header for converted batch file")
}
}
file, err := os.Open(ps.batchFile.Name())
if err != nil {
-return errorj.Decorate(err, "failed to open tmp file")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to open tmp file")
}
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, 1024*100), 1024*1024*10)
@@ -177,21 +177,21 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e
obj := make(map[string]any)
err = dec.Decode(&obj)
if err != nil {
-return errorj.Decorate(err, "failed to decode json object from batch filer")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to decode json object from batch filer")
}
ps.targetMarshaller.Marshal(obj)
} else {
_, err = workingFile.Write(scanner.Bytes())
if err != nil {
-return errorj.Decorate(err, "failed write to deduplication file")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed write to deduplication file")
}
_, _ = workingFile.Write([]byte("\n"))
}
}
i++
}
if err = scanner.Err(); err != nil {
-return errorj.Decorate(err, "failed to read batch file")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to read batch file")
}
ps.targetMarshaller.Flush()
workingFile.Sync()
@@ -203,29 +203,29 @@ func (ps *AbstractTransactionalSQLStream) flushBatchFile(ctx context.Context) (e
s3Config := s3BatchFileOption.Get(&ps.options)
rFile, err := os.Open(workingFile.Name())
if err != nil {
-return errorj.Decorate(err, "failed to open tmp file")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to open tmp file")
}
s3FileName := path.Base(workingFile.Name())
if s3Config.Folder != "" {
s3FileName = s3Config.Folder + "/" + s3FileName
}
err = ps.s3.Upload(s3FileName, rFile)
if err != nil {
-return errorj.Decorate(err, "failed to upload file to s3")
+return bulker.WarehouseState{}, errorj.Decorate(err, "failed to upload file to s3")
}
defer ps.s3.DeleteObject(s3FileName)
-err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: AmazonS3, Path: s3FileName, Format: ps.sqlAdapter.GetBatchFileFormat(), S3Config: s3Config})
+state, err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: AmazonS3, Path: s3FileName, Format: ps.sqlAdapter.GetBatchFileFormat(), S3Config: s3Config})
if err != nil {
-return errorj.Decorate(err, "failed to flush tmp file to the warehouse")
+return state, errorj.Decorate(err, "failed to flush tmp file to the warehouse")
}
} else {
-err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: LocalFile, Path: workingFile.Name(), Format: ps.sqlAdapter.GetBatchFileFormat()})
+state, err = ps.tx.LoadTable(ctx, table, &LoadSource{Type: LocalFile, Path: workingFile.Name(), Format: ps.sqlAdapter.GetBatchFileFormat()})
if err != nil {
-return errorj.Decorate(err, "failed to flush tmp file to the warehouse")
+return state, errorj.Decorate(err, "failed to flush tmp file to the warehouse")
}
}
}
-return nil
+return
}

//func (ps *AbstractTransactionalSQLStream) ensureSchema(ctx context.Context, targetTable **Table, tableForObject *Table, initTable func(ctx context.Context) (*Table, error)) (err error) {
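With the new signature, flushBatchFile hands the warehouse statistics from the load job back to its caller together with any error. An illustrative calling pattern (the ps.state field is assumed for this sketch, not shown in the diff; State embeds WarehouseState, so Merge is promoted):

// Sketch: fold the load-job stats into the stream's bulker.State.
whState, err := ps.flushBatchFile(ctx)
ps.state.Merge(whState)
if err != nil {
	return ps.state, err
}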
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/bigdata_test.go
@@ -69,7 +69,7 @@ func TestMillionRowsBatched(t *testing.T) {
expectedRowsCount: eventsCount,
leaveResultingTable: false,
configIds: configIds,
-streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()},
+streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()},
},
}
for _, tt := range tests {
64 changes: 36 additions & 28 deletions bulkerlib/implementations/sql/bigquery.go
@@ -35,7 +35,7 @@ const (
BigQueryAutocommitUnsupported = "BigQuery bulker doesn't support auto commit mode as not efficient"
BigqueryBulkerTypeId = "bigquery"

-bigqueryInsertFromSelectTemplate = "INSERT %s SELECT * FROM %s"
+bigqueryInsertFromSelectTemplate = "INSERT INTO %s(%s) SELECT %s FROM %s"
bigqueryMergeTemplate = "MERGE INTO %s T USING %s S ON %s WHEN MATCHED THEN UPDATE SET %s WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s)"
bigqueryDeleteTemplate = "DELETE FROM %s WHERE %s"
bigqueryUpdateTemplate = "UPDATE %s SET %s WHERE %s"
@@ -160,11 +160,11 @@ func (bq *BigQuery) validateOptions(streamOptions []bulker.StreamOption) error {
return nil
}

-func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (err error) {
+func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error) {
if mergeWindow <= 0 {
defer func() {
if err != nil {
-err = errorj.CopyError.Wrap(err, "failed to run BQ copier").
+err = errorj.CopyError.Wrap(err, "failed to run CopyTables").
WithProperty(errorj.DBInfo, &types2.ErrorPayload{
Dataset: bq.config.Dataset,
Bucket: bq.config.Bucket,
@@ -177,15 +177,22 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
copier := dataset.Table(targetTable.Name).CopierFrom(dataset.Table(sourceTable.Name))
copier.WriteDisposition = bigquery.WriteAppend
copier.CreateDisposition = bigquery.CreateIfNeeded
-_, err = bq.RunJob(ctx, copier, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
+_, state, err = bq.RunJob(ctx, copier, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
if err != nil {
// try to insert from select as a fallback
-insertFromSelectStatement := fmt.Sprintf(bigqueryInsertFromSelectTemplate, bq.fullTableName(targetTable.Name), bq.fullTableName(sourceTable.Name))
+columns := sourceTable.SortedColumnNames()
+quotedColumns := make([]string, len(columns))
+for i, name := range columns {
+quotedColumns[i] = bq.quotedColumnName(name)
+}
+columnsString := strings.Join(quotedColumns, ",")
+insertFromSelectStatement := fmt.Sprintf(bigqueryInsertFromSelectTemplate, bq.fullTableName(targetTable.Name), columnsString, columnsString, bq.fullTableName(sourceTable.Name))
query := bq.client.Query(insertFromSelectStatement)
-_, err = bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
-return err
+_, state2, err := bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
+state.Merge(state2)
+return state, err
} else {
-return nil
+return state, nil
}
} else {
defer func() {
@@ -199,8 +206,12 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
})
}
}()

columns := sourceTable.SortedColumnNames()
+quotedColumns := make([]string, len(columns))
+for i, name := range columns {
+quotedColumns[i] = bq.quotedColumnName(name)
+}
+columnsString := strings.Join(quotedColumns, ",")
updateSet := make([]string, len(columns))
for i, name := range columns {
updateSet[i] = fmt.Sprintf("T.%s = S.%s", bq.quotedColumnName(name), bq.quotedColumnName(name))
@@ -214,17 +225,12 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
monthBefore := timestamp.Now().Add(time.Duration(mergeWindow) * -24 * time.Hour).Format("2006-01-02")
joinConditions = append(joinConditions, fmt.Sprintf("T.%s >= '%s'", bq.quotedColumnName(targetTable.TimestampColumn), monthBefore))
}
-quotedColumns := make([]string, len(columns))
-for i, name := range columns {
-quotedColumns[i] = bq.quotedColumnName(name)
-}
-columnsString := strings.Join(quotedColumns, ",")
insertFromSelectStatement := fmt.Sprintf(bigqueryMergeTemplate, bq.fullTableName(targetTable.Name), bq.fullTableName(sourceTable.Name),
strings.Join(joinConditions, " AND "), strings.Join(updateSet, ", "), columnsString, columnsString)

query := bq.client.Query(insertFromSelectStatement)
-_, err = bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
-return err
+_, state, err = bq.RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
+return state, err
}
}
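The reworked fallback template names the column list on both sides instead of relying on INSERT ... SELECT *, so the statement no longer depends on the source and target tables having identical column order. A hypothetical rendering (table names invented; quotedColumnName is assumed to emit backticks):

cols := "`id`,`name`,`_timestamp`"
stmt := fmt.Sprintf(bigqueryInsertFromSelectTemplate, "dataset.target", cols, cols, "dataset.tmp_source")
// INSERT INTO dataset.target(`id`,`name`,`_timestamp`) SELECT `id`,`name`,`_timestamp` FROM dataset.tmp_source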

@@ -538,7 +544,7 @@ func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, object
return nil
}

-func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error) {
+func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error) {
tableName := bq.TableName(targetTable.Name)
defer func() {
if err != nil {
@@ -556,7 +562,7 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc

file, err := os.Open(loadSource.Path)
if err != nil {
-return err
+return state, err
}
bqTable := bq.client.Dataset(bq.config.Dataset).Table(tableName)
meta, err := bqTable.Metadata(ctx)
@@ -568,7 +574,7 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc
}
for i, field := range targetTable.SortedColumnNames() {
if _, ok := mp[field]; !ok {
-return fmt.Errorf("field %s is not in table schema", field)
+return state, fmt.Errorf("field %s is not in table schema", field)
}
meta.Schema[i] = mp[field]
}
@@ -587,8 +593,8 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc
loader := bq.client.Dataset(bq.config.Dataset).Table(tableName).LoaderFrom(source)
loader.CreateDisposition = bigquery.CreateIfNeeded
loader.WriteDisposition = bigquery.WriteAppend
-_, err = bq.RunJob(ctx, loader, fmt.Sprintf("load into table '%s'", tableName))
-return err
+_, state, err = bq.RunJob(ctx, loader, fmt.Sprintf("load into table '%s'", tableName))
+return state, err
}

// DropTable drops table from BigQuery
@@ -637,7 +643,7 @@ func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, re
copier := dataset.Table(targetTableName).CopierFrom(dataset.Table(replacementTableName))
copier.WriteDisposition = bigquery.WriteTruncate
copier.CreateDisposition = bigquery.CreateIfNeeded
-_, err = bq.RunJob(ctx, copier, fmt.Sprintf("replace table '%s' with '%s'", targetTableName, replacementTableName))
+_, _, err = bq.RunJob(ctx, copier, fmt.Sprintf("replace table '%s' with '%s'", targetTableName, replacementTableName))
if err != nil {
return err
}
@@ -777,7 +783,7 @@ func (bq *BigQuery) Update(ctx context.Context, tableName string, object types2.

query := bq.client.Query(updateQuery)
query.Parameters = values
-_, err = bq.RunJob(ctx, query, fmt.Sprintf("update table '%s'", tableName))
+_, _, err = bq.RunJob(ctx, query, fmt.Sprintf("update table '%s'", tableName))
return err
}

@@ -818,7 +824,7 @@ func (bq *BigQuery) selectFrom(ctx context.Context, tableName string, selectExpr

query := bq.client.Query(selectQuery)
query.Parameters = values
-job, err := bq.RunJob(ctx, query, fmt.Sprintf("select from table '%s'", tableName))
+job, _, err := bq.RunJob(ctx, query, fmt.Sprintf("select from table '%s'", tableName))
if err != nil {
return nil, err
}
@@ -906,7 +912,7 @@ func (bq *BigQuery) Delete(ctx context.Context, tableName string, deleteConditio
}()
query := bq.client.Query(deleteQuery)
query.Parameters = values
-_, err = bq.RunJob(ctx, query, fmt.Sprintf("delete from table '%s'", tableName))
+_, _, err = bq.RunJob(ctx, query, fmt.Sprintf("delete from table '%s'", tableName))
return err
}
func (bq *BigQuery) Type() string {
@@ -978,7 +984,7 @@ type JobRunner interface {
Run(ctx context.Context) (*bigquery.Job, error)
}

-func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription string) (job *bigquery.Job, err error) {
+func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription string) (job *bigquery.Job, state bulker.WarehouseState, err error) {
defer func() {
bq.logQuery(jobDescription, runner, err)
}()
@@ -994,6 +1000,8 @@ func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription
}
bytesProcessed := ""
if status != nil && status.Statistics != nil {
+state.BytesProcessed = int(status.Statistics.TotalBytesProcessed)
+state.EstimatedCost = float64(status.Statistics.TotalBytesProcessed) * 6.25 / 1_000_000_000_000
bytesProcessed = fmt.Sprintf(" Bytes processed: %d", status.Statistics.TotalBytesProcessed)
}
if err != nil {
@@ -1004,9 +1012,9 @@ func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription
}
err = errors.New(builder.String())
}
-return job, fmt.Errorf("Failed to %s.%s Completed with error: %v%s", jobDescription, jobID, err, bytesProcessed)
+return job, state, fmt.Errorf("Failed to %s.%s Completed with error: %v%s", jobDescription, jobID, err, bytesProcessed)
} else {
bq.Infof("Successfully %s.%s%s", jobDescription, jobID, bytesProcessed)
-return job, nil
+return job, state, nil
}
}
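RunJob now surfaces per-job statistics: TotalBytesProcessed is recorded as-is, and the cost estimate applies a flat $6.25 per 10^12 bytes. That matches BigQuery's on-demand list rate of $6.25 per TiB at the time of the commit, except the divisor here is decimal TB, so the estimate runs roughly 10% high. A worked example:

// 500 GB scanned: 500e9 bytes * $6.25 / 1e12 ≈ $3.13
bytesProcessed := int64(500_000_000_000)
estimatedCost := float64(bytesProcessed) * 6.25 / 1_000_000_000_000 // 3.125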
4 changes: 2 additions & 2 deletions bulkerlib/implementations/sql/bulker_test.go
@@ -325,7 +325,7 @@ func TestBasics(t *testing.T) {
},
expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported},
configIds: allBulkerConfigs,
-streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithMergeRows()},
+streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id"), bulker.WithDeduplicate()},
},
{
name: "multi_pk",
@@ -346,7 +346,7 @@ func TestBasics(t *testing.T) {
expectedErrors: map[string]any{"create_stream_bigquery_stream": BigQueryAutocommitUnsupported},
configIds: allBulkerConfigs,
orderBy: []string{"id", "id2"},
-streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithMergeRows()},
+streamOptions: []bulker.StreamOption{bulker.WithPrimaryKey("id", "id2"), bulker.WithDeduplicate()},
},
{
name: "timestamp_option",