From 8de85e627ba05cd2e3cfdd29fcad22808bb0ebe0 Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Wed, 27 Mar 2024 18:06:33 +0400 Subject: [PATCH] bulker: WithSchema options. adjust for TIMESTAMP columns --- bulkerapp/app/router.go | 2 +- bulkerlib/implementations/sql/abstract.go | 54 +++++++++++++++++++ .../implementations/sql/autocommit_stream.go | 2 +- .../sql/replacepartition_stream.go | 2 +- .../sql/replacetable_stream.go | 2 +- .../sql/transactional_stream.go | 2 +- 6 files changed, 59 insertions(+), 5 deletions(-) diff --git a/bulkerapp/app/router.go b/bulkerapp/app/router.go index 909e5aa..abe62ec 100644 --- a/bulkerapp/app/router.go +++ b/bulkerapp/app/router.go @@ -202,7 +202,7 @@ func (r *Router) BulkHandler(c *gin.Context) { if !schema.IsEmpty() { streamOptions = append(streamOptions, bulker.WithSchema(schema)) } - r.Infof("Schema for %s: %+v\njson:%s", jobId, schema, schemaHeader) + r.Infof("Schema for %s: %v", jobId, schema) } //streamOptions = append(streamOptions, sql.WithoutOmitNils()) destination.InitBulkerInstance() diff --git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go index d25d28d..c55a642 100644 --- a/bulkerlib/implementations/sql/abstract.go +++ b/bulkerlib/implementations/sql/abstract.go @@ -206,6 +206,60 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable, return columnsAdded } +func (ps *AbstractSQLStream) adjustTableToSchema(currentTable, existingTable, schemaTable *Table) bool { + columnsAdded := false + current := currentTable.Columns.Clone() + for name, newCol := range schemaTable.Columns { + var existingCol types.SQLColumn + ok := false + if existingTable != nil { + existingCol, ok = existingTable.Columns[name] + } + if !ok { + existingCol, ok = current[name] + if !ok { + //column not exist in database - adding as New + newCol.New = true + current[name] = newCol + columnsAdded = true + continue + } + } else { + current[name] = existingCol + } + if existingCol.DataType == newCol.DataType { + continue + } + if newCol.Override { + //if column sql type is overridden by user - leave it this way + current[name] = newCol + continue + } + // when we have uncommitted tables schemas in both sides + if existingCol.New { + // source schema may have STRING type e.g. for File source date columns for what we usually treat as TIMESTAMP + if existingCol.DataType == types.TIMESTAMP && newCol.DataType == types.STRING { + // keep TIMESTAMP type in such case + continue + } + common := types.GetCommonAncestorType(existingCol.DataType, newCol.DataType) + if common != existingCol.DataType { + //logging.Warnf("Changed '%s' type from %s to %s because of %s", name, existingCol.DataType.String(), common.String(), newCol.DataType.String()) + sqlType, ok := ps.sqlAdapter.GetSQLType(common) + if ok { + existingCol.DataType = common + existingCol.Type = sqlType + current[name] = existingCol + } else { + logging.SystemErrorf("Unknown column type %s mapping for %s", common, ps.sqlAdapter.Type()) + } + } + } + } + currentTable.Columns = current + return columnsAdded +} + func (ps *AbstractSQLStream) updateRepresentationTable(table *Table) { if ps.state.Representation == nil || ps.state.Representation.(RepresentationTable).Name != table.Name || diff --git a/bulkerlib/implementations/sql/autocommit_stream.go b/bulkerlib/implementations/sql/autocommit_stream.go index adeddaa..042dd1a 100644 --- a/bulkerlib/implementations/sql/autocommit_stream.go +++ b/bulkerlib/implementations/sql/autocommit_stream.go @@ -32,7 +32,7 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s } table, processedObject, err := ps.preprocess(object) if ps.schemaFromOptions != nil { - ps.adjustTableColumnTypes(table, nil, ps.schemaFromOptions, object) + ps.adjustTableToSchema(table, nil, ps.schemaFromOptions) } if err != nil { return diff --git a/bulkerlib/implementations/sql/replacepartition_stream.go b/bulkerlib/implementations/sql/replacepartition_stream.go index 5b1e309..58a0e06 100644 --- a/bulkerlib/implementations/sql/replacepartition_stream.go +++ b/bulkerlib/implementations/sql/replacepartition_stream.go @@ -36,7 +36,7 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream dstTable := tableForObject ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object) if ps.schemaFromOptions != nil { - ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object) + ps.adjustTableToSchema(dstTable, ps.existingTable, ps.schemaFromOptions) } tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405")) return &Table{ diff --git a/bulkerlib/implementations/sql/replacetable_stream.go b/bulkerlib/implementations/sql/replacetable_stream.go index 1e16474..e7cf490 100644 --- a/bulkerlib/implementations/sql/replacetable_stream.go +++ b/bulkerlib/implementations/sql/replacetable_stream.go @@ -36,7 +36,7 @@ func newReplaceTableStream(id string, p SQLAdapter, tableName string, streamOpti TimestampColumn: tableForObject.TimestampColumn, } if ps.schemaFromOptions != nil { - ps.adjustTableColumnTypes(tmpTable, nil, ps.schemaFromOptions, object) + ps.adjustTableToSchema(tmpTable, nil, ps.schemaFromOptions) } return tmpTable } diff --git a/bulkerlib/implementations/sql/transactional_stream.go b/bulkerlib/implementations/sql/transactional_stream.go index 7b7dd63..f417ab0 100644 --- a/bulkerlib/implementations/sql/transactional_stream.go +++ b/bulkerlib/implementations/sql/transactional_stream.go @@ -28,7 +28,7 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt dstTable := tableForObject ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object) if ps.schemaFromOptions != nil { - ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object) + ps.adjustTableToSchema(dstTable, ps.existingTable, ps.schemaFromOptions) } tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405")) return &Table{