Skip to content

Commit

Permalink
bulker: added schemaFreeze option
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Apr 25, 2024
1 parent 2af46a5 commit a8216fb
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 68 deletions.
86 changes: 51 additions & 35 deletions bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type AbstractSQLStream struct {
merge bool
mergeWindow int
omitNils bool
schemaFreeze bool
schemaFromOptions *Table

state bulker.State
Expand Down Expand Up @@ -56,6 +57,7 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
ps.pkColumns = pkColumns.ToSlice()
ps.timestampColumn = bulker.TimestampOption.Get(&ps.options)
ps.omitNils = OmitNilsOption.Get(&ps.options)
ps.schemaFreeze = SchemaFreezeOption.Get(&ps.options)

schema := bulker.SchemaOption.Get(&ps.options)
if !schema.IsEmpty() {
Expand Down Expand Up @@ -135,13 +137,23 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
existingCol, ok = existingTable.Columns[name]
}
if !ok {
existingCol, ok = current[name]
if !ok {
//column not exist in database - adding as New
newCol.New = true
current[name] = newCol
columnsAdded = true
continue
if ps.schemaFreeze {
// when schemaFreeze=true all new columns values go to _unmapped_data
v, ok := values[name]
if ok {
unmappedObj[name] = v
}
delete(current, name)
delete(values, name)
} else {
existingCol, ok = current[name]
if !ok {
// column doesn't exist in database and in current batch - adding as New
newCol.New = true
current[name] = newCol
columnsAdded = true
continue
}
}
} else {
current[name] = existingCol
Expand All @@ -154,39 +166,43 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
current[name] = newCol
continue
}
if !existingCol.New {
//column exists in database - check if its DataType is castable to DataType of existing column
v, ok := values[name]
if ok && v != nil {
if types.IsConvertible(newCol.DataType, existingCol.DataType) {
newVal, _, err := types.Convert(existingCol.DataType, v)
if err != nil {
//logging.Warnf("Can't convert '%s' value '%v' from %s to %s: %v", name, values[name], newCol.DataType.String(), existingCol.DataType.String(), err)
if existingCol.Type != "" {
// if column exists in db (existingTable) or in current batch (currentTable)
if !existingCol.New {
//column exists in database - check if its DataType is castable to DataType of existing column
v, ok := values[name]
if ok && v != nil {
if types.IsConvertible(newCol.DataType, existingCol.DataType) {
newVal, _, err := types.Convert(existingCol.DataType, v)
if err != nil {
//logging.Warnf("Can't convert '%s' value '%v' from %s to %s: %v", name, values[name], newCol.DataType.String(), existingCol.DataType.String(), err)
unmappedObj[name] = v
delete(values, name)
continue
} else {
//logging.Infof("Converted '%s' value '%v' from %s to %s: %v", name, values[name], newCol.DataType.String(), existingCol.DataType.String(), newVal)
values[name] = newVal
}
} else {
//logging.Warnf("Can't convert '%s' value '%v' from %s to %s", name, values[name], newCol.DataType.String(), existingCol.DataType.String())
unmappedObj[name] = v
delete(values, name)
continue
} else {
//logging.Infof("Converted '%s' value '%v' from %s to %s: %v", name, values[name], newCol.DataType.String(), existingCol.DataType.String(), newVal)
values[name] = newVal
}
} else {
//logging.Warnf("Can't convert '%s' value '%v' from %s to %s", name, values[name], newCol.DataType.String(), existingCol.DataType.String())
unmappedObj[name] = v
delete(values, name)
continue
}
}
} else {
common := types.GetCommonAncestorType(existingCol.DataType, newCol.DataType)
if common != existingCol.DataType {
//logging.Warnf("Changed '%s' type from %s to %s because of %s", name, existingCol.DataType.String(), common.String(), newCol.DataType.String())
sqlType, ok := ps.sqlAdapter.GetSQLType(common)
if ok {
existingCol.DataType = common
existingCol.Type = sqlType
current[name] = existingCol
} else {
logging.SystemErrorf("Unknown column type %s mapping for %s", common, ps.sqlAdapter.Type())
} else {
// column exists only in current batch - we have chance to change new column type to common ancestor
common := types.GetCommonAncestorType(existingCol.DataType, newCol.DataType)
if common != existingCol.DataType {
//logging.Warnf("Changed '%s' type from %s to %s because of %s", name, existingCol.DataType.String(), common.String(), newCol.DataType.String())
sqlType, ok := ps.sqlAdapter.GetSQLType(common)
if ok {
existingCol.DataType = common
existingCol.Type = sqlType
current[name] = existingCol
} else {
logging.SystemErrorf("Unknown column type %s mapping for %s", common, ps.sqlAdapter.Type())
}
}
}
}
Expand Down
69 changes: 37 additions & 32 deletions bulkerlib/implementations/sql/autocommit_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,50 +32,55 @@ func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (s
}
table, processedObject, err := ps.preprocess(object)
if ps.schemaFromOptions != nil {
//just to convert values to schema data types
ps.adjustTableColumnTypes(table, nil, ps.schemaFromOptions, object)
}
if err != nil {
return
}
existingTable, err := ps.sqlAdapter.TableHelper().EnsureTableWithCaching(ctx, ps.sqlAdapter, ps.id, table)
if err == nil {
// for autocommit mode this method only tries to convert values to existing column types
columnsAdded := ps.adjustTableColumnTypes(table, existingTable, table, processedObject)
if columnsAdded {
ps.updateRepresentationTable(existingTable)
// if new columns were added - update table. (for _unmapped_data column)
existingTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithCaching(ctx, ps.sqlAdapter, ps.id, table)
}
if err == nil {
existingTable.Columns = table.Columns
ps.updateRepresentationTable(existingTable)
err = ps.sqlAdapter.Insert(ctx, existingTable, ps.merge, processedObject)
}
}
existingTable, err := ps.sqlAdapter.TableHelper().Get(ctx, ps.sqlAdapter, table.Name, true)
if err != nil {
// give another try without using table cache
existingTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithoutCaching(ctx, ps.sqlAdapter, ps.id, table)
if err != nil {
ps.updateRepresentationTable(table)
err = errorj.Decorate(err, "failed to ensure table")
return
}
// for autocommit mode this method only tries to convert values to existing column types
columnsAdded := ps.adjustTableColumnTypes(table, existingTable, table, processedObject)
err = errorj.Decorate(err, "failed to get current table table")
return
}
if existingTable.Exists() {
currentTable := existingTable.Clone()
columnsAdded := ps.adjustTableColumnTypes(currentTable, existingTable, table, processedObject)
if columnsAdded {
ps.updateRepresentationTable(existingTable)
ps.updateRepresentationTable(currentTable)
// if new columns were added - update table. (for _unmapped_data column)
existingTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithCaching(ctx, ps.sqlAdapter, ps.id, table)
existingTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithCaching(ctx, ps.sqlAdapter, ps.id, currentTable)
if err != nil {
err = errorj.Decorate(err, "failed to ensure table")
return
// give another try without using table cache
existingTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithoutCaching(ctx, ps.sqlAdapter, ps.id, currentTable)
if err != nil {
ps.updateRepresentationTable(currentTable)
err = errorj.Decorate(err, "failed to ensure table")
return
}
currentTable = existingTable.Clone()
// here this method only tries to convert values to existing column types
columnsAdded = ps.adjustTableColumnTypes(currentTable, existingTable, table, processedObject)
if columnsAdded {
ps.updateRepresentationTable(currentTable)
// if new columns were added - update table. (for _unmapped_data column)
existingTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithCaching(ctx, ps.sqlAdapter, ps.id, currentTable)
if err != nil {
err = errorj.Decorate(err, "failed to ensure table")
return
}
currentTable = existingTable
}
}
}
existingTable.Columns = table.Columns
ps.updateRepresentationTable(existingTable)
return ps.state, processedObject, ps.sqlAdapter.Insert(ctx, existingTable, ps.merge, processedObject)
ps.updateRepresentationTable(currentTable)
err = ps.sqlAdapter.Insert(ctx, currentTable, ps.merge, processedObject)
} else {
existingTable, err = ps.sqlAdapter.TableHelper().EnsureTableWithCaching(ctx, ps.sqlAdapter, ps.id, table)
err = ps.sqlAdapter.Insert(ctx, existingTable, ps.merge, processedObject)
}
return ps.state, processedObject, nil

return ps.state, processedObject, err
}

func (ps *AutoCommitStream) Complete(ctx context.Context) (state bulker.State, err error) {
Expand Down
12 changes: 12 additions & 0 deletions bulkerlib/implementations/sql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ var (
ParseFunc: utils.ParseBool,
}

	// SchemaFreezeOption freezes the destination table schema: when enabled,
	// values for columns that do not already exist in the destination table are
	// routed to the _unmapped_data column instead of triggering ALTER TABLE /
	// new-column creation. Disabled by default (schema evolves automatically).
	SchemaFreezeOption = bulker.ImplementationOption[bool]{
		Key:          "schemaFreeze",
		DefaultValue: false,
		ParseFunc:    utils.ParseBool,
	}

localBatchFileOption = bulker.ImplementationOption[string]{Key: "BULKER_OPTION_LOCAL_BATCH_FILE"}

s3BatchFileOption = bulker.ImplementationOption[*S3OptionConfig]{Key: "BULKER_OPTION_S3_BATCH_FILE"}
Expand All @@ -57,6 +63,8 @@ func init() {
bulker.RegisterOption(&DeduplicateWindow)
bulker.RegisterOption(&ColumnTypesOption)
bulker.RegisterOption(&OmitNilsOption)
bulker.RegisterOption(&SchemaFreezeOption)

}

type S3OptionConfig struct {
Expand All @@ -75,6 +83,10 @@ func WithoutOmitNils() bulker.StreamOption {
return bulker.WithOption(&OmitNilsOption, false)
}

// WithSchemaFreeze returns a StreamOption that enables schemaFreeze mode:
// new (previously unseen) columns are not added to the destination table;
// their values are collected into the _unmapped_data column instead.
func WithSchemaFreeze() bulker.StreamOption {
	return bulker.WithOption(&SchemaFreezeOption, true)
}

func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption {
return bulker.WithOption(&DeduplicateWindow, deduplicateWindow)
}
Expand Down
75 changes: 75 additions & 0 deletions bulkerlib/implementations/sql/schema_freeze_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package sql

import (
bulker "github.com/jitsucom/bulker/bulkerlib"
"github.com/jitsucom/bulker/jitsubase/utils"
"sync"
"testing"
)

// TestSchemaFreeze sequentially runs streams against the same table (without
// dropping it in between) to verify the schemaFreeze option: once the option
// is enabled, previously unseen columns (column4, column5) must NOT be added
// to the table — their values must land in the _unmapped_data column instead.
func TestSchemaFreeze(t *testing.T) {
	t.Parallel()
	tests := []bulkerTestConfig{
		{
			//deletes any table leftovers from previous tests
			name:      "dummy_test_table_cleanup",
			tableName: "schema_freeze_test",
			modes:     []bulker.BulkMode{bulker.Batch, bulker.Stream},
			dataFile:  "test_data/empty.ndjson",
			configIds: utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
		},
		{
			// first run WITHOUT schemaFreeze: new columns are added to the table normally
			name:                "added_columns_first_run",
			tableName:           "schema_freeze_test",
			modes:               []bulker.BulkMode{bulker.Batch, bulker.Stream},
			leaveResultingTable: true,
			dataFile:            "test_data/columns_added.ndjson",
			expectedTable: ExpectedTable{
				Columns: justColumns("_timestamp", "column1", "column2", "column3", "id", "name"),
			},
			expectedRowsCount: 6,
			configIds:         utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
		},
		{
			// second run WITH schemaFreeze: column4/column5 must not become table
			// columns; their values go to _unmapped_data as JSON
			name:                "added_columns_second_run",
			tableName:           "schema_freeze_test",
			modes:               []bulker.BulkMode{bulker.Batch, bulker.Stream},
			leaveResultingTable: true,
			dataFile:            "test_data/columns_added2.ndjson",
			expectedTable: ExpectedTable{
				Columns: justColumns("_timestamp", "column1", "column2", "column3", "_unmapped_data", "id", "name"),
			},
			expectedRows: []map[string]any{
				{"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": nil},
				{"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "column2": nil, "column3": nil, "_unmapped_data": nil},
				{"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "column2": "data", "column3": nil, "_unmapped_data": nil},
				{"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "column2": nil, "column3": nil, "_unmapped_data": nil},
				{"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": nil},
				{"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "column2": "data", "column3": "data", "_unmapped_data": nil},
				{"_timestamp": constantTime, "id": 7, "name": "test", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": "{\"column4\": \"data\"}"},
				{"_timestamp": constantTime, "id": 8, "name": "test2", "column1": nil, "column2": nil, "column3": nil, "_unmapped_data": "{\"column5\": \"data\"}"},
			},
			configIds:     utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
			streamOptions: []bulker.StreamOption{WithSchemaFreeze()},
		},
		{
			// final cleanup: drop the shared table
			name:          "dummy_test_table_cleanup",
			tableName:     "schema_freeze_test",
			modes:         []bulker.BulkMode{bulker.Batch, bulker.Stream},
			dataFile:      "test_data/empty.ndjson",
			configIds:     utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
			streamOptions: []bulker.StreamOption{WithSchemaFreeze()},
		},
	}
	// The WaitGroup forces the sub-tests to complete strictly one after another,
	// since they share a single table and later runs depend on earlier state.
	sequentialGroup := sync.WaitGroup{}
	sequentialGroup.Add(1)
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			runTestConfig(t, tt, testStream)
			sequentialGroup.Done()
		})
		sequentialGroup.Wait()
		sequentialGroup.Add(1)
	}
}
6 changes: 5 additions & 1 deletion bulkerlib/implementations/sql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ func (t *Table) SortedColumnNames() []string {
func (t *Table) Clone() *Table {
clonedColumns := Columns{}
for k, v := range t.Columns {
clonedColumns[k] = v
clonedColumns[k] = types.SQLColumn{
Type: v.Type,
DdlType: v.DdlType,
DataType: v.DataType,
}
}

clonedPkFields := t.PKFields.Clone()
Expand Down
19 changes: 19 additions & 0 deletions bulkerlib/implementations/sql/table_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,25 @@ func (th *TableHelper) getTableIdentifier(destinationID, tableName string) strin
return destinationID + "_" + tableName
}

// Get returns the schema of tableName from the destination database.
// When cacheTable is true it first consults the in-memory table cache and,
// on a cache miss, stores the freshly loaded schema back into the cache
// (only if the table actually exists in the database).
func (th *TableHelper) Get(ctx context.Context, sqlAdapter SQLAdapter, tableName string, cacheTable bool) (*Table, error) {
	if cacheTable {
		if cached, found := th.GetCached(tableName); found {
			return cached, nil
		}
	}
	loaded, err := sqlAdapter.GetTableSchema(ctx, tableName)
	if err != nil {
		return nil, err
	}
	if cacheTable && loaded.Exists() {
		th.updateCached(loaded.Name, loaded)
	}
	return loaded, nil
}

func (th *TableHelper) GetCached(tableName string) (*Table, bool) {
th.RLock()
dbSchema, ok := th.tablesCache[tableName]
Expand Down

0 comments on commit a8216fb

Please sign in to comment.