Skip to content

Commit

Permalink
refactor: internal migration in postgres (#2559)
Browse files Browse the repository at this point in the history
* refactor: internal migration in postgres

* chore: cleanup

* test to verify non-terminal jobs are migrated

* cleanup

Co-authored-by: Aris Tzoumas <[email protected]>

Co-authored-by: Aris Tzoumas <[email protected]>
  • Loading branch information
Sidddddarth and atzoum authored Oct 14, 2022
1 parent 73baf76 commit 3cb2ec6
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 110 deletions.
111 changes: 111 additions & 0 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func genJobs(workspaceId, customVal string, jobCount, eventsPerJob int) []*JobT
js := make([]*JobT, jobCount)
for i := range js {
js[i] = &JobT{
JobID: int64(i) + 1,
Parameters: []byte(`{"batch_id":1,"source_id":"sourceID","source_job_run_id":""}`),
EventPayload: []byte(`{"receivedAt":"2021-06-06T20:26:39.598+05:30","writeKey":"writeKey","requestIP":"[::1]", "batch": [{"anonymousId":"anon_id","channel":"android-sdk","context":{"app":{"build":"1","name":"RudderAndroidClient","namespace":"com.rudderlabs.android.sdk","version":"1.0"},"device":{"id":"49e4bdd1c280bc00","manufacturer":"Google","model":"Android SDK built for x86","name":"generic_x86"},"library":{"name":"com.rudderstack.android.sdk.core"},"locale":"en-US","network":{"carrier":"Android"},"screen":{"density":420,"height":1794,"width":1080},"traits":{"anonymousId":"49e4bdd1c280bc00"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"event":"Demo Track","integrations":{"All":true},"messageId":"b96f3d8a-7c26-4329-9671-4e3202f42f15","originalTimestamp":"2019-08-12T05:08:30.909Z","properties":{"category":"Demo Category","floatVal":4.501,"label":"Demo Label","testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}],"testMap":{"t1":"a","t2":4},"value":5},"rudderId":"a-292e-4e79-9880-f8009e0ae4a3","sentAt":"2019-08-12T05:08:30.909Z","type":"track"}]}`),
UserID: "a-292e-4e79-9880-f8009e0ae4a3",
Expand Down Expand Up @@ -678,6 +679,116 @@ func TestJobsDB(t *testing.T) {
require.Equal(t, prefix+"_jobs_4", dsList[2].JobTable)
require.Equal(t, prefix+"_jobs_5", dsList[3].JobTable)
})

t.Run(`migrates only moves non-terminal jobs to a new DS`, func(t *testing.T) {
customVal := "MOCKDS"
triggerAddNewDS := make(chan time.Time)
triggerMigrateDS := make(chan time.Time)

jobDB := HandleT{
TriggerAddNewDS: func() <-chan time.Time {
return triggerAddNewDS
},
TriggerMigrateDS: func() <-chan time.Time {
return triggerMigrateDS
},
}
tablePrefix := strings.ToLower(rand.String(5))
err := jobDB.Setup(ReadWrite, true, tablePrefix, true, []prebackup.Handler{})
require.NoError(t, err)
defer jobDB.TearDown()

jobDB.MaxDSRetentionPeriod = time.Second

var (
numTotalJobs = 30
numFailedJobs = 10
numUnprocessedJobs = 10
numSucceededJobs = 10
jobs = genJobs(defaultWorkspaceID, customVal, numTotalJobs, 1)
// first #numFailedJobs jobs marked Failed - should be migrated
failedStatuses = genJobStatuses(jobs[:numFailedJobs], Failed.State)
// #numFailedJobs - #numFailedJobs+#numSucceededJobs jobs marked as succeeded - should not be migrated
succeededStatuses = genJobStatuses(jobs[numFailedJobs:numFailedJobs+numSucceededJobs], Succeeded.State)
// #numFailedJobs+#numSucceededJobs - #numTotalJobs jobs are unprocessed - should be migrated
)
require.NoError(t, jobDB.Store(context.Background(), jobs))
require.NoError(
t,
jobDB.UpdateJobStatus(
context.Background(),
append(failedStatuses, succeededStatuses...),
[]string{customVal},
[]ParameterFilterT{},
),
)

require.EqualValues(t, 1, jobDB.GetMaxDSIndex())
time.Sleep(time.Second * 2) // wait for some time to pass
triggerAddNewDS <- time.Now() // trigger addNewDSLoop to run
triggerAddNewDS <- time.Now() // Second time, waits for the first loop to finish

jobDBInspector := HandleInspector{HandleT: &jobDB}
require.EqualValues(t, 2, len(jobDBInspector.DSIndicesList()))
require.EqualValues(t, 2, jobDB.GetMaxDSIndex())

time.Sleep(time.Second * 2) // wait for some time to pass so that retention condition satisfies

triggerMigrateDS <- time.Now() // trigger migrateDSLoop to run
triggerMigrateDS <- time.Now() // Second time, waits for the first loop to finish

dsIndicesList := jobDBInspector.DSIndicesList()
require.EqualValues(t, 2, len(jobDBInspector.DSIndicesList()))
require.EqualValues(t, "1_1", dsIndicesList[0])
require.EqualValues(t, "2", dsIndicesList[1])
require.EqualValues(t, 2, jobDB.GetMaxDSIndex())

// only non-terminal jobs should be migrated
var numJobs int64
require.NoError(
t,
jobDB.dbHandle.QueryRow(
fmt.Sprintf(`SELECT COUNT(*) FROM %s`, tablePrefix+`_jobs_`+dsIndicesList[0]),
).Scan(&numJobs),
)
require.Equal(t, numFailedJobs+numUnprocessedJobs, int(numJobs))

// verify that unprocessed jobs are migrated to new DS
unprocessedResult, err := jobDB.GetUnprocessed(context.Background(), GetQueryParamsT{
CustomValFilters: []string{customVal},
JobsLimit: 100,
ParameterFilters: []ParameterFilterT{},
})
require.NoError(t, err, "GetUnprocessed failed")
require.Equal(t, numUnprocessedJobs, len(unprocessedResult.Jobs))
expectedUnprocessedJobIDs := make([]int64, 0)
for _, job := range jobs[numFailedJobs+numSucceededJobs:] {
expectedUnprocessedJobIDs = append(expectedUnprocessedJobIDs, job.JobID)
}
actualUnprocessedJobIDs := make([]int64, 0)
for _, job := range unprocessedResult.Jobs {
actualUnprocessedJobIDs = append(actualUnprocessedJobIDs, job.JobID)
}
require.Equal(t, expectedUnprocessedJobIDs, actualUnprocessedJobIDs)

// verifying that failed jobs are migrated to new DS
failedResult, err := jobDB.GetToRetry(context.Background(), GetQueryParamsT{
CustomValFilters: []string{customVal},
JobsLimit: 100,
ParameterFilters: []ParameterFilterT{},
})
require.NoError(t, err, "GetToRetry failed")
expectedFailedJobIDs := make([]int64, 0)
for _, job := range jobs[:numFailedJobs] {
expectedFailedJobIDs = append(expectedFailedJobIDs, job.JobID)
}
actualFailedJobIDs := make([]int64, 0)
for _, job := range failedResult.Jobs {
actualFailedJobIDs = append(actualFailedJobIDs, job.JobID)
}
require.Equal(t, numFailedJobs, len(failedResult.Jobs))
require.Equal(t, expectedFailedJobIDs, actualFailedJobIDs)
})
}

func TestMultiTenantLegacyGetAllJobs(t *testing.T) {
Expand Down
178 changes: 68 additions & 110 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1747,53 +1747,42 @@ completed (state is failed or waiting or waiting_retry or executiong) are copied
over. Then the status (only the latest) is set for those jobs
*/

func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *sql.Tx, srcDS, destDS dataSetT) (noJobsMigrated int, err error) {
func (jd *HandleT) migrateJobsInTx(ctx context.Context, tx *sql.Tx, srcDS, destDS dataSetT) (int, error) {
queryStat := stats.Default.NewTaggedStat("migration_jobs", stats.TimerType, stats.Tags{"customVal": jd.tablePrefix})
queryStat.Start()
defer queryStat.End()
if !jd.dsListLock.RTryLockWithCtx(ctx) {
return 0, fmt.Errorf("could not acquire a dslist read lock: %w", ctx.Err())
}
defer jd.dsListLock.RUnlock()

// Unprocessed jobs
unprocessedList, _, err := jd.getUnprocessedJobsDS(ctx, srcDS, false, GetQueryParamsT{})
if err != nil {
return 0, err
}
// Jobs which haven't finished processing
retryList, _, err := jd.getProcessedJobsDS(ctx, srcDS, true,
GetQueryParamsT{StateFilters: validNonTerminalStates})
if err != nil {
return 0, err
}
jobsToMigrate := append(unprocessedList.Jobs, retryList.Jobs...)
noJobsMigrated = len(jobsToMigrate)
compactDSQuery := fmt.Sprintf(
`with last_status as
(
select * from %[1]q where id in (select max(id) from %[1]q group by job_id)
),
inserted_jobs as
(
insert into %[3]q (select j.* from %[2]q j left join last_status js on js.job_id = j.job_id
where js.job_id is null or js.job_state = ANY('{%[5]s}') order by j.job_id) returning job_id
),
insertedStatuses as
(
insert into %[4]q (select * from last_status where job_state = ANY('{%[5]s}'))
)
select count(*) from inserted_jobs;`,
srcDS.JobStatusTable,
srcDS.JobTable,
destDS.JobTable,
destDS.JobStatusTable,
strings.Join(validNonTerminalStates, ","),
)

if err := jd.copyJobsDS(tx, destDS, jobsToMigrate); err != nil {
return 0, err
}
// Now copy over the latest status of the unfinished jobs
var statusList []*JobStatusT
for _, job := range retryList.Jobs {
newStatus := JobStatusT{
JobID: job.JobID,
JobState: job.LastJobStatus.JobState,
AttemptNum: job.LastJobStatus.AttemptNum,
ExecTime: job.LastJobStatus.ExecTime,
RetryTime: job.LastJobStatus.RetryTime,
ErrorCode: job.LastJobStatus.ErrorCode,
ErrorResponse: job.LastJobStatus.ErrorResponse,
Parameters: job.LastJobStatus.Parameters,
WorkspaceId: job.WorkspaceId,
}
statusList = append(statusList, &newStatus)
}
err = jd.copyJobStatusDS(ctx, tx, destDS, statusList, []string{})
var numJobsMigrated int64
err := tx.QueryRowContext(
ctx,
compactDSQuery,
).Scan(&numJobsMigrated)
if err != nil {
return 0, err
}
return noJobsMigrated, nil
return int(numJobsMigrated), nil
}

func (jd *HandleT) postMigrateHandleDS(tx *sql.Tx, migrateFrom []dataSetT) error {
Expand Down Expand Up @@ -2444,7 +2433,7 @@ stateFilters and customValFilters do a OR query on values passed in array
parameterFilters do a AND query on values included in the map.
A JobsLimit less than or equal to zero indicates no limit.
*/
func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll bool, params GetQueryParamsT) (JobsResult, bool, error) { // skipcq: CRT-P0003
func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, params GetQueryParamsT) (JobsResult, bool, error) { // skipcq: CRT-P0003
stateFilters := params.StateFilters
customValFilters := params.CustomValFilters
parameterFilters := params.ParameterFilters
Expand Down Expand Up @@ -2472,55 +2461,27 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll b
stateQuery = ""
}
if len(customValFilters) > 0 && !params.IgnoreCustomValFiltersInQuery {
jd.assert(!getAll, "getAll is true")
customValQuery = " AND " +
constructQueryOR("jobs.custom_val", customValFilters)
} else {
customValQuery = ""
}

if len(parameterFilters) > 0 {
jd.assert(!getAll, "getAll is true")
sourceQuery += " AND " + constructParameterJSONQuery("jobs", parameterFilters)
} else {
sourceQuery = ""
}

if params.JobsLimit > 0 {
jd.assert(!getAll, "getAll is true")
limitQuery = fmt.Sprintf(" LIMIT %d ", params.JobsLimit)
} else {
limitQuery = ""
}

var rows *sql.Rows
if getAll {
sqlStatement := fmt.Sprintf(`SELECT
jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, jobs.event_count,
jobs.created_at, jobs.expire_at, jobs.workspace_id,
pg_column_size(jobs.event_payload) as payload_size,
sum(jobs.event_count) over (order by jobs.job_id asc) as running_event_counts,
sum(pg_column_size(jobs.event_payload)) over (order by jobs.job_id) as running_payload_size,
job_latest_state.job_state, job_latest_state.attempt,
job_latest_state.exec_time, job_latest_state.retry_time,
job_latest_state.error_code, job_latest_state.error_response, job_latest_state.parameters
FROM
%[1]q AS jobs,
(SELECT job_id, job_state, attempt, exec_time, retry_time,
error_code, error_response,parameters FROM %[2]q WHERE id IN
(SELECT MAX(id) from %[2]q GROUP BY job_id) %[3]s)
AS job_latest_state
WHERE jobs.job_id=job_latest_state.job_id`,
ds.JobTable, ds.JobStatusTable, stateQuery)
var err error
rows, err = jd.dbHandle.QueryContext(ctx, sqlStatement)
if err != nil {
return JobsResult{}, false, err
}
defer func() { _ = rows.Close() }()

} else {
sqlStatement := fmt.Sprintf(`SELECT
sqlStatement := fmt.Sprintf(`SELECT
jobs.job_id, jobs.uuid, jobs.user_id, jobs.parameters, jobs.custom_val, jobs.event_payload, jobs.event_count,
jobs.created_at, jobs.expire_at, jobs.workspace_id,
pg_column_size(jobs.event_payload) as payload_size,
Expand All @@ -2538,41 +2499,40 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll b
WHERE jobs.job_id=job_latest_state.job_id
%[4]s %[5]s
AND job_latest_state.retry_time < $1 ORDER BY jobs.job_id %[6]s`,
ds.JobTable, ds.JobStatusTable, stateQuery, customValQuery, sourceQuery, limitQuery)
ds.JobTable, ds.JobStatusTable, stateQuery, customValQuery, sourceQuery, limitQuery)

args := []interface{}{getTimeNowFunc()}
args := []interface{}{getTimeNowFunc()}

var wrapQuery []string
if params.EventsLimit > 0 {
// If there is a single job in the dataset containing more events than the EventsLimit, we should return it,
// otherwise processing will halt.
// Therefore, we always retrieve one more job from the database than our limit dictates.
// This job will only be returned to the result in case of the aforementioned scenario, otherwise it gets filtered out
// later, during row scanning
wrapQuery = append(wrapQuery, fmt.Sprintf(`running_event_counts - t.event_count <= $%d`, len(args)+1))
args = append(args, params.EventsLimit)
}
var wrapQuery []string
if params.EventsLimit > 0 {
// If there is a single job in the dataset containing more events than the EventsLimit, we should return it,
// otherwise processing will halt.
// Therefore, we always retrieve one more job from the database than our limit dictates.
// This job will only be returned to the result in case of the aforementioned scenario, otherwise it gets filtered out
// later, during row scanning
wrapQuery = append(wrapQuery, fmt.Sprintf(`running_event_counts - t.event_count <= $%d`, len(args)+1))
args = append(args, params.EventsLimit)
}

if params.PayloadSizeLimit > 0 {
wrapQuery = append(wrapQuery, fmt.Sprintf(`running_payload_size - t.payload_size <= $%d`, len(args)+1))
args = append(args, params.PayloadSizeLimit)
}
if params.PayloadSizeLimit > 0 {
wrapQuery = append(wrapQuery, fmt.Sprintf(`running_payload_size - t.payload_size <= $%d`, len(args)+1))
args = append(args, params.PayloadSizeLimit)
}

if len(wrapQuery) > 0 {
sqlStatement = `SELECT * FROM (` + sqlStatement + `) t WHERE ` + strings.Join(wrapQuery, " AND ")
}
if len(wrapQuery) > 0 {
sqlStatement = `SELECT * FROM (` + sqlStatement + `) t WHERE ` + strings.Join(wrapQuery, " AND ")
}

stmt, err := jd.dbHandle.PrepareContext(ctx, sqlStatement)
if err != nil {
return JobsResult{}, false, err
}
defer func() { _ = stmt.Close() }()
rows, err = stmt.QueryContext(ctx, args...)
if err != nil {
return JobsResult{}, false, err
}
defer func() { _ = rows.Close() }()
stmt, err := jd.dbHandle.PrepareContext(ctx, sqlStatement)
if err != nil {
return JobsResult{}, false, err
}
defer func() { _ = stmt.Close() }()
rows, err = stmt.QueryContext(ctx, args...)
if err != nil {
return JobsResult{}, false, err
}
defer func() { _ = rows.Close() }()

var runningEventCount int
var runningPayloadSize int64
Expand All @@ -2594,17 +2554,15 @@ func (jd *HandleT) getProcessedJobsDS(ctx context.Context, ds dataSetT, getAll b
return JobsResult{}, false, err
}

if !getAll { // if getAll is true, limits do not apply
if params.EventsLimit > 0 && runningEventCount > params.EventsLimit && len(jobList) > 0 {
// events limit overflow is triggered as long as we have read at least one job
limitsReached = true
break
}
if params.PayloadSizeLimit > 0 && runningPayloadSize > params.PayloadSizeLimit && len(jobList) > 0 {
// payload size limit overflow is triggered as long as we have read at least one job
limitsReached = true
break
}
if params.EventsLimit > 0 && runningEventCount > params.EventsLimit && len(jobList) > 0 {
// events limit overflow is triggered as long as we have read at least one job
limitsReached = true
break
}
if params.PayloadSizeLimit > 0 && runningPayloadSize > params.PayloadSizeLimit && len(jobList) > 0 {
// payload size limit overflow is triggered as long as we have read at least one job
limitsReached = true
break
}
// we are adding the job only after testing for limitsReached
// so that we don't always overflow
Expand Down Expand Up @@ -4392,7 +4350,7 @@ func (jd *HandleT) GetProcessed(ctx context.Context, params GetQueryParamsT) (Jo
if dsLimit > 0 && dsQueryCount >= dsLimit {
break
}
processedJobs, dsHit, err := jd.getProcessedJobsDS(ctx, ds, false, params)
processedJobs, dsHit, err := jd.getProcessedJobsDS(ctx, ds, params)
if err != nil {
return JobsResult{}, err
}
Expand Down

0 comments on commit 3cb2ec6

Please sign in to comment.