Skip to content

Commit

Permalink
fix: warehouse sync job queries for snowflake and bigquery (#2740)
Browse files — Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Nov 29, 2022
1 parent 6ac8c31 commit d33d8c3
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 51 deletions.
20 changes: 12 additions & 8 deletions warehouse/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,19 @@ func (bq *HandleT) dropStagingTable(stagingTableName string) {

func (bq *HandleT) DeleteBy(tableNames []string, params warehouseutils.DeleteByParams) error {
pkgLogger.Infof("BQ: Cleaning up the following tables in bigquery for BQ:%s : %v", tableNames)

for _, tb := range tableNames {
sqlStatement := fmt.Sprintf(`DELETE FROM "%[1]s"."%[2]s" WHERE
context_sources_job_run_id <> @jobrunid AND
context_sources_task_run_id <> @taskrunid AND
context_source_id = @sourceid AND
received_at < @starttime`,
bq.namespace,
tb,
tableName := fmt.Sprintf("`%s`.`%s`", bq.namespace, tb)
sqlStatement := fmt.Sprintf(`
DELETE FROM
%[1]s
WHERE
context_sources_job_run_id <> @jobrunid AND
context_sources_task_run_id <> @taskrunid AND
context_source_id = @sourceid AND
received_at < @starttime;
`,
tableName,
)

pkgLogger.Infof("PG: Deleting rows in table in bigquery for BQ:%s", bq.warehouse.Destination.ID)
Expand All @@ -273,7 +278,6 @@ func (bq *HandleT) DeleteBy(tableNames []string, params warehouseutils.DeleteByP
return status.Err()
}
}

}
return nil
}
Expand Down
56 changes: 31 additions & 25 deletions warehouse/jobs/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"io"
"net/http"
"strings"
)

// AddWarehouseJobHandler The following handler gets called for adding async
Expand Down Expand Up @@ -51,35 +52,40 @@ func (a *AsyncJobWhT) AddWarehouseJobHandler(w http.ResponseWriter, r *http.Requ
http.Error(w, "Error extracting tableNames", http.StatusBadRequest)
return
}

var jobIds []int64
// Add to wh_async_job queue each of the tables
for _, th := range tableNames {
if !skipTable(th) {
jobsMetaData := WhJobsMetaData{
JobRunID: startJobPayload.JobRunID,
TaskRunID: startJobPayload.TaskRunID,
StartTime: startJobPayload.StartTime,
JobType: AsyncJobType,
}
metadataJson, err := json.Marshal(jobsMetaData)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
payload := AsyncJobPayloadT{
SourceID: startJobPayload.SourceID,
DestinationID: startJobPayload.DestinationID,
TableName: th,
AsyncJobType: startJobPayload.AsyncJobType,
MetaData: metadataJson,
}
id, err := a.addJobsToDB(a.context, &payload)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
jobIds = append(jobIds, id)

switch strings.ToLower(th) {
case "rudder_discards", "rudder_identity_mappings", "rudder_identity_merge_rules":
continue
}

jobsMetaData := WhJobsMetaData{
JobRunID: startJobPayload.JobRunID,
TaskRunID: startJobPayload.TaskRunID,
StartTime: startJobPayload.StartTime,
JobType: AsyncJobType,
}
metadataJson, err := json.Marshal(jobsMetaData)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
payload := AsyncJobPayloadT{
SourceID: startJobPayload.SourceID,
DestinationID: startJobPayload.DestinationID,
TableName: th,
AsyncJobType: startJobPayload.AsyncJobType,
MetaData: metadataJson,
}
id, err := a.addJobsToDB(a.context, &payload)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
jobIds = append(jobIds, id)
}
whAddJobResponse := WhAddJobResponse{
JobIds: jobIds,
Expand Down
7 changes: 0 additions & 7 deletions warehouse/jobs/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ func validatePayload(payload StartJobReqPayload) bool {
return true
}

func skipTable(th string) bool {
if th == "RUDDER_DISCARDS" || th == "rudder_discards" {
return true
}
return false
}

func contains(sArray []string, s string) bool {
for _, s1 := range sArray {
if s1 == s {
Expand Down
25 changes: 14 additions & 11 deletions warehouse/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,25 +214,28 @@ func (sf *HandleT) authString() string {
func (sf *HandleT) DeleteBy(tableNames []string, params warehouseutils.DeleteByParams) (err error) {
pkgLogger.Infof("SF: Cleaning up the following tables in snowflake for SF:%s : %v", tableNames)
for _, tb := range tableNames {
sqlStatement := fmt.Sprintf(`DELETE FROM "%[1]s"."%[2]s" WHERE
context_sources_job_run_id <> :jobrunid AND
context_sources_task_run_id <> :taskrunid AND
context_source_id = :sourceid AND
received_at < :starttime`,
sqlStatement := fmt.Sprintf(`
DELETE FROM
%[1]q.%[2]q
WHERE
context_sources_job_run_id <> '%[3]s' AND
context_sources_task_run_id <> '%[4]s' AND
context_source_id = '%[5]s' AND
received_at < '%[6]s';
`,
sf.Namespace,
tb,
params.JobRunId,
params.TaskRunId,
params.SourceId,
params.StartTime,
)

pkgLogger.Infof("SF: Deleting rows in table in snowflake for SF:%s", sf.Warehouse.Destination.ID)
pkgLogger.Debugf("SF: Executing the sql statement %v", sqlStatement)

if enableDeleteByJobs {
_, err = sf.Db.Exec(sqlStatement,
sql.Named("jobrunid", params.JobRunId),
sql.Named("taskrunid", params.TaskRunId),
sql.Named("sourceid", params.SourceId),
sql.Named("starttime", params.StartTime),
)
_, err = sf.Db.Exec(sqlStatement)
if err != nil {
pkgLogger.Errorf("Error %s", err)
return err
Expand Down

0 comments on commit d33d8c3

Please sign in to comment.