Revert "bulker: introduced maxColumnsCount limit"
This reverts commit 4ff0929.
absorbb committed May 2, 2024
1 parent c1046ab commit b848f0f
Showing 4 changed files with 2 additions and 55 deletions.
4 changes: 1 addition & 3 deletions bulkerlib/implementations/sql/abstract.go
@@ -26,7 +26,6 @@ type AbstractSQLStream struct {
mergeWindow int
omitNils bool
schemaFreeze bool
maxColumnsCount int
schemaFromOptions *Table

state bulker.State
@@ -59,7 +58,6 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
ps.timestampColumn = bulker.TimestampOption.Get(&ps.options)
ps.omitNils = OmitNilsOption.Get(&ps.options)
ps.schemaFreeze = SchemaFreezeOption.Get(&ps.options)
ps.maxColumnsCount = MaxColumnsCount.Get(&ps.options)

schema := bulker.SchemaOption.Get(&ps.options)
if !schema.IsEmpty() {
@@ -139,7 +137,7 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
existingCol, ok = existingTable.Columns[name]
}
if !ok {
if ps.schemaFreeze || len(current) >= ps.maxColumnsCount {
if ps.schemaFreeze {
// when schemaFreeze=true all new columns values go to _unmapped_data
v, ok := values[name]
if ok {
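
For context on what is being reverted here: the removed check treated any column past maxColumnsCount the same way schemaFreeze treats every new column, diverting its value into _unmapped_data instead of extending the table. Below is a minimal, self-contained Go sketch of that decision; the function and variable names are illustrative stand-ins, not the actual bulker internals.

package main

import (
	"encoding/json"
	"fmt"
)

// routeValue mirrors the reverted condition from adjustTableColumnTypes:
//   if ps.schemaFreeze || len(current) >= ps.maxColumnsCount { ... }
// A new column is only added while the table stays below maxColumnsCount;
// otherwise the value is diverted to _unmapped_data, the same path used
// when schemaFreeze is enabled. All identifiers here are illustrative.
func routeValue(current map[string]string, name string, schemaFreeze bool, maxColumnsCount int) bool {
	if _, exists := current[name]; exists {
		return false // known column: value stays mapped
	}
	if schemaFreeze || len(current) >= maxColumnsCount {
		return true // value goes to _unmapped_data
	}
	current[name] = "text" // new column is added to the table schema
	return false
}

func main() {
	table := map[string]string{"_timestamp": "timestamp", "id": "bigint", "name": "text", "column1": "text"}
	unmapped := map[string]any{}
	for col, val := range map[string]any{"name": "test3", "column2": "data"} {
		if routeValue(table, col, false, 4) {
			unmapped[col] = val
		}
	}
	b, _ := json.Marshal(unmapped)
	fmt.Println(string(b)) // prints {"column2":"data"}; a fifth column is not created
}
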
12 changes: 1 addition & 11 deletions bulkerlib/implementations/sql/options.go
@@ -54,12 +54,6 @@ var (
ParseFunc: utils.ParseBool,
}

MaxColumnsCount = bulker.ImplementationOption[int]{
Key: "maxColumnsCount",
DefaultValue: 1500,
ParseFunc: utils.ParseInt,
}

localBatchFileOption = bulker.ImplementationOption[string]{Key: "BULKER_OPTION_LOCAL_BATCH_FILE"}

s3BatchFileOption = bulker.ImplementationOption[*S3OptionConfig]{Key: "BULKER_OPTION_S3_BATCH_FILE"}
@@ -70,7 +64,7 @@ func init() {
bulker.RegisterOption(&ColumnTypesOption)
bulker.RegisterOption(&OmitNilsOption)
bulker.RegisterOption(&SchemaFreezeOption)
bulker.RegisterOption(&MaxColumnsCount)

}

type S3OptionConfig struct {
@@ -97,10 +91,6 @@ func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption {
return bulker.WithOption(&DeduplicateWindow, deduplicateWindow)
}

func WithMaxColumnsCount(maxColumnsCount int) bulker.StreamOption {
return bulker.WithOption(&MaxColumnsCount, maxColumnsCount)
}

func withColumnTypes(o *bulker.ImplementationOption[types.SQLTypes], fields types.SQLTypes) bulker.StreamOption {
return func(options *bulker.StreamOptions) {
sqlTypes := o.Get(options)
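
The removed option itself followed bulker's usual pattern: a typed ImplementationOption with a default of 1500, a WithMaxColumnsCount helper to set it per stream, and MaxColumnsCount.Get(&ps.options) to read it back inside the stream. The sketch below reproduces that plumbing in simplified, self-contained form; the generic Option and StreamOptions types are stand-ins for the real bulker types, not the library API.

package main

import "fmt"

// Simplified stand-ins for bulker.ImplementationOption / bulker.StreamOptions,
// only to show how the removed MaxColumnsCount option was wired up.
type StreamOptions struct{ values map[string]any }

type Option[V any] struct {
	Key          string
	DefaultValue V
}

// Get returns the configured value or the default, like ImplementationOption.Get.
func (o *Option[V]) Get(so *StreamOptions) V {
	if v, ok := so.values[o.Key]; ok {
		return v.(V)
	}
	return o.DefaultValue
}

type StreamOption func(*StreamOptions)

// WithOption records a value for a typed option, mirroring bulker.WithOption.
func WithOption[V any](o *Option[V], value V) StreamOption {
	return func(so *StreamOptions) {
		if so.values == nil {
			so.values = map[string]any{}
		}
		so.values[o.Key] = value
	}
}

// Same shape as the removed option: key "maxColumnsCount", default 1500.
var MaxColumnsCount = Option[int]{Key: "maxColumnsCount", DefaultValue: 1500}

func WithMaxColumnsCount(n int) StreamOption { return WithOption(&MaxColumnsCount, n) }

func main() {
	opts := &StreamOptions{}
	WithMaxColumnsCount(4)(opts)
	fmt.Println(MaxColumnsCount.Get(opts)) // 4; without the option the default 1500 applies
}
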
37 changes: 0 additions & 37 deletions bulkerlib/implementations/sql/schema_freeze_test.go
@@ -73,40 +73,3 @@ func TestSchemaFreeze(t *testing.T) {
sequentialGroup.Add(1)
}
}

func TestMaxColumns(t *testing.T) {
t.Parallel()
tests := []bulkerTestConfig{
{
name: "added_columns_first_run",
tableName: "schema_freeze_test",
modes: []bulker.BulkMode{bulker.Batch, bulker.Stream},
leaveResultingTable: true,
dataFile: "test_data/columns_added.ndjson",
expectedTable: ExpectedTable{
Columns: justColumns("_timestamp", "column1", "_unmapped_data", "id", "name"),
},
expectedRows: []map[string]any{
{"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "_unmapped_data": "{\"column2\": \"data\"}"},
{"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "_unmapped_data": nil},
{"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "_unmapped_data": "{\"column2\": \"data\", \"column3\": \"data\"}"},
},
batchSize: 2,
streamOptions: []bulker.StreamOption{WithMaxColumnsCount(4)},
configIds: utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
},
}
sequentialGroup := sync.WaitGroup{}
sequentialGroup.Add(1)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
runTestConfig(t, tt, testStream)
sequentialGroup.Done()
})
sequentialGroup.Wait()
sequentialGroup.Add(1)
}
}
4 changes: 0 additions & 4 deletions bulkerlib/options.go
@@ -214,10 +214,6 @@ func withPrimaryKey(o *ImplementationOption[utils.Set[string]], pkFields ...stri
}
}

func WithBatchSize(batchSize int) StreamOption {
return WithOption(&BatchSizeOption, batchSize)
}

func WithPrimaryKey(pkFields ...string) StreamOption {
return withPrimaryKey(&PrimaryKeyOption, pkFields...)
}
