From 8f40634944e76038d981e72977dc5bc6725034a8 Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Tue, 25 Oct 2022 15:16:21 +0530 Subject: [PATCH 1/6] test(warehouse): added support for warehouse stats tests during integration tests. --- go.mod | 7 +- go.sum | 10 +++ warehouse/archiver.go | 102 ++++++++++++------------ warehouse/bigquery/bigquery_test.go | 4 + warehouse/clickhouse/clickhouse_test.go | 14 ++++ warehouse/datalake/datalake_test.go | 6 ++ warehouse/deltalake/deltalake.go | 16 ++-- warehouse/deltalake/deltalake_test.go | 10 +++ warehouse/docker-compose.test.yml | 17 ++++ warehouse/mssql/mssql_test.go | 2 + warehouse/postgres/postgres.go | 15 ++-- warehouse/postgres/postgres_test.go | 4 + warehouse/redshift/redshift_test.go | 2 + warehouse/snowflake/snowflake_test.go | 2 + warehouse/stats.go | 14 ++-- warehouse/testhelper/.env | 3 +- warehouse/testhelper/setup.go | 88 ++++++++++++++++++++ warehouse/utils/utils.go | 10 +-- warehouse/warehouse.go | 6 +- 19 files changed, 249 insertions(+), 83 deletions(-) diff --git a/go.mod b/go.mod index 3262cee54c..1baac7509f 100644 --- a/go.mod +++ b/go.mod @@ -160,6 +160,8 @@ require ( github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.3.0 + github.com/prometheus/common v0.37.0 github.com/rivo/uniseg v0.1.0 // indirect github.com/rs/xid v1.4.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect @@ -196,4 +198,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -require github.com/golang/protobuf v1.5.2 // indirect +require ( + github.com/golang/protobuf v1.5.2 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect +) diff --git a/go.sum b/go.sum index 445ec94b4e..01867c6d8e 100644 --- a/go.sum +++ b/go.sum @@ -560,10 
+560,12 @@ github.com/go-ini/ini v1.63.2/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3I github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2CSIqUrmQPqA0gdRIlnLEY0gK5JGjh37zN5U= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU= @@ -1010,6 +1012,7 @@ github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOq github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.10/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= 
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -1188,11 +1191,14 @@ github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQ github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.12.1/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvqdiQ7Xew4= +github.com/prometheus/client_model v0.3.0/go.mod h1:LDGWKZIo7rky3hgvBe+caln+Dr3dPggB5dvjtD7w9+w= github.com/prometheus/common v0.0.0-20180110214958-89604d197083/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= @@ -1201,6 +1207,9 @@ github.com/prometheus/common 
v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+ github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.30.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= +github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= +github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE= +github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= github.com/prometheus/procfs v0.0.0-20180125133057-cb4147076ac7/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= @@ -1808,6 +1817,7 @@ golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/warehouse/archiver.go b/warehouse/archiver.go index 12410994eb..a7947e1883 100644 --- 
a/warehouse/archiver.go +++ b/warehouse/archiver.go @@ -89,19 +89,19 @@ func backupRecords(args backupRecordsArgs) (backupLocation string, err error) { } tmpl := fmt.Sprintf(` - SELECT - json_agg(dump_table) - FROM + SELECT + json_agg(dump_table) + FROM ( - SELECT - * - FROM - %[1]s - WHERE - %[2]s - ORDER BY - id ASC - LIMIT + SELECT + * + FROM + %[1]s + WHERE + %[2]s + ORDER BY + id ASC + LIMIT %[3]s offset %[4]s ) AS dump_table `, @@ -147,28 +147,28 @@ func usedRudderStorage(metadata []byte) bool { func archiveUploads(dbHandle *sql.DB) { pkgLogger.Infof(`Started archiving for warehouse`) sqlStatement := fmt.Sprintf(` - SELECT - id, - source_id, - destination_id, - start_staging_file_id, - end_staging_file_id, - start_load_file_id, - end_load_file_id, - metadata - FROM - %s - WHERE + SELECT + id, + source_id, + destination_id, + start_staging_file_id, + end_staging_file_id, + start_load_file_id, + end_load_file_id, + metadata + FROM + %s + WHERE ( ( metadata ->> 'archivedStagingAndLoadFiles' - ):: bool IS DISTINCT - FROM + ):: bool IS DISTINCT + FROM TRUE - ) - AND created_at < NOW() - INTERVAL '%d DAY' - AND status = '%s' - LIMIT + ) + AND created_at < NOW() - INTERVAL '%d DAY' + AND status = '%s' + LIMIT 10000; `, warehouseutils.WarehouseUploadsTable, @@ -225,15 +225,15 @@ func archiveUploads(dbHandle *sql.DB) { // archive staging files stmt := fmt.Sprintf(` - SELECT - id, - location - FROM - %s - WHERE - source_id = '%s' - AND destination_id = '%s' - AND id >= %d + SELECT + id, + location + FROM + %s + WHERE + source_id = '%s' + AND destination_id = '%s' + AND id >= %d and id <= %d; `, warehouseutils.WarehouseStagingFilesTable, @@ -302,9 +302,9 @@ func archiveUploads(dbHandle *sql.DB) { // delete staging file records stmt = fmt.Sprintf(` - DELETE FROM - %s - WHERE + DELETE FROM + %s + WHERE id IN (%v); `, warehouseutils.WarehouseStagingFilesTable, @@ -319,9 +319,9 @@ func archiveUploads(dbHandle *sql.DB) { // delete load file records stmt = fmt.Sprintf(` - 
DELETE FROM - %s - WHERE + DELETE FROM + %s + WHERE staging_file_id = ANY($1) RETURNING location; `, warehouseutils.WarehouseLoadFilesTable, @@ -371,11 +371,11 @@ func archiveUploads(dbHandle *sql.DB) { // update upload metadata u.uploadMetdata, _ = sjson.SetBytes(u.uploadMetdata, "archivedStagingAndLoadFiles", true) stmt = fmt.Sprintf(` - UPDATE - %s - SET - metadata = $1 - WHERE + UPDATE + %s + SET + metadata = $1 + WHERE id = %d; `, warehouseutils.WarehouseUploadsTable, @@ -398,7 +398,7 @@ func archiveUploads(dbHandle *sql.DB) { pkgLogger.Debugf(`[Archiver]: Archived upload: %d related staging files at: %s`, u.uploadID, storedStagingFilesLocation) } - stats.Default.NewTaggedStat("warehouse.archiver.numArchivedUploads", stats.CountType, map[string]string{ + stats.Default.NewTaggedStat("warehouse.archiver.numArchivedUploads", stats.CountType, stats.Tags{ "destination": u.destID, "source": u.sourceID, }).Count(1) diff --git a/warehouse/bigquery/bigquery_test.go b/warehouse/bigquery/bigquery_test.go index 45181f78c9..d5bf8909d5 100644 --- a/warehouse/bigquery/bigquery_test.go +++ b/warehouse/bigquery/bigquery_test.go @@ -107,6 +107,8 @@ func TestBigQueryIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t) }) t.Run("Append Mode", func(t *testing.T) { @@ -145,6 +147,8 @@ func TestBigQueryIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t) }) } diff --git a/warehouse/clickhouse/clickhouse_test.go b/warehouse/clickhouse/clickhouse_test.go index 8f4a8a0e0a..e5198b6f33 100644 --- 
a/warehouse/clickhouse/clickhouse_test.go +++ b/warehouse/clickhouse/clickhouse_test.go @@ -31,6 +31,16 @@ type TestHandle struct { var handle *TestHandle +var statsToVerify = []string{ + "warehouse_clickhouse_commitTimeouts", + "warehouse_clickhouse_execTimeouts", + "warehouse_clickhouse_failedRetries", + "warehouse_clickhouse_commitTimeouts", + "warehouse_clickhouse_syncLoadFileTime", + "warehouse_clickhouse_downloadLoadFilesTime", + "warehouse_clickhouse_numRowsLoadFile", +} + func (*TestHandle) VerifyConnection() error { err := testhelper.WithConstantBackoff(func() (err error) { credentials := clickhouse.CredentialsT{ @@ -178,6 +188,8 @@ func TestClickHouseIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) }) t.Run("Cluster Mode Setup", func(t *testing.T) { @@ -230,6 +242,8 @@ func TestClickHouseIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, clusterWarehouseEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) 
}) } diff --git a/warehouse/datalake/datalake_test.go b/warehouse/datalake/datalake_test.go index c7000f1b58..dd43917c63 100644 --- a/warehouse/datalake/datalake_test.go +++ b/warehouse/datalake/datalake_test.go @@ -66,6 +66,8 @@ func TestDatalakeIntegration(t *testing.T) { testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap()) testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t) }) t.Run("AzureDatalake", func(t *testing.T) { t.Parallel() @@ -104,6 +106,8 @@ func TestDatalakeIntegration(t *testing.T) { testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap()) testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t) }) t.Run("GCSDatalake", func(t *testing.T) { t.Parallel() @@ -146,6 +150,8 @@ func TestDatalakeIntegration(t *testing.T) { testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap()) testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t) }) } diff --git a/warehouse/deltalake/deltalake.go b/warehouse/deltalake/deltalake.go index 385b88e999..54eec038ce 100644 --- a/warehouse/deltalake/deltalake.go +++ b/warehouse/deltalake/deltalake.go @@ -272,7 +272,7 @@ func (*HandleT) DeleteBy([]string, warehouseutils.DeleteByParams) error { // fetchTables fetch tables with tableNames func (dl *HandleT) fetchTables(dbT *databricks.DBHandleT, schema string) (tableNames []string, err error) { - fetchTablesExecTime := 
stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{ + fetchTablesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{ "workspaceId": dl.Warehouse.WorkspaceID, "destination": dl.Warehouse.Destination.ID, "destType": dl.Warehouse.Type, @@ -357,7 +357,7 @@ func (dl *HandleT) partitionQuery(tableName string) (string, error) { // ExecuteSQL executes sql using grpc Client func (dl *HandleT) ExecuteSQL(sqlStatement, queryType string) (err error) { - execSqlStatTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{ + execSqlStatTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{ "workspaceId": dl.Warehouse.WorkspaceID, "destination": dl.Warehouse.Destination.ID, "destType": dl.Warehouse.Type, @@ -392,7 +392,7 @@ func (*HandleT) ExecuteSQLClient(dbClient *databricks.DBHandleT, sqlStatement st // schemaExists checks it schema exists or not. 
func (dl *HandleT) schemaExists(schemaName string) (exists bool, err error) { - fetchSchemasExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{ + fetchSchemasExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{ "workspaceId": dl.Warehouse.WorkspaceID, "destination": dl.Warehouse.Destination.ID, "destType": dl.Warehouse.Type, @@ -431,7 +431,7 @@ func (dl *HandleT) createSchema() (err error) { // dropStagingTables drops staging tables func (dl *HandleT) dropStagingTables(tableNames []string) { - dropTablesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{ + dropTablesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{ "workspaceId": dl.Warehouse.WorkspaceID, "destination": dl.Warehouse.Destination.ID, "destType": dl.Warehouse.Type, @@ -805,7 +805,7 @@ func (dl *HandleT) connectToWarehouse() (dbHandleT *databricks.DBHandleT, err er Path: warehouseutils.GetConfigValue(DLPath, dl.Warehouse), Token: warehouseutils.GetConfigValue(DLToken, dl.Warehouse), } - connStat := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{ + connStat := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{ "workspaceId": dl.Warehouse.WorkspaceID, "destination": dl.Warehouse.Destination.ID, "destType": dl.Warehouse.Type, @@ -817,7 +817,7 @@ func (dl *HandleT) connectToWarehouse() (dbHandleT *databricks.DBHandleT, err er connStat.Start() defer connStat.End() - closeConnStat := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{ + closeConnStat := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{ "workspaceId": dl.Warehouse.WorkspaceID, "destination": dl.Warehouse.Destination.ID, "destType": 
dl.Warehouse.Type, @@ -936,7 +936,7 @@ func (dl *HandleT) FetchSchema(warehouse warehouseutils.Warehouse) (schema wareh filteredTablesNames = append(filteredTablesNames, tableName) } - fetchTablesAttributesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{ + fetchTablesAttributesExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{ "workspaceId": dl.Warehouse.WorkspaceID, "destination": dl.Warehouse.Destination.ID, "destType": dl.Warehouse.Type, @@ -1054,7 +1054,7 @@ func (*HandleT) DownloadIdentityRules(*misc.GZipWriter) (err error) { // GetTotalCountInTable returns total count in tables. func (dl *HandleT) GetTotalCountInTable(ctx context.Context, tableName string) (total int64, err error) { - fetchTotalCountExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, map[string]string{ + fetchTotalCountExecTime := stats.Default.NewTaggedStat("warehouse.deltalake.grpcExecTime", stats.TimerType, stats.Tags{ "workspaceId": dl.Warehouse.WorkspaceID, "destination": dl.Warehouse.Destination.ID, "destType": dl.Warehouse.Type, diff --git a/warehouse/deltalake/deltalake_test.go b/warehouse/deltalake/deltalake_test.go index 42a921cfc3..9afaa3e245 100644 --- a/warehouse/deltalake/deltalake_test.go +++ b/warehouse/deltalake/deltalake_test.go @@ -34,6 +34,12 @@ type TestHandle struct { var handle *TestHandle +var statsToVerify = []string{ + "warehouse_deltalake_grpcExecTime", + "warehouse_deltalake_grpcExecTime", + "warehouse_deltalake_healthTimeouts", +} + func (*TestHandle) VerifyConnection() error { credentials, err := testhelper.DatabricksCredentials() if err != nil { @@ -119,6 +125,8 @@ func TestDeltalakeIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) 
testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) }) t.Run("Append Mode", func(t *testing.T) { @@ -171,6 +179,8 @@ func TestDeltalakeIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) }) } diff --git a/warehouse/docker-compose.test.yml b/warehouse/docker-compose.test.yml index 6ac7499493..0937d9e7bf 100644 --- a/warehouse/docker-compose.test.yml +++ b/warehouse/docker-compose.test.yml @@ -161,6 +161,21 @@ services: test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1 interval: 1s retries: 25 + wh-statsd-exporter: + image: prom/statsd-exporter:latest + container_name: wh-statsd-exporter + ports: + - "9102" + - "8125" + - "8125/udp" + command: + - --statsd.listen-udp=:8125 + - --statsd.listen-tcp=:8125 + - --web.listen-address=:9102 + healthcheck: + test: wget --no-verbose --tries=1 --spider http://localhost:9102/metrics || exit 1 + interval: 1s + retries: 25 wh-backend: container_name: wh-backend depends_on: @@ -172,6 +187,8 @@ services: condition: service_healthy wh-minio: condition: service_healthy + wh-statsd-exporter: + condition: service_healthy wh-postgres: condition: service_healthy wh-clickhouse: diff --git a/warehouse/mssql/mssql_test.go b/warehouse/mssql/mssql_test.go index 9ff59c60ef..cb305d1890 100644 --- a/warehouse/mssql/mssql_test.go +++ b/warehouse/mssql/mssql_test.go @@ -92,6 +92,8 @@ func TestMSSQLIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, 
testhelper.WarehouseEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t) } func TestMSSQLConfigurationValidation(t *testing.T) { diff --git a/warehouse/postgres/postgres.go b/warehouse/postgres/postgres.go index 0d110f18b1..08b63395e8 100644 --- a/warehouse/postgres/postgres.go +++ b/warehouse/postgres/postgres.go @@ -237,11 +237,11 @@ func (pg *HandleT) DownloadLoadFiles(tableName string) ([]string, error) { return fileNames, nil } -func handleRollbackTimeout(tags map[string]string) { +func handleRollbackTimeout(tags stats.Tags) { stats.Default.NewTaggedStat("pg_rollback_timeout", stats.CountType, tags).Count(1) } -func runRollbackWithTimeout(f func() error, onTimeout func(map[string]string), d time.Duration, tags map[string]string) { +func runRollbackWithTimeout(f func() error, onTimeout func(tags stats.Tags), d time.Duration, tags stats.Tags) { c := make(chan struct{}) go func() { defer close(c) @@ -269,7 +269,7 @@ func (pg *HandleT) loadTable(tableName string, tableSchemaInUpload warehouseutil pkgLogger.Infof("PG: Starting load for table:%s", tableName) // tags - tags := map[string]string{ + tags := stats.Tags{ "workspaceId": pg.Warehouse.WorkspaceID, "namepsace": pg.Namespace, "destinationID": pg.Warehouse.Destination.ID, @@ -561,10 +561,11 @@ func (pg *HandleT) loadUserTables() (errorMap map[string]error) { sqlStatement = fmt.Sprintf(`DELETE FROM "%[1]s"."%[2]s" using "%[1]s"."%[3]s" _source where (_source.%[4]s = %[1]s.%[2]s.%[4]s)`, pg.Namespace, warehouseutils.UsersTable, stagingTableName, primaryKey) pkgLogger.Infof("PG: Dedup records for table:%s using staging table: %s\n", warehouseutils.UsersTable, sqlStatement) // tags - tags := map[string]string{ - "namespace": pg.Namespace, - "destId": pg.Warehouse.Destination.ID, - "tableName": warehouseutils.UsersTable, + tags := stats.Tags{ + "workspaceId": pg.Warehouse.WorkspaceID, + "namespace": pg.Namespace, + "destId": pg.Warehouse.Destination.ID, + "tableName": warehouseutils.UsersTable, } err = 
handleExec(&QueryParams{txn: tx, query: sqlStatement, enableWithQueryPlan: enableSQLStatementExecutionPlan}) if err != nil { diff --git a/warehouse/postgres/postgres_test.go b/warehouse/postgres/postgres_test.go index b4a9f480ca..187d7f53fc 100644 --- a/warehouse/postgres/postgres_test.go +++ b/warehouse/postgres/postgres_test.go @@ -26,6 +26,8 @@ type TestHandle struct { var handle *TestHandle +var statsToVerify = []string{"pg_rollback_timeout"} + func (*TestHandle) VerifyConnection() error { return testhelper.WithConstantBackoff(func() (err error) { credentials := postgres.CredentialsT{ @@ -91,6 +93,8 @@ func TestPostgresIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) } func TestPostgresConfigurationValidation(t *testing.T) { diff --git a/warehouse/redshift/redshift_test.go b/warehouse/redshift/redshift_test.go index ce9db34005..c78d4e8bca 100644 --- a/warehouse/redshift/redshift_test.go +++ b/warehouse/redshift/redshift_test.go @@ -106,6 +106,8 @@ func TestRedshiftIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t) } func TestRedshiftConfigurationValidation(t *testing.T) { diff --git a/warehouse/snowflake/snowflake_test.go b/warehouse/snowflake/snowflake_test.go index 25ef316b14..e4091bc4fc 100644 --- a/warehouse/snowflake/snowflake_test.go +++ b/warehouse/snowflake/snowflake_test.go @@ -139,6 +139,8 @@ func TestSnowflakeIntegration(t *testing.T) { 
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap()) + + testhelper.VerifyingWorkspaceIDInStats(t) }) } } diff --git a/warehouse/stats.go b/warehouse/stats.go index b0443b60b7..cd412a128f 100644 --- a/warehouse/stats.go +++ b/warehouse/stats.go @@ -33,7 +33,7 @@ func (jobRun *JobRunT) warehouseID() string { } func (job *UploadJobT) timerStat(name string, extraTags ...tag) stats.Measurement { - tags := map[string]string{ + tags := stats.Tags{ "module": moduleName, "destType": job.warehouse.Type, "warehouseID": job.warehouseID(), @@ -48,7 +48,7 @@ func (job *UploadJobT) timerStat(name string, extraTags ...tag) stats.Measuremen } func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurement { - tags := map[string]string{ + tags := stats.Tags{ "module": moduleName, "destType": job.warehouse.Type, "warehouseID": job.warehouseID(), @@ -63,7 +63,7 @@ func (job *UploadJobT) counterStat(name string, extraTags ...tag) stats.Measurem } func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measurement { - tags := map[string]string{ + tags := stats.Tags{ "module": moduleName, "destType": job.warehouse.Type, "warehouseID": job.warehouseID(), @@ -79,7 +79,7 @@ func (job *UploadJobT) guageStat(name string, extraTags ...tag) stats.Measuremen } func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measurement { - tags := map[string]string{ + tags := stats.Tags{ "module": moduleName, "destType": jobRun.job.DestinationType, "warehouseID": jobRun.warehouseID(), @@ -94,7 +94,7 @@ func (jobRun *JobRunT) timerStat(name string, extraTags ...tag) stats.Measuremen } func (jobRun *JobRunT) counterStat(name string, extraTags ...tag) stats.Measurement { - tags := map[string]string{ + tags := stats.Tags{ "module": moduleName, 
"destType": jobRun.job.DestinationType, "warehouseID": jobRun.warehouseID(), @@ -221,7 +221,7 @@ func (job *UploadJobT) recordLoadFileGenerationTimeStat(startID, endID int64) (e } func getUploadStatusStat(name string, warehouse warehouseutils.Warehouse) stats.Measurement { - tags := map[string]string{ + tags := stats.Tags{ "workspaceId": warehouse.WorkspaceID, "module": moduleName, "destType": warehouse.Type, @@ -236,7 +236,7 @@ func getUploadStatusStat(name string, warehouse warehouseutils.Warehouse) stats. } func persistSSLFileErrorStat(workspaceID, destType, destName, destID, sourceName, sourceID, errTag string) { - tags := map[string]string{ + tags := stats.Tags{ "workspaceId": workspaceID, "module": moduleName, "destType": destType, diff --git a/warehouse/testhelper/.env b/warehouse/testhelper/.env index 1e01018eca..8aa75f37b2 100644 --- a/warehouse/testhelper/.env +++ b/warehouse/testhelper/.env @@ -26,7 +26,7 @@ LOG_LEVEL=INFO INSTANCE_ID=1 ALERT_PROVIDER=pagerduty CONFIG_PATH="../../config/config.yaml" -ENABLE_STATS=false +STATSD_SERVER_URL="wh-statsd-exporter:8125" DATABRICKS_CONNECTOR_URL=wh-databricks-connector:50051 DEST_TRANSFORM_URL=http://wh-transformer:9090 @@ -45,7 +45,6 @@ RSERVER_WAREHOUSE_ENABLE_JITTER_FOR_SYNCS=false RSERVER_EVENT_SCHEMAS_ENABLE_EVENT_SCHEMAS_FEATURE=false RSERVER_EVENT_SCHEMAS_SYNC_INTERVAL=15 -RSERVER_ENABLE_STATS=false RSERVER_BACKEND_CONFIG_CONFIG_FROM_FILE=true RUDDER_ADMIN_PASSWORD=password diff --git a/warehouse/testhelper/setup.go b/warehouse/testhelper/setup.go index 028f9cde2f..7e3cf17481 100644 --- a/warehouse/testhelper/setup.go +++ b/warehouse/testhelper/setup.go @@ -5,7 +5,11 @@ import ( "database/sql" "encoding/json" "fmt" + promCLient "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "log" + "net/http" "os" "strconv" "strings" @@ -430,6 +434,90 @@ func VerifyingConfigurationTest(t *testing.T, destination backendconfig.Destinat t.Logf("Completed configuration tests 
for destination type: %s", destination.DestinationDefinition.Name) } +func VerifyingWorkspaceIDInStats(t *testing.T, extraStats ...string) { + t.Helper() + t.Logf("Started verifying workspaceID in stats") + + var statsToVerify []string + statsToVerify = append(statsToVerify, extraStats...) + statsToVerify = append(statsToVerify, []string{ + // Miscellaneous + "wh_scheduler_create_upload_jobs", + "wh_scheduler_pending_staging_files", + "warehouse_rudder_missing_datatype", + "warehouse_long_running_upload", + "warehouse_successful_upload_exists", + "persist_ssl_file_failure", + + // Timer stats + "load_file_generation_time", + "event_delivery_time", + "identity_tables_load_time", + "other_tables_load_time", + "user_tables_load_time", + "upload_time", + "download_staging_file_time", + "staging_files_total_processing_time", + "process_staging_file_time", + "load_file_upload_time", + + // Counter stats + "total_rows_synced", + "num_staged_events", + "upload_aborted", + "num_staged_events", + "upload_success", + "event_delivery", + "rows_synced", + "staging_files_processed", + "bytes_processed_in_staging_file", + + //Gauge stats + "pre_load_table_rows", + "post_load_table_rows_estimate", + "post_load_table_rows", + }...) 
+ mf := prometheusStats(t) + + for _, statToVerify := range statsToVerify { + if ps, ok := mf[statToVerify]; ok { + for _, metric := range ps.GetMetric() { + found := false + for _, label := range metric.GetLabel() { + if label.GetName() == "workspaceId" { + require.NotEmptyf(t, label.GetValue(), "workspaceId is empty for stat: %s", statToVerify) + found = true + break + } + } + require.True(t, found, "workspaceId not found in stat: %s", statToVerify) + } + } + } + + t.Logf("Completed verifying workspaceID in stats") +} + +func prometheusStats(t *testing.T) map[string]*promCLient.MetricFamily { + t.Helper() + + req, err := http.NewRequestWithContext(context.Background(), "GET", "http://wh-statsd-exporter:9102/metrics", http.NoBody) + require.NoError(t, err) + + httpClient := &http.Client{Timeout: 5 * time.Second} + resp, err := httpClient.Do(req) + require.NoError(t, err) + require.NotNil(t, resp) + require.NotNil(t, resp.Body) + + defer func() { _ = resp.Body.Close() }() + + var parser expfmt.TextParser + mf, err := parser.TextToMetricFamilies(resp.Body) + require.NoError(t, err) + return mf +} + func queryCount(cl *client.Client, statement string) (int64, error) { result, err := cl.Query(statement) if err != nil || result.Values == nil { diff --git a/warehouse/utils/utils.go b/warehouse/utils/utils.go index 48c89d8f95..f7fc6016e1 100644 --- a/warehouse/utils/utils.go +++ b/warehouse/utils/utils.go @@ -816,8 +816,8 @@ type Tag struct { } func NewTimerStat(name string, extraTags ...Tag) stats.Measurement { - tags := map[string]string{ - "module": "warehouse", + tags := stats.Tags{ + "module": WAREHOUSE, } for _, extraTag := range extraTags { tags[extraTag.Name] = extraTag.Value @@ -826,8 +826,8 @@ func NewTimerStat(name string, extraTags ...Tag) stats.Measurement { } func NewCounterStat(name string, extraTags ...Tag) stats.Measurement { - tags := map[string]string{ - "module": "warehouse", + tags := stats.Tags{ + "module": WAREHOUSE, } for _, extraTag := range 
extraTags { tags[extraTag.Name] = extraTag.Value @@ -836,7 +836,7 @@ func NewCounterStat(name string, extraTags ...Tag) stats.Measurement { } func WHCounterStat(name string, warehouse *Warehouse, extraTags ...Tag) stats.Measurement { - tags := map[string]string{ + tags := stats.Tags{ "module": WAREHOUSE, "destType": warehouse.Type, "workspaceId": warehouse.WorkspaceID, diff --git a/warehouse/warehouse.go b/warehouse/warehouse.go index f194e14025..79cba136e1 100644 --- a/warehouse/warehouse.go +++ b/warehouse/warehouse.go @@ -301,6 +301,7 @@ func (wh *HandleT) backendConfigSubscriber() { } namespace := wh.getNamespace(destination.Config, source, destination, wh.destType) warehouse := warehouseutils.Warehouse{ + WorkspaceID: workspaceID, Source: source, Destination: destination, Namespace: namespace, @@ -1423,6 +1424,7 @@ func minimalConfigSubscriber() { connectionsMap[destination.ID] = map[string]warehouseutils.Warehouse{} } connectionsMap[destination.ID][source.ID] = warehouseutils.Warehouse{ + WorkspaceID: workspaceID, Destination: destination, Namespace: namespace, Type: wh.destType, @@ -1630,9 +1632,9 @@ func processHandler(w http.ResponseWriter, r *http.Request) { panic(err) } - stats.Default.NewTaggedStat("rows_staged", stats.CountType, map[string]string{ + stats.Default.NewTaggedStat("rows_staged", stats.CountType, stats.Tags{ "workspace_id": stagingFile.WorkspaceID, - "module": "warehouse", + "module": moduleName, "destType": stagingFile.BatchDestination.Destination.DestinationDefinition.Name, "warehouseID": getWarehouseTagName( stagingFile.BatchDestination.Destination.ID, From 4bf22b87c9a4e18c1c2b980ea3976a97b6551201 Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Tue, 25 Oct 2022 15:38:43 +0530 Subject: [PATCH 2/6] some more changes. 
--- warehouse/testhelper/setup.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/warehouse/testhelper/setup.go b/warehouse/testhelper/setup.go index 7e3cf17481..7819d4533b 100644 --- a/warehouse/testhelper/setup.go +++ b/warehouse/testhelper/setup.go @@ -5,9 +5,6 @@ import ( "database/sql" "encoding/json" "fmt" - promCLient "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "log" "net/http" "os" @@ -16,6 +13,9 @@ import ( "testing" "time" + promCLient "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/minio/minio-go/v6" "github.com/rudderlabs/rudder-server/admin" @@ -438,7 +438,11 @@ func VerifyingWorkspaceIDInStats(t *testing.T, extraStats ...string) { t.Helper() t.Logf("Started verifying workspaceID in stats") - var statsToVerify []string + var ( + statsToVerify []string + workspaceID = "BpLnfgDsc2WD8F2qNfHK5a84jjJ" + ) + statsToVerify = append(statsToVerify, extraStats...) 
 	statsToVerify = append(statsToVerify, []string{
 		// Miscellaneous
@@ -472,7 +476,7 @@ func VerifyingWorkspaceIDInStats(t *testing.T, extraStats ...string) {
 		"staging_files_processed",
 		"bytes_processed_in_staging_file",
 
-		//Gauge stats
+		// Gauge stats
 		"pre_load_table_rows",
 		"post_load_table_rows_estimate",
 		"post_load_table_rows",
@@ -485,12 +489,12 @@ func VerifyingWorkspaceIDInStats(t *testing.T, extraStats ...string) {
 			found := false
 			for _, label := range metric.GetLabel() {
 				if label.GetName() == "workspaceId" {
-					require.NotEmptyf(t, label.GetValue(), "workspaceId is empty for stat: %s", statToVerify)
+					require.Equalf(t, workspaceID, label.GetValue(), "unexpected workspaceId for stat: %s", statToVerify)
 					found = true
 					break
 				}
 			}
-			require.True(t, found, "workspaceId not found in stat: %s", statToVerify)
+			require.Truef(t, found, "workspaceId not found in stat: %s", statToVerify)
 		}
 	}
 }
From 857f253cd4b08c0365e48226726a041bd3b48062 Mon Sep 17 00:00:00 2001
From: Akash Chetty
Date: Tue, 25 Oct 2022 16:23:16 +0530
Subject: [PATCH 3/6] added workspaceID clause in warehouse integration tests.
--- warehouse/testhelper/setup.go | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/warehouse/testhelper/setup.go b/warehouse/testhelper/setup.go index 7819d4533b..6804e163f5 100644 --- a/warehouse/testhelper/setup.go +++ b/warehouse/testhelper/setup.go @@ -191,6 +191,7 @@ func VerifyEventsInStagingFiles(t testing.TB, wareHouseTest *WareHouseTest, even var ( tableName = "wh_staging_files" + workspaceID = "BpLnfgDsc2WD8F2qNfHK5a84jjJ" stagingFileEvents int sqlStatement string operation func() bool @@ -208,15 +209,17 @@ func VerifyEventsInStagingFiles(t testing.TB, wareHouseTest *WareHouseTest, even sqlStatement = ` SELECT - COALESCE(SUM(total_events)) AS sum + COALESCE(SUM(total_events)) AS sum FROM - wh_staging_files + wh_staging_files WHERE - source_id = $1 - AND destination_id = $2 - AND created_at > $3; + source_id = $1 AND + workspace_id = $2 AND + destination_id = $3 AND + created_at > $4; ` - t.Logf("Checking events in staging files for sourceID: %s, DestinationID: %s, TimestampBeforeSendingEvents: %s, sqlStatement: %s", + t.Logf("Checking events in staging files for workspaceID: %s, sourceID: %s, DestinationID: %s, TimestampBeforeSendingEvents: %s, sqlStatement: %s", + workspaceID, wareHouseTest.SourceID, wareHouseTest.DestinationID, wareHouseTest.TimestampBeforeSendingEvents, @@ -225,6 +228,7 @@ func VerifyEventsInStagingFiles(t testing.TB, wareHouseTest *WareHouseTest, even operation = func() bool { err = jobsDB.DB.QueryRow( sqlStatement, + workspaceID, wareHouseTest.SourceID, wareHouseTest.DestinationID, wareHouseTest.TimestampBeforeSendingEvents, @@ -299,6 +303,7 @@ func VerifyEventsInTableUploads(t testing.TB, wareHouseTest *WareHouseTest, even t.Logf("Started verifying events in table uploads") var ( + workspaceID = "BpLnfgDsc2WD8F2qNfHK5a84jjJ" tableUploadEvents int sqlStatement string operation func() bool @@ -325,13 +330,15 @@ func VerifyEventsInTableUploads(t testing.TB, wareHouseTest 
*WareHouseTest, even wh_uploads ON wh_uploads.id = wh_table_uploads.wh_upload_id WHERE - wh_uploads.source_id = $1 - AND wh_uploads.destination_id = $2 - AND wh_uploads.created_at > $3 - AND wh_table_uploads.table_name = $4 - AND wh_table_uploads.status = 'exported_data'; + wh_uploads.workspace_id = $1 AND + wh_uploads.source_id = $2 AND + wh_uploads.destination_id = $3 AND + wh_uploads.created_at > $4 AND + wh_table_uploads.table_name = $5 AND + wh_table_uploads.status = 'exported_data'; ` - t.Logf("Checking events in table uploads for sourceID: %s, DestinationID: %s, TimestampBeforeSendingEvents: %s, table: %s, sqlStatement: %s", + t.Logf("Checking events in table uploads for workspaceID: %s, sourceID: %s, DestinationID: %s, TimestampBeforeSendingEvents: %s, table: %s, sqlStatement: %s", + workspaceID, wareHouseTest.SourceID, wareHouseTest.DestinationID, wareHouseTest.TimestampBeforeSendingEvents, @@ -341,6 +348,7 @@ func VerifyEventsInTableUploads(t testing.TB, wareHouseTest *WareHouseTest, even operation = func() bool { err = jobsDB.DB.QueryRow( sqlStatement, + workspaceID, wareHouseTest.SourceID, wareHouseTest.DestinationID, wareHouseTest.TimestampBeforeSendingEvents, From 3d737e573c1b047b39c56f49e75838cb093662eb Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Tue, 25 Oct 2022 16:29:00 +0530 Subject: [PATCH 4/6] added workspaceID clause in warehouse integration tests. 
--- warehouse/testhelper/setup.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/warehouse/testhelper/setup.go b/warehouse/testhelper/setup.go index 6804e163f5..558ddab782 100644 --- a/warehouse/testhelper/setup.go +++ b/warehouse/testhelper/setup.go @@ -213,8 +213,8 @@ func VerifyEventsInStagingFiles(t testing.TB, wareHouseTest *WareHouseTest, even FROM wh_staging_files WHERE - source_id = $1 AND - workspace_id = $2 AND + workspace_id = $1 AND + source_id = $2 AND destination_id = $3 AND created_at > $4; ` From 2307c4577342a6114c6af627d0d7a951386a99d2 Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Tue, 25 Oct 2022 16:41:32 +0530 Subject: [PATCH 5/6] use statsd-exporter for integration test. --- warehouse/docker-compose.test.yml | 6 +++--- warehouse/testhelper/.env | 2 +- warehouse/testhelper/setup.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/warehouse/docker-compose.test.yml b/warehouse/docker-compose.test.yml index 0937d9e7bf..de28ff4965 100644 --- a/warehouse/docker-compose.test.yml +++ b/warehouse/docker-compose.test.yml @@ -161,9 +161,9 @@ services: test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1 interval: 1s retries: 25 - wh-statsd-exporter: + statsd-exporter: image: prom/statsd-exporter:latest - container_name: wh-statsd-exporter + container_name: statsd-exporter ports: - "9102" - "8125" @@ -187,7 +187,7 @@ services: condition: service_healthy wh-minio: condition: service_healthy - wh-statsd-exporter: + statsd-exporter: condition: service_healthy wh-postgres: condition: service_healthy diff --git a/warehouse/testhelper/.env b/warehouse/testhelper/.env index 8aa75f37b2..7d2484220a 100644 --- a/warehouse/testhelper/.env +++ b/warehouse/testhelper/.env @@ -26,7 +26,7 @@ LOG_LEVEL=INFO INSTANCE_ID=1 ALERT_PROVIDER=pagerduty CONFIG_PATH="../../config/config.yaml" -STATSD_SERVER_URL="wh-statsd-exporter:8125" +STATSD_SERVER_URL="statsd-exporter:8125" 
DATABRICKS_CONNECTOR_URL=wh-databricks-connector:50051 DEST_TRANSFORM_URL=http://wh-transformer:9090 diff --git a/warehouse/testhelper/setup.go b/warehouse/testhelper/setup.go index 558ddab782..28ef5d3d01 100644 --- a/warehouse/testhelper/setup.go +++ b/warehouse/testhelper/setup.go @@ -513,7 +513,7 @@ func VerifyingWorkspaceIDInStats(t *testing.T, extraStats ...string) { func prometheusStats(t *testing.T) map[string]*promCLient.MetricFamily { t.Helper() - req, err := http.NewRequestWithContext(context.Background(), "GET", "http://wh-statsd-exporter:9102/metrics", http.NoBody) + req, err := http.NewRequestWithContext(context.Background(), "GET", "http://statsd-exporter:9102/metrics", http.NoBody) require.NoError(t, err) httpClient := &http.Client{Timeout: 5 * time.Second} From 0d79c0531fb2692e88afebd8e046e643a32c027a Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Wed, 26 Oct 2022 09:14:35 +0530 Subject: [PATCH 6/6] review comments --- warehouse/bigquery/bigquery_test.go | 4 ++-- warehouse/clickhouse/clickhouse_test.go | 5 ++--- warehouse/datalake/datalake_test.go | 6 +++--- warehouse/deltalake/deltalake_test.go | 5 ++--- warehouse/docker-compose.test.yml | 4 ---- warehouse/mssql/mssql_test.go | 2 +- warehouse/postgres/postgres_test.go | 2 +- warehouse/redshift/redshift_test.go | 2 +- warehouse/snowflake/snowflake_test.go | 2 +- warehouse/testhelper/setup.go | 2 +- 10 files changed, 14 insertions(+), 20 deletions(-) diff --git a/warehouse/bigquery/bigquery_test.go b/warehouse/bigquery/bigquery_test.go index d5bf8909d5..b11da1be9c 100644 --- a/warehouse/bigquery/bigquery_test.go +++ b/warehouse/bigquery/bigquery_test.go @@ -108,7 +108,7 @@ func TestBigQueryIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t) + testhelper.VerifyWorkspaceIDInStats(t) }) t.Run("Append Mode", func(t *testing.T) { @@ 
-148,7 +148,7 @@ func TestBigQueryIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t) + testhelper.VerifyWorkspaceIDInStats(t) }) } diff --git a/warehouse/clickhouse/clickhouse_test.go b/warehouse/clickhouse/clickhouse_test.go index e5198b6f33..210c4a5c79 100644 --- a/warehouse/clickhouse/clickhouse_test.go +++ b/warehouse/clickhouse/clickhouse_test.go @@ -35,7 +35,6 @@ var statsToVerify = []string{ "warehouse_clickhouse_commitTimeouts", "warehouse_clickhouse_execTimeouts", "warehouse_clickhouse_failedRetries", - "warehouse_clickhouse_commitTimeouts", "warehouse_clickhouse_syncLoadFileTime", "warehouse_clickhouse_downloadLoadFilesTime", "warehouse_clickhouse_numRowsLoadFile", @@ -189,7 +188,7 @@ func TestClickHouseIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) + testhelper.VerifyWorkspaceIDInStats(t, statsToVerify...) }) t.Run("Cluster Mode Setup", func(t *testing.T) { @@ -243,7 +242,7 @@ func TestClickHouseIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, clusterWarehouseEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) + testhelper.VerifyWorkspaceIDInStats(t, statsToVerify...) 
}) } diff --git a/warehouse/datalake/datalake_test.go b/warehouse/datalake/datalake_test.go index dd43917c63..ab89cbb48e 100644 --- a/warehouse/datalake/datalake_test.go +++ b/warehouse/datalake/datalake_test.go @@ -67,7 +67,7 @@ func TestDatalakeIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t) + testhelper.VerifyWorkspaceIDInStats(t) }) t.Run("AzureDatalake", func(t *testing.T) { t.Parallel() @@ -107,7 +107,7 @@ func TestDatalakeIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t) + testhelper.VerifyWorkspaceIDInStats(t) }) t.Run("GCSDatalake", func(t *testing.T) { t.Parallel() @@ -151,7 +151,7 @@ func TestDatalakeIntegration(t *testing.T) { testhelper.VerifyEventsInLoadFiles(t, warehouseTest, testhelper.LoadFilesEventsMap()) testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t) + testhelper.VerifyWorkspaceIDInStats(t) }) } diff --git a/warehouse/deltalake/deltalake_test.go b/warehouse/deltalake/deltalake_test.go index 9afaa3e245..57ddefb921 100644 --- a/warehouse/deltalake/deltalake_test.go +++ b/warehouse/deltalake/deltalake_test.go @@ -35,7 +35,6 @@ type TestHandle struct { var handle *TestHandle var statsToVerify = []string{ - "warehouse_deltalake_grpcExecTime", "warehouse_deltalake_grpcExecTime", "warehouse_deltalake_healthTimeouts", } @@ -126,7 +125,7 @@ func TestDeltalakeIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap()) - 
testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) + testhelper.VerifyWorkspaceIDInStats(t, statsToVerify...) }) t.Run("Append Mode", func(t *testing.T) { @@ -180,7 +179,7 @@ func TestDeltalakeIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) + testhelper.VerifyWorkspaceIDInStats(t, statsToVerify...) }) } diff --git a/warehouse/docker-compose.test.yml b/warehouse/docker-compose.test.yml index de28ff4965..71768ed0dd 100644 --- a/warehouse/docker-compose.test.yml +++ b/warehouse/docker-compose.test.yml @@ -172,10 +172,6 @@ services: - --statsd.listen-udp=:8125 - --statsd.listen-tcp=:8125 - --web.listen-address=:9102 - healthcheck: - test: wget --no-verbose --tries=1 --spider http://localhost:9102/metrics || exit 1 - interval: 1s - retries: 25 wh-backend: container_name: wh-backend depends_on: diff --git a/warehouse/mssql/mssql_test.go b/warehouse/mssql/mssql_test.go index cb305d1890..7bc133e02c 100644 --- a/warehouse/mssql/mssql_test.go +++ b/warehouse/mssql/mssql_test.go @@ -93,7 +93,7 @@ func TestMSSQLIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t) + testhelper.VerifyWorkspaceIDInStats(t) } func TestMSSQLConfigurationValidation(t *testing.T) { diff --git a/warehouse/postgres/postgres_test.go b/warehouse/postgres/postgres_test.go index 187d7f53fc..67177863cc 100644 --- a/warehouse/postgres/postgres_test.go +++ b/warehouse/postgres/postgres_test.go @@ -94,7 +94,7 @@ func TestPostgresIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, 
testhelper.WarehouseEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t, statsToVerify...) + testhelper.VerifyWorkspaceIDInStats(t, statsToVerify...) } func TestPostgresConfigurationValidation(t *testing.T) { diff --git a/warehouse/redshift/redshift_test.go b/warehouse/redshift/redshift_test.go index c78d4e8bca..02865979c6 100644 --- a/warehouse/redshift/redshift_test.go +++ b/warehouse/redshift/redshift_test.go @@ -107,7 +107,7 @@ func TestRedshiftIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t) + testhelper.VerifyWorkspaceIDInStats(t) } func TestRedshiftConfigurationValidation(t *testing.T) { diff --git a/warehouse/snowflake/snowflake_test.go b/warehouse/snowflake/snowflake_test.go index e4091bc4fc..8789b69935 100644 --- a/warehouse/snowflake/snowflake_test.go +++ b/warehouse/snowflake/snowflake_test.go @@ -140,7 +140,7 @@ func TestSnowflakeIntegration(t *testing.T) { testhelper.VerifyEventsInTableUploads(t, warehouseTest, testhelper.TableUploadsEventsMap()) testhelper.VerifyEventsInWareHouse(t, warehouseTest, testhelper.WarehouseEventsMap()) - testhelper.VerifyingWorkspaceIDInStats(t) + testhelper.VerifyWorkspaceIDInStats(t) }) } } diff --git a/warehouse/testhelper/setup.go b/warehouse/testhelper/setup.go index 28ef5d3d01..31b617bae7 100644 --- a/warehouse/testhelper/setup.go +++ b/warehouse/testhelper/setup.go @@ -442,7 +442,7 @@ func VerifyingConfigurationTest(t *testing.T, destination backendconfig.Destinat t.Logf("Completed configuration tests for destination type: %s", destination.DestinationDefinition.Name) } -func VerifyingWorkspaceIDInStats(t *testing.T, extraStats ...string) { +func VerifyWorkspaceIDInStats(t *testing.T, extraStats ...string) { t.Helper() t.Logf("Started verifying workspaceID in stats")