Skip to content

Commit

Permalink
bulker: refactor all bigquery errors
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Nov 22, 2023
1 parent fa02b8f commit 7c971d3
Showing 1 changed file with 44 additions and 78 deletions.
122 changes: 44 additions & 78 deletions bulkerlib/implementations/sql/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,19 +176,8 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
copier.WriteDisposition = bigquery.WriteAppend
copier.CreateDisposition = bigquery.CreateIfNeeded
bq.logQuery(fmt.Sprintf("Copy tables values to table %s from: %s ", targetTable.Name, sourceTable.Name), nil, err)
job, err := copier.Run(ctx)
if err != nil {
return err
}
jobStatus, err := job.Wait(ctx)
if err != nil {
return err
}
if jobStatus.Err() != nil {
return jobStatus.Err()
}

return nil
_, err = RunJob(ctx, copier, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
return err
} else {
defer func() {
if err != nil {
Expand Down Expand Up @@ -220,16 +209,8 @@ func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTa
strings.Join(joinConditions, " AND "), strings.Join(updateSet, ", "), columnsString, columnsString)

query := bq.client.Query(insertFromSelectStatement)
job, err := query.Run(ctx)
bq.logQuery(insertFromSelectStatement, nil, err)
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
return status.Err()
_, err = RunJob(ctx, query, fmt.Sprintf("copy data from '%s' to '%s'", sourceTable.Name, targetTable.Name))
return err
}
}

Expand Down Expand Up @@ -578,26 +559,8 @@ func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSourc
loader := bq.client.Dataset(bq.config.Dataset).Table(tableName).LoaderFrom(source)
loader.CreateDisposition = bigquery.CreateIfNeeded
loader.WriteDisposition = bigquery.WriteAppend
job, err := loader.Run(ctx)
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
if err := status.Err(); err != nil {
builder := strings.Builder{}
builder.WriteString(fmt.Sprintf("Failed to load table %s. Job ID: %s Completed with error: %s", tableName, job.ID(), err.Error()))
if len(status.Errors) > 0 {
builder.WriteString("\nDetailed errors:")
for _, statusError := range status.Errors {
builder.WriteString(fmt.Sprintf("\n%s", statusError.Error()))
}
}
return errors.New(builder.String())
}
return nil
_, err = RunJob(ctx, loader, fmt.Sprintf("load into table '%s'", tableName))
return err
}

// DropTable drops table from BigQuery
Expand Down Expand Up @@ -648,18 +611,10 @@ func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, re
copier.CreateDisposition = bigquery.CreateIfNeeded
bq.logQuery(fmt.Sprintf("COPY table '%s' to '%s'", replacementTableName, targetTableName), copier, nil)

job, err := copier.Run(ctx)
_, err = RunJob(ctx, copier, fmt.Sprintf("replace table '%s' with '%s'", targetTableName, replacementTableName))
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err

}
if err = status.Err(); err != nil {
return err
}
if dropOldTable {
return bq.DropTable(ctx, replacementTableName, false)
} else {
Expand Down Expand Up @@ -796,15 +751,8 @@ func (bq *BigQuery) Update(ctx context.Context, tableName string, object types2.

query := bq.client.Query(updateQuery)
query.Parameters = values
job, err := query.Run(ctx)
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
return status.Err()
_, err = RunJob(ctx, query, fmt.Sprintf("update table '%s'", tableName))
return err
}

func (bq *BigQuery) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) {
Expand Down Expand Up @@ -845,17 +793,10 @@ func (bq *BigQuery) selectFrom(ctx context.Context, tableName string, selectExpr

query := bq.client.Query(selectQuery)
query.Parameters = values
job, err := query.Run(ctx)
job, err := RunJob(ctx, query, fmt.Sprintf("select from table '%s'", tableName))
if err != nil {
return nil, err
}
status, err := job.Wait(ctx)
if err != nil {
return nil, err
}
if err := status.Err(); err != nil {
return nil, err
}
it, err := job.Read(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -940,15 +881,8 @@ func (bq *BigQuery) Delete(ctx context.Context, tableName string, deleteConditio
}()
query := bq.client.Query(deleteQuery)
query.Parameters = values
job, err := query.Run(ctx)
if err != nil {
return err
}
status, err := job.Wait(ctx)
if err != nil {
return err
}
return status.Err()
_, err = RunJob(ctx, query, fmt.Sprintf("delete from table '%s'", tableName))
return err
}
func (bq *BigQuery) Type() string {
return BigqueryBulkerTypeId
Expand Down Expand Up @@ -1014,3 +948,35 @@ func (bq *BigQuery) GetDataType(sqlType string) (types2.DataType, bool) {
func (bq *BigQuery) TableHelper() *TableHelper {
return &bq.tableHelper
}

type JobRunner interface {
Run(ctx context.Context) (*bigquery.Job, error)
}

func RunJob(ctx context.Context, runner JobRunner, jobDescription string) (job *bigquery.Job, err error) {
var status *bigquery.JobStatus
job, err = runner.Run(ctx)
if err == nil {
status, err = job.Wait(ctx)
if err == nil {
err = status.Err()
if err == nil {
return job, nil
}
}
}

if err != nil {
builder := strings.Builder{}
builder.WriteString(fmt.Sprintf("Failed to %s. Job ID: %s Completed with error: %s", jobDescription, job.ID(), err.Error()))
if status != nil && len(status.Errors) > 0 {
builder.WriteString("\nDetailed errors:")
for _, statusError := range status.Errors {
builder.WriteString(fmt.Sprintf("\n%s", statusError.Error()))
}
}
return job, errors.New(builder.String())
} else {
return job, nil
}
}

0 comments on commit 7c971d3

Please sign in to comment.