From b848f0fd6e02b503a7ed01f68bf0ae9f3d84763b Mon Sep 17 00:00:00 2001 From: Ildar Nurislamov Date: Thu, 2 May 2024 15:25:39 +0400 Subject: [PATCH] Revert "bulker: introduced maxColumnsCount limit" This reverts commit 4ff0929eab4daaf88172a99b3b76e6608afcdd13. --- bulkerlib/implementations/sql/abstract.go | 4 +- bulkerlib/implementations/sql/options.go | 12 +----- .../implementations/sql/schema_freeze_test.go | 37 ------------------- bulkerlib/options.go | 4 -- 4 files changed, 2 insertions(+), 55 deletions(-) diff --git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go index bcd4b7c..5eafe66 100644 --- a/bulkerlib/implementations/sql/abstract.go +++ b/bulkerlib/implementations/sql/abstract.go @@ -26,7 +26,6 @@ type AbstractSQLStream struct { mergeWindow int omitNils bool schemaFreeze bool - maxColumnsCount int schemaFromOptions *Table state bulker.State @@ -59,7 +58,6 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu ps.timestampColumn = bulker.TimestampOption.Get(&ps.options) ps.omitNils = OmitNilsOption.Get(&ps.options) ps.schemaFreeze = SchemaFreezeOption.Get(&ps.options) - ps.maxColumnsCount = MaxColumnsCount.Get(&ps.options) schema := bulker.SchemaOption.Get(&ps.options) if !schema.IsEmpty() { @@ -139,7 +137,7 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable, existingCol, ok = existingTable.Columns[name] } if !ok { - if ps.schemaFreeze || len(current) >= ps.maxColumnsCount { + if ps.schemaFreeze { // when schemaFreeze=true all new columns values go to _unmapped_data v, ok := values[name] if ok { diff --git a/bulkerlib/implementations/sql/options.go b/bulkerlib/implementations/sql/options.go index 81d1b67..d693169 100644 --- a/bulkerlib/implementations/sql/options.go +++ b/bulkerlib/implementations/sql/options.go @@ -54,12 +54,6 @@ var ( ParseFunc: utils.ParseBool, } - MaxColumnsCount = bulker.ImplementationOption[int]{ - Key: "maxColumnsCount", - DefaultValue: 1500, - ParseFunc: utils.ParseInt, - } - localBatchFileOption = bulker.ImplementationOption[string]{Key: "BULKER_OPTION_LOCAL_BATCH_FILE"} s3BatchFileOption = bulker.ImplementationOption[*S3OptionConfig]{Key: "BULKER_OPTION_S3_BATCH_FILE"} @@ -70,7 +64,7 @@ func init() { bulker.RegisterOption(&ColumnTypesOption) bulker.RegisterOption(&OmitNilsOption) bulker.RegisterOption(&SchemaFreezeOption) - bulker.RegisterOption(&MaxColumnsCount) + } type S3OptionConfig struct { @@ -97,10 +91,6 @@ func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption { return bulker.WithOption(&DeduplicateWindow, deduplicateWindow) } -func WithMaxColumnsCount(maxColumnsCount int) bulker.StreamOption { - return bulker.WithOption(&MaxColumnsCount, maxColumnsCount) -} - func withColumnTypes(o *bulker.ImplementationOption[types.SQLTypes], fields types.SQLTypes) bulker.StreamOption { return func(options *bulker.StreamOptions) { sqlTypes := o.Get(options) diff --git a/bulkerlib/implementations/sql/schema_freeze_test.go b/bulkerlib/implementations/sql/schema_freeze_test.go index 95fa349..67533a5 100644 --- a/bulkerlib/implementations/sql/schema_freeze_test.go +++ b/bulkerlib/implementations/sql/schema_freeze_test.go @@ -73,40 +73,3 @@ func TestSchemaFreeze(t *testing.T) { sequentialGroup.Add(1) } } - -func TestMaxColumns(t *testing.T) { - t.Parallel() - tests := []bulkerTestConfig{ - { - name: "added_columns_first_run", - tableName: "schema_freeze_test", - modes: []bulker.BulkMode{bulker.Batch, bulker.Stream}, - leaveResultingTable: true, - dataFile: "test_data/columns_added.ndjson", - expectedTable: ExpectedTable{ - Columns: justColumns("_timestamp", "column1", "_unmapped_data", "id", "name"), - }, - expectedRows: []map[string]any{ - {"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "_unmapped_data": nil}, - {"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "_unmapped_data": nil}, - {"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "_unmapped_data": "{\"column2\": \"data\"}"}, - {"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "_unmapped_data": nil}, - {"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "_unmapped_data": nil}, - {"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "_unmapped_data": "{\"column2\": \"data\", \"column3\": \"data\"}"}, - }, - batchSize: 2, - streamOptions: []bulker.StreamOption{WithMaxColumnsCount(4)}, - configIds: utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}), - }, - } - sequentialGroup := sync.WaitGroup{} - sequentialGroup.Add(1) - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - runTestConfig(t, tt, testStream) - sequentialGroup.Done() - }) - sequentialGroup.Wait() - sequentialGroup.Add(1) - } -} diff --git a/bulkerlib/options.go b/bulkerlib/options.go index c4825b3..04c74e7 100644 --- a/bulkerlib/options.go +++ b/bulkerlib/options.go @@ -214,10 +214,6 @@ func withPrimaryKey(o *ImplementationOption[utils.Set[string]], pkFields ...stri } } -func WithBatchSize(batchSize int) StreamOption { - return WithOption(&BatchSizeOption, batchSize) -} - func WithPrimaryKey(pkFields ...string) StreamOption { return withPrimaryKey(&PrimaryKeyOption, pkFields...) }