Skip to content

Commit

Permalink
bulker: WithSchema options. adjust for TIMESTAMP columns
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Mar 27, 2024
1 parent 99a61b7 commit 8de85e6
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 5 deletions.
2 changes: 1 addition & 1 deletion bulkerapp/app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *Router) BulkHandler(c *gin.Context) {
if !schema.IsEmpty() {
streamOptions = append(streamOptions, bulker.WithSchema(schema))
}
r.Infof("Schema for %s: %+v\njson:%s", jobId, schema, schemaHeader)
r.Infof("Schema for %s: %v", jobId, schema)
}
//streamOptions = append(streamOptions, sql.WithoutOmitNils())
destination.InitBulkerInstance()
Expand Down
54 changes: 54 additions & 0 deletions bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,60 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
return columnsAdded
}

func (ps *AbstractSQLStream) adjustTableToSchema(currentTable, existingTable, schemaTable *Table) bool {
columnsAdded := false
current := currentTable.Columns.Clone()
for name, newCol := range schemaTable.Columns {
var existingCol types.SQLColumn
ok := false
if existingTable != nil {
existingCol, ok = existingTable.Columns[name]
}
if !ok {
existingCol, ok = current[name]
if !ok {
//column not exist in database - adding as New
newCol.New = true
current[name] = newCol
columnsAdded = true
continue
}
} else {
current[name] = existingCol
}
if existingCol.DataType == newCol.DataType {
continue
}
if newCol.Override {
//if column sql type is overridden by user - leave it this way
current[name] = newCol
continue
}
// when we have uncommitted tables schemas in both sides
if existingCol.New {
// source schema may have STRING type e.g. for File source date columns for what we usually treat as TIMESTAMP
if existingCol.DataType == types.TIMESTAMP && newCol.DataType == types.STRING {
// keep TIMESTAMP type in such case
continue
}
common := types.GetCommonAncestorType(existingCol.DataType, newCol.DataType)
if common != existingCol.DataType {
//logging.Warnf("Changed '%s' type from %s to %s because of %s", name, existingCol.DataType.String(), common.String(), newCol.DataType.String())
sqlType, ok := ps.sqlAdapter.GetSQLType(common)
if ok {
existingCol.DataType = common
existingCol.Type = sqlType
current[name] = existingCol
} else {
logging.SystemErrorf("Unknown column type %s mapping for %s", common, ps.sqlAdapter.Type())
}
}
}
}
currentTable.Columns = current
return columnsAdded
}

func (ps *AbstractSQLStream) updateRepresentationTable(table *Table) {
if ps.state.Representation == nil ||
ps.state.Representation.(RepresentationTable).Name != table.Name ||
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/autocommit_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
}
table, processedObject, err := ps.preprocess(object)
if ps.schemaFromOptions != nil {
ps.adjustTableColumnTypes(table, nil, ps.schemaFromOptions, object)
ps.adjustTableToSchema(table, nil, ps.schemaFromOptions)
}
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/replacepartition_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream
dstTable := tableForObject
ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
if ps.schemaFromOptions != nil {
ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
ps.adjustTableToSchema(dstTable, ps.existingTable, ps.schemaFromOptions)
}
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405"))
return &Table{
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/replacetable_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func newReplaceTableStream(id string, p SQLAdapter, tableName string, streamOpti
TimestampColumn: tableForObject.TimestampColumn,
}
if ps.schemaFromOptions != nil {
ps.adjustTableColumnTypes(tmpTable, nil, ps.schemaFromOptions, object)
ps.adjustTableToSchema(tmpTable, nil, ps.schemaFromOptions)
}
return tmpTable
}
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/transactional_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
dstTable := tableForObject
ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
if ps.schemaFromOptions != nil {
ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
ps.adjustTableToSchema(dstTable, ps.existingTable, ps.schemaFromOptions)
}
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405"))
return &Table{
Expand Down

0 comments on commit 8de85e6

Please sign in to comment.