From 08a43ceacb97018294e9d3fe5c21cf6c61297f43 Mon Sep 17 00:00:00 2001
From: achettyiitr
Date: Mon, 27 Jan 2025 12:48:25 +0530
Subject: [PATCH] chore: added feature flag for loadByFolderPath

---
 warehouse/integrations/bigquery/bigquery.go   | 59 +++++++----
 .../integrations/bigquery/bigquery_test.go    | 99 +++++++++++--------
 warehouse/utils/utils.go                      |  7 ++
 warehouse/utils/utils_test.go                 | 18 ++++
 4 files changed, 124 insertions(+), 59 deletions(-)

diff --git a/warehouse/integrations/bigquery/bigquery.go b/warehouse/integrations/bigquery/bigquery.go
index 929c46092e..f34be2ec10 100644
--- a/warehouse/integrations/bigquery/bigquery.go
+++ b/warehouse/integrations/bigquery/bigquery.go
@@ -49,6 +49,7 @@ type BigQuery struct {
 		enableDeleteByJobs                   bool
 		customPartitionsEnabledWorkspaceIDs []string
 		slowQueryThreshold                   time.Duration
+		loadByFolderPath                     bool
 	}
 }
 
@@ -130,6 +131,7 @@ func New(conf *config.Config, log logger.Logger) *BigQuery {
 	bq.config.enableDeleteByJobs = conf.GetBool("Warehouse.bigquery.enableDeleteByJobs", false)
 	bq.config.customPartitionsEnabledWorkspaceIDs = conf.GetStringSlice("Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs", nil)
 	bq.config.slowQueryThreshold = conf.GetDuration("Warehouse.bigquery.slowQueryThreshold", 5, time.Minute)
+	bq.config.loadByFolderPath = conf.GetBool("Warehouse.bigquery.loadByFolderPath", false)
 	return bq
 }
 
@@ -441,12 +443,12 @@ func (bq *BigQuery) loadTable(ctx context.Context, tableName string) (
 	)
 	log.Infon("started loading")
 
-	loadFileLocation, err := bq.loadFileLocation(ctx, tableName)
+	gcsReferences, err := bq.gcsReferences(ctx, tableName)
 	if err != nil {
-		return nil, nil, fmt.Errorf("getting load file location: %w", err)
+		return nil, nil, fmt.Errorf("getting gcs references: %w", err)
 	}
 
-	gcsRef := bigquery.NewGCSReference(loadFileLocation)
+	gcsRef := bigquery.NewGCSReference(gcsReferences...)
 	gcsRef.SourceFormat = bigquery.JSON
 	gcsRef.MaxBadRecords = 0
 	gcsRef.IgnoreUnknownValues = false
@@ -454,10 +456,10 @@
 	return bq.loadTableByAppend(ctx, tableName, gcsRef, log)
 }
 
-func (bq *BigQuery) loadFileLocation(
+func (bq *BigQuery) gcsReferences(
 	ctx context.Context,
 	tableName string,
-) (string, error) {
+) ([]string, error) {
 	switch tableName {
 	case warehouseutils.IdentityMappingsTable, warehouseutils.IdentityMergeRulesTable:
 		loadfile, err := bq.uploader.GetSingleLoadFile(
@@ -465,17 +467,33 @@
 			tableName,
 		)
 		if err != nil {
-			return "", fmt.Errorf("getting single load file for table %s: %w", tableName, err)
+			return nil, fmt.Errorf("getting single load file for table %s: %w", tableName, err)
 		}
-		return loadfile.Location, nil
+
+		locations := warehouseutils.GetGCSLocations([]warehouseutils.LoadFile{loadfile}, warehouseutils.GCSLocationOptions{})
+		return locations, nil
 	default:
-		objectLocation, err := bq.uploader.GetSampleLoadFileLocation(ctx, tableName)
-		if err != nil {
-			return "", fmt.Errorf("getting sample load file location for table %s: %w", tableName, err)
-		}
-		gcsLocation := warehouseutils.GetGCSLocation(objectLocation, warehouseutils.GCSLocationOptions{})
+		if bq.config.loadByFolderPath {
+			objectLocation, err := bq.uploader.GetSampleLoadFileLocation(ctx, tableName)
+			if err != nil {
+				return nil, fmt.Errorf("getting sample load file location for table %s: %w", tableName, err)
+			}
+			gcsLocation := warehouseutils.GetGCSLocation(objectLocation, warehouseutils.GCSLocationOptions{})
+			gcsLocationFolder := loadFolder(gcsLocation)
+
+			return []string{gcsLocationFolder}, nil
+		} else {
+			loadFilesMetadata, err := bq.uploader.GetLoadFilesMetadata(
+				ctx,
+				warehouseutils.GetLoadFilesOptions{Table: tableName},
+			)
+			if err != nil {
+				return nil, fmt.Errorf("getting load files metadata for table %s: %w", tableName, err)
+			}
 
-		return loadFolder(gcsLocation), nil
+			locations := warehouseutils.GetGCSLocations(loadFilesMetadata, warehouseutils.GCSLocationOptions{})
+			return locations, nil
+		}
 	}
 }
 
@@ -690,12 +708,12 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err
 }
 
 func (bq *BigQuery) createAndLoadStagingUsersTable(ctx context.Context, stagingTable string) error {
-	loadFileLocation, err := bq.loadFileLocation(ctx, warehouseutils.UsersTable)
+	gcsReferences, err := bq.gcsReferences(ctx, warehouseutils.UsersTable)
 	if err != nil {
-		return fmt.Errorf("getting load file location: %w", err)
+		return fmt.Errorf("getting gcs references: %w", err)
 	}
 
-	gcsRef := bigquery.NewGCSReference(loadFileLocation)
+	gcsRef := bigquery.NewGCSReference(gcsReferences...)
 	gcsRef.SourceFormat = bigquery.JSON
 	gcsRef.MaxBadRecords = 0
 	gcsRef.IgnoreUnknownValues = false
@@ -1175,7 +1193,14 @@ func (bq *BigQuery) Connect(ctx context.Context, warehouse model.Warehouse) (cli
 
 func (bq *BigQuery) LoadTestTable(ctx context.Context, location, tableName string, _ map[string]interface{}, _ string) error {
 	gcsLocation := warehouseutils.GetGCSLocation(location, warehouseutils.GCSLocationOptions{})
-	gcsRef := bigquery.NewGCSReference(loadFolder(gcsLocation))
+	var gcsReference string
+	if bq.config.loadByFolderPath {
+		gcsReference = loadFolder(gcsLocation)
+	} else {
+		gcsReference = gcsLocation
+	}
+
+	gcsRef := bigquery.NewGCSReference(gcsReference)
 	gcsRef.SourceFormat = bigquery.JSON
 	gcsRef.MaxBadRecords = 0
 	gcsRef.IgnoreUnknownValues = false
diff --git a/warehouse/integrations/bigquery/bigquery_test.go b/warehouse/integrations/bigquery/bigquery_test.go
index 4dd301ec56..720eef1ea8 100644
--- a/warehouse/integrations/bigquery/bigquery_test.go
+++ b/warehouse/integrations/bigquery/bigquery_test.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"os"
 	"slices"
+	"strconv"
 	"testing"
 	"time"
 
@@ -1098,47 +1099,59 @@ func TestIntegration(t *testing.T) {
 			require.Equal(t, records, whth.SampleTestRecords())
 		})
 		t.Run("multiple files", func(t *testing.T) {
-			tableName := "multiple_files_test_table"
-			repeat := 10
-			loadObjectFolder := "rudder-warehouse-load-objects"
-			sourceID := "test_source_id"
+			testCases := []struct {
+				name             string
+				loadByFolderPath bool
+			}{
+				{name: "loadByFolderPath = false", loadByFolderPath: false},
+				{name: "loadByFolderPath = true", loadByFolderPath: true},
+			}
+			for i, tc := range testCases {
+				t.Run(tc.name, func(t *testing.T) {
+					tableName := "multiple_files_test_table" + strconv.Itoa(i)
+					repeat := 10
+					loadObjectFolder := "rudder-warehouse-load-objects"
+					sourceID := "test_source_id"
 
-			prefixes := []string{loadObjectFolder, tableName, sourceID, uuid.New().String() + "-" + tableName}
+					prefixes := []string{loadObjectFolder, tableName, sourceID, uuid.New().String() + "-" + tableName}
 
-			loadFiles := lo.RepeatBy(repeat, func(int) whutils.LoadFile {
-				sourceFile, err := os.Open("../testdata/load.json.gz")
-				require.NoError(t, err)
-				defer func() { _ = sourceFile.Close() }()
+					loadFiles := lo.RepeatBy(repeat, func(int) whutils.LoadFile {
+						sourceFile, err := os.Open("../testdata/load.json.gz")
+						require.NoError(t, err)
+						defer func() { _ = sourceFile.Close() }()
 
-				tempFile, err := os.CreateTemp("", "clone_*.json.gz")
-				require.NoError(t, err)
-				defer func() { _ = tempFile.Close() }()
+						tempFile, err := os.CreateTemp("", "clone_*.json.gz")
+						require.NoError(t, err)
+						defer func() { _ = tempFile.Close() }()
 
-				_, err = io.Copy(tempFile, sourceFile)
-				require.NoError(t, err)
+						_, err = io.Copy(tempFile, sourceFile)
+						require.NoError(t, err)
 
-				f, err := os.Open(tempFile.Name())
-				require.NoError(t, err)
-				defer func() { _ = f.Close() }()
+						f, err := os.Open(tempFile.Name())
+						require.NoError(t, err)
+						defer func() { _ = f.Close() }()
 
-				uploadOutput, err := fm.Upload(context.Background(), f, prefixes...)
-				require.NoError(t, err)
-				return whutils.LoadFile{Location: uploadOutput.Location}
-			})
-			mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
+						uploadOutput, err := fm.Upload(context.Background(), f, prefixes...)
+						require.NoError(t, err)
+						return whutils.LoadFile{Location: uploadOutput.Location}
+					})
+					mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
 
-			bq := whbigquery.New(config.New(), logger.NOP)
-			require.NoError(t, bq.Setup(ctx, warehouse, mockUploader))
-			require.NoError(t, bq.CreateSchema(ctx))
-			require.NoError(t, bq.CreateTable(ctx, tableName, schemaInWarehouse))
+					c := config.New()
+					c.Set("Warehouse.bigquery.loadByFolderPath", tc.loadByFolderPath)
 
-			loadTableStat, err := bq.LoadTable(ctx, tableName)
-			require.NoError(t, err)
-			require.Equal(t, loadTableStat.RowsInserted, int64(repeat*14))
-			require.Equal(t, loadTableStat.RowsUpdated, int64(0))
+					bq := whbigquery.New(c, logger.NOP)
+					require.NoError(t, bq.Setup(ctx, warehouse, mockUploader))
+					require.NoError(t, bq.CreateSchema(ctx))
+					require.NoError(t, bq.CreateTable(ctx, tableName, schemaInWarehouse))
 
-			records := bqhelper.RetrieveRecordsFromWarehouse(t, db,
-				fmt.Sprintf(`
+					loadTableStat, err := bq.LoadTable(ctx, tableName)
+					require.NoError(t, err)
+					require.Equal(t, loadTableStat.RowsInserted, int64(repeat*14))
+					require.Equal(t, loadTableStat.RowsUpdated, int64(0))
+
+					records := bqhelper.RetrieveRecordsFromWarehouse(t, db,
+						fmt.Sprintf(`
 					SELECT
 					  id,
 					  received_at,
 					  gender,
 					  ignored,
 					  sent_at,
 					  timestamp,
 					  original_timestamp
 					FROM %s.%s
 					WHERE _PARTITIONTIME BETWEEN TIMESTAMP('%s') AND TIMESTAMP('%s')
 					ORDER BY id;`,
-				namespace,
-				tableName,
-				time.Now().Add(-24*time.Hour).Format("2006-01-02"),
-				time.Now().Add(+24*time.Hour).Format("2006-01-02"),
-			),
-			)
-			expectedRecords := make([][]string, 0, repeat)
-			for i := 0; i < repeat; i++ {
-				expectedRecords = append(expectedRecords, whth.SampleTestRecords()...)
+						namespace,
+						tableName,
+						time.Now().Add(-24*time.Hour).Format("2006-01-02"),
+						time.Now().Add(+24*time.Hour).Format("2006-01-02"),
+					),
+					)
+					expectedRecords := make([][]string, 0, repeat)
+					for i := 0; i < repeat; i++ {
+						expectedRecords = append(expectedRecords, whth.SampleTestRecords()...)
+					}
+					require.ElementsMatch(t, expectedRecords, records)
+				})
 			}
-			require.ElementsMatch(t, expectedRecords, records)
 		})
 	})
diff --git a/warehouse/utils/utils.go b/warehouse/utils/utils.go
index d5e90ead2b..848c2c481f 100644
--- a/warehouse/utils/utils.go
+++ b/warehouse/utils/utils.go
@@ -413,6 +413,13 @@ func GetGCSLocationFolder(location string, options GCSLocationOptions) string {
 	return GetLocationFolder(GetGCSLocation(location, options))
 }
 
+func GetGCSLocations(loadFiles []LoadFile, options GCSLocationOptions) (gcsLocations []string) {
+	for _, loadFile := range loadFiles {
+		gcsLocations = append(gcsLocations, GetGCSLocation(loadFile.Location, options))
+	}
+	return
+}
+
 func GetLocationFolder(location string) string {
 	return location[:strings.LastIndex(location, "/")]
 }
diff --git a/warehouse/utils/utils_test.go b/warehouse/utils/utils_test.go
index 36e29a11ce..b476578b5d 100644
--- a/warehouse/utils/utils_test.go
+++ b/warehouse/utils/utils_test.go
@@ -405,6 +405,24 @@ func TestGetGCSLocationFolder(t *testing.T) {
 	}
 }
 
+func TestGetGCSLocations(t *testing.T) {
+	inputs := []LoadFile{
+		{Location: "https://storage.googleapis.com/test-bucket/test-object.csv"},
+		{Location: "https://storage.googleapis.com/my.test-bucket/test-object.csv"},
+		{Location: "https://storage.googleapis.com/my.test-bucket2/test-object.csv"},
+		{Location: "https://storage.googleapis.com/my.test-bucket/test-object2.csv"},
+	}
+	outputs := []string{
+		"gs://test-bucket/test-object.csv",
+		"gs://my.test-bucket/test-object.csv",
+		"gs://my.test-bucket2/test-object.csv",
+		"gs://my.test-bucket/test-object2.csv",
+	}
+
+	gcsLocations := GetGCSLocations(inputs, GCSLocationOptions{})
+	require.Equal(t, gcsLocations, outputs)
+}
+
 func TestGetAzureBlobLocation(t *testing.T) {
 	inputs := []struct {
 		location string
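
A quick orientation on what the flag toggles, for readers skimming the patch: with loadByFolderPath off (the default), gcsReferences hands BigQuery one gs:// URI per load file via the new GetGCSLocations helper; with it on, a single folder-level reference is derived from one sample load file (the patch's existing loadFolder helper in bigquery.go turns that folder into the reference form BigQuery expects). The sketch below is a simplified illustration, not code from this patch: getGCSLocation and getLocationFolder are hypothetical stand-ins for warehouseutils.GetGCSLocation and GetLocationFolder, and GCSLocationOptions handling is omitted.

package main

import (
	"fmt"
	"strings"
)

// getGCSLocation is a stand-in for warehouseutils.GetGCSLocation: it rewrites
// an https storage.googleapis.com object URL into a gs:// URI, matching the
// expectations in TestGetGCSLocations above.
func getGCSLocation(location string) string {
	return "gs://" + strings.TrimPrefix(location, "https://storage.googleapis.com/")
}

// getLocationFolder is a stand-in for warehouseutils.GetLocationFolder: it
// drops the object name and keeps the containing folder.
func getLocationFolder(location string) string {
	return location[:strings.LastIndex(location, "/")]
}

func main() {
	// Hypothetical load-file locations for one table.
	loadFiles := []string{
		"https://storage.googleapis.com/test-bucket/rudder-warehouse-load-objects/t1/a.json.gz",
		"https://storage.googleapis.com/test-bucket/rudder-warehouse-load-objects/t1/b.json.gz",
	}

	// loadByFolderPath = false (default): one gs:// reference per load file.
	for _, f := range loadFiles {
		fmt.Println(getGCSLocation(f))
	}

	// loadByFolderPath = true: a single folder reference derived from a
	// sample load file stands for every object under that prefix.
	fmt.Println(getLocationFolder(getGCSLocation(loadFiles[0])))
}

Folder mode sends BigQuery one reference instead of one per file, but it implicitly trusts that everything under the prefix belongs to the table, which is presumably why the flag ships disabled.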