Skip to content

Commit

Permalink
chore: added feature flag for loadByFolderPath
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jan 27, 2025
1 parent f469ce1 commit 08a43ce
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 59 deletions.
59 changes: 42 additions & 17 deletions warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type BigQuery struct {
enableDeleteByJobs bool
customPartitionsEnabledWorkspaceIDs []string
slowQueryThreshold time.Duration
loadByFolderPath bool
}
}

Expand Down Expand Up @@ -130,6 +131,7 @@ func New(conf *config.Config, log logger.Logger) *BigQuery {
bq.config.enableDeleteByJobs = conf.GetBool("Warehouse.bigquery.enableDeleteByJobs", false)
bq.config.customPartitionsEnabledWorkspaceIDs = conf.GetStringSlice("Warehouse.bigquery.customPartitionsEnabledWorkspaceIDs", nil)
bq.config.slowQueryThreshold = conf.GetDuration("Warehouse.bigquery.slowQueryThreshold", 5, time.Minute)
bq.config.loadByFolderPath = conf.GetBool("Warehouse.bigquery.loadByFolderPath", false)

return bq
}
Expand Down Expand Up @@ -441,41 +443,57 @@ func (bq *BigQuery) loadTable(ctx context.Context, tableName string) (
)
log.Infon("started loading")

loadFileLocation, err := bq.loadFileLocation(ctx, tableName)
gcsReferences, err := bq.gcsReferences(ctx, tableName)
if err != nil {
return nil, nil, fmt.Errorf("getting load file location: %w", err)
return nil, nil, fmt.Errorf("getting gcs references: %w", err)

Check warning on line 448 in warehouse/integrations/bigquery/bigquery.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/bigquery/bigquery.go#L448

Added line #L448 was not covered by tests
}

gcsRef := bigquery.NewGCSReference(loadFileLocation)
gcsRef := bigquery.NewGCSReference(gcsReferences...)
gcsRef.SourceFormat = bigquery.JSON
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false

return bq.loadTableByAppend(ctx, tableName, gcsRef, log)
}

func (bq *BigQuery) loadFileLocation(
func (bq *BigQuery) gcsReferences(
ctx context.Context,
tableName string,
) (string, error) {
) ([]string, error) {
switch tableName {
case warehouseutils.IdentityMappingsTable, warehouseutils.IdentityMergeRulesTable:
loadfile, err := bq.uploader.GetSingleLoadFile(
ctx,
tableName,
)
if err != nil {
return "", fmt.Errorf("getting single load file for table %s: %w", tableName, err)
return nil, fmt.Errorf("getting single load file for table %s: %w", tableName, err)
}
return loadfile.Location, nil

locations := warehouseutils.GetGCSLocations([]warehouseutils.LoadFile{loadfile}, warehouseutils.GCSLocationOptions{})
return locations, nil
default:
objectLocation, err := bq.uploader.GetSampleLoadFileLocation(ctx, tableName)
if err != nil {
return "", fmt.Errorf("getting sample load file location for table %s: %w", tableName, err)
}
gcsLocation := warehouseutils.GetGCSLocation(objectLocation, warehouseutils.GCSLocationOptions{})
if bq.config.loadByFolderPath {
objectLocation, err := bq.uploader.GetSampleLoadFileLocation(ctx, tableName)
if err != nil {
return nil, fmt.Errorf("getting sample load file location for table %s: %w", tableName, err)
}
gcsLocation := warehouseutils.GetGCSLocation(objectLocation, warehouseutils.GCSLocationOptions{})
gcsLocationFolder := loadFolder(gcsLocation)

return []string{gcsLocationFolder}, nil

Check warning on line 484 in warehouse/integrations/bigquery/bigquery.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/bigquery/bigquery.go#L477-L484

Added lines #L477 - L484 were not covered by tests
} else {
loadFilesMetadata, err := bq.uploader.GetLoadFilesMetadata(
ctx,
warehouseutils.GetLoadFilesOptions{Table: tableName},
)
if err != nil {
return nil, fmt.Errorf("getting load files metadata for table %s: %w", tableName, err)
}

Check warning on line 492 in warehouse/integrations/bigquery/bigquery.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/bigquery/bigquery.go#L491-L492

Added lines #L491 - L492 were not covered by tests

return loadFolder(gcsLocation), nil
locations := warehouseutils.GetGCSLocations(loadFilesMetadata, warehouseutils.GCSLocationOptions{})
return locations, nil
}
}
}

Expand Down Expand Up @@ -690,12 +708,12 @@ func (bq *BigQuery) LoadUserTables(ctx context.Context) (errorMap map[string]err
}

func (bq *BigQuery) createAndLoadStagingUsersTable(ctx context.Context, stagingTable string) error {
loadFileLocation, err := bq.loadFileLocation(ctx, warehouseutils.UsersTable)
gcsReferences, err := bq.gcsReferences(ctx, warehouseutils.UsersTable)
if err != nil {
return fmt.Errorf("getting load file location: %w", err)
return fmt.Errorf("getting gcs references: %w", err)

Check warning on line 713 in warehouse/integrations/bigquery/bigquery.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/bigquery/bigquery.go#L713

Added line #L713 was not covered by tests
}

gcsRef := bigquery.NewGCSReference(loadFileLocation)
gcsRef := bigquery.NewGCSReference(gcsReferences...)
gcsRef.SourceFormat = bigquery.JSON
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false
Expand Down Expand Up @@ -1175,7 +1193,14 @@ func (bq *BigQuery) Connect(ctx context.Context, warehouse model.Warehouse) (cli
func (bq *BigQuery) LoadTestTable(ctx context.Context, location, tableName string, _ map[string]interface{}, _ string) error {
gcsLocation := warehouseutils.GetGCSLocation(location, warehouseutils.GCSLocationOptions{})

gcsRef := bigquery.NewGCSReference(loadFolder(gcsLocation))
var gcsReference string
if bq.config.loadByFolderPath {
gcsReference = loadFolder(gcsLocation)

Check warning on line 1198 in warehouse/integrations/bigquery/bigquery.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/bigquery/bigquery.go#L1198

Added line #L1198 was not covered by tests
} else {
gcsReference = gcsLocation
}

gcsRef := bigquery.NewGCSReference(gcsReference)
gcsRef.SourceFormat = bigquery.JSON
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false
Expand Down
99 changes: 57 additions & 42 deletions warehouse/integrations/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"slices"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -1098,47 +1099,59 @@ func TestIntegration(t *testing.T) {
require.Equal(t, records, whth.SampleTestRecords())
})
t.Run("multiple files", func(t *testing.T) {
tableName := "multiple_files_test_table"
repeat := 10
loadObjectFolder := "rudder-warehouse-load-objects"
sourceID := "test_source_id"
testCases := []struct {
name string
loadByFolderPath bool
}{
{name: "loadByFolderPath = false", loadByFolderPath: false},
{name: "loadByFolderPath = true", loadByFolderPath: true},
}
for i, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
tableName := "multiple_files_test_table" + strconv.Itoa(i)
repeat := 10
loadObjectFolder := "rudder-warehouse-load-objects"
sourceID := "test_source_id"

prefixes := []string{loadObjectFolder, tableName, sourceID, uuid.New().String() + "-" + tableName}
prefixes := []string{loadObjectFolder, tableName, sourceID, uuid.New().String() + "-" + tableName}

loadFiles := lo.RepeatBy(repeat, func(int) whutils.LoadFile {
sourceFile, err := os.Open("../testdata/load.json.gz")
require.NoError(t, err)
defer func() { _ = sourceFile.Close() }()
loadFiles := lo.RepeatBy(repeat, func(int) whutils.LoadFile {
sourceFile, err := os.Open("../testdata/load.json.gz")
require.NoError(t, err)
defer func() { _ = sourceFile.Close() }()

tempFile, err := os.CreateTemp("", "clone_*.json.gz")
require.NoError(t, err)
defer func() { _ = tempFile.Close() }()
tempFile, err := os.CreateTemp("", "clone_*.json.gz")
require.NoError(t, err)
defer func() { _ = tempFile.Close() }()

_, err = io.Copy(tempFile, sourceFile)
require.NoError(t, err)
_, err = io.Copy(tempFile, sourceFile)
require.NoError(t, err)

f, err := os.Open(tempFile.Name())
require.NoError(t, err)
defer func() { _ = f.Close() }()
f, err := os.Open(tempFile.Name())
require.NoError(t, err)
defer func() { _ = f.Close() }()

uploadOutput, err := fm.Upload(context.Background(), f, prefixes...)
require.NoError(t, err)
return whutils.LoadFile{Location: uploadOutput.Location}
})
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
uploadOutput, err := fm.Upload(context.Background(), f, prefixes...)
require.NoError(t, err)
return whutils.LoadFile{Location: uploadOutput.Location}
})
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

bq := whbigquery.New(config.New(), logger.NOP)
require.NoError(t, bq.Setup(ctx, warehouse, mockUploader))
require.NoError(t, bq.CreateSchema(ctx))
require.NoError(t, bq.CreateTable(ctx, tableName, schemaInWarehouse))
c := config.New()
c.Set("Warehouse.redshift.loadByFolderPath", tc.loadByFolderPath)

loadTableStat, err := bq.LoadTable(ctx, tableName)
require.NoError(t, err)
require.Equal(t, loadTableStat.RowsInserted, int64(repeat*14))
require.Equal(t, loadTableStat.RowsUpdated, int64(0))
bq := whbigquery.New(c, logger.NOP)
require.NoError(t, bq.Setup(ctx, warehouse, mockUploader))
require.NoError(t, bq.CreateSchema(ctx))
require.NoError(t, bq.CreateTable(ctx, tableName, schemaInWarehouse))

records := bqhelper.RetrieveRecordsFromWarehouse(t, db,
fmt.Sprintf(`
loadTableStat, err := bq.LoadTable(ctx, tableName)
require.NoError(t, err)
require.Equal(t, loadTableStat.RowsInserted, int64(repeat*14))
require.Equal(t, loadTableStat.RowsUpdated, int64(0))

records := bqhelper.RetrieveRecordsFromWarehouse(t, db,
fmt.Sprintf(`
SELECT
id,
received_at,
Expand All @@ -1150,17 +1163,19 @@ func TestIntegration(t *testing.T) {
FROM %s.%s
WHERE _PARTITIONTIME BETWEEN TIMESTAMP('%s') AND TIMESTAMP('%s')
ORDER BY id;`,
namespace,
tableName,
time.Now().Add(-24*time.Hour).Format("2006-01-02"),
time.Now().Add(+24*time.Hour).Format("2006-01-02"),
),
)
expectedRecords := make([][]string, 0, repeat)
for i := 0; i < repeat; i++ {
expectedRecords = append(expectedRecords, whth.SampleTestRecords()...)
namespace,
tableName,
time.Now().Add(-24*time.Hour).Format("2006-01-02"),
time.Now().Add(+24*time.Hour).Format("2006-01-02"),
),
)
expectedRecords := make([][]string, 0, repeat)
for i := 0; i < repeat; i++ {
expectedRecords = append(expectedRecords, whth.SampleTestRecords()...)
}
require.ElementsMatch(t, expectedRecords, records)
})
}
require.ElementsMatch(t, expectedRecords, records)
})
})

Expand Down
7 changes: 7 additions & 0 deletions warehouse/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,13 @@ func GetGCSLocationFolder(location string, options GCSLocationOptions) string {
return GetLocationFolder(GetGCSLocation(location, options))
}

// GetGCSLocations converts each load file's object location into its gs://
// form via GetGCSLocation and returns them in input order. A nil slice is
// returned for empty input, matching the previous naked-return behaviour.
func GetGCSLocations(loadFiles []LoadFile, options GCSLocationOptions) []string {
	if len(loadFiles) == 0 {
		return nil
	}
	// Pre-size: the output has exactly one entry per load file.
	gcsLocations := make([]string, 0, len(loadFiles))
	for _, loadFile := range loadFiles {
		gcsLocations = append(gcsLocations, GetGCSLocation(loadFile.Location, options))
	}
	return gcsLocations
}

// GetLocationFolder strips the final path component from location, returning
// everything before the last '/'. If location contains no '/', the input is
// returned unchanged; the original `location[:strings.LastIndex(...)]` would
// panic with a slice-bounds error on such input (LastIndex returns -1).
func GetLocationFolder(location string) string {
	idx := strings.LastIndex(location, "/")
	if idx < 0 {
		return location
	}
	return location[:idx]
}
Expand Down
18 changes: 18 additions & 0 deletions warehouse/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,24 @@ func TestGetGCSLocationFolder(t *testing.T) {
}
}

// TestGetGCSLocations verifies that GetGCSLocations rewrites each load
// file's HTTPS object URL into its gs:// equivalent, preserving order.
func TestGetGCSLocations(t *testing.T) {
	inputs := []LoadFile{
		{Location: "https://storage.googleapis.com/test-bucket/test-object.csv"},
		{Location: "https://storage.googleapis.com/my.test-bucket/test-object.csv"},
		{Location: "https://storage.googleapis.com/my.test-bucket2/test-object.csv"},
		{Location: "https://storage.googleapis.com/my.test-bucket/test-object2.csv"},
	}
	expected := []string{
		"gs://test-bucket/test-object.csv",
		"gs://my.test-bucket/test-object.csv",
		"gs://my.test-bucket2/test-object.csv",
		"gs://my.test-bucket/test-object2.csv",
	}

	// require.Equal takes (t, expected, actual); the original call passed the
	// operands reversed, which yields misleading failure diffs.
	require.Equal(t, expected, GetGCSLocations(inputs, GCSLocationOptions{}))
}

func TestGetAzureBlobLocation(t *testing.T) {
inputs := []struct {
location string
Expand Down

0 comments on commit 08a43ce

Please sign in to comment.