From 51cd74222f42e79027ac432fdd8a8e03eed94367 Mon Sep 17 00:00:00 2001 From: Aris Tzoumas Date: Sat, 15 Oct 2022 11:03:47 +0300 Subject: [PATCH] chore(jobsdb): use a different advisory lock for different table prefixes --- jobsdb/jobsdb.go | 14 ++++++++++++-- jobsdb/jobsdb_test.go | 15 +++++++++++++++ utils/misc/dbutils.go | 6 ------ 3 files changed, 27 insertions(+), 8 deletions(-) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 3794a96b05..c12532b065 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -22,7 +22,9 @@ package jobsdb import ( "bytes" "context" + "crypto/sha256" "database/sql" + "encoding/binary" "encoding/json" "errors" "fmt" @@ -2913,6 +2915,7 @@ Other functions are impacted by movement of data across DS in background so take both the list and data lock */ func (jd *HandleT) addNewDSLoop(ctx context.Context) { + advisoryLock := jd.getAdvisoryLockForOperation("add_ds") for { select { case <-ctx.Done(): @@ -2926,10 +2929,10 @@ func (jd *HandleT) addNewDSLoop(ctx context.Context) { // start a transaction err := jd.WithTx(func(tx *Tx) error { // acquire a advisory transaction level blocking lock, which is released once the transaction ends. - sqlStatement := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, misc.JobsDBAddDsAdvisoryLock) + sqlStatement := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, advisoryLock) _, err := tx.ExecContext(context.TODO(), sqlStatement) if err != nil { - return fmt.Errorf("error while acquiring advisory lock %d: %w", misc.JobsDBAddDsAdvisoryLock, err) + return fmt.Errorf("error while acquiring advisory lock %d: %w", advisoryLock, err) } // We acquire the list lock only after we have acquired the advisory lock. @@ -2975,6 +2978,13 @@ func (jd *HandleT) addNewDSLoop(ctx context.Context) { } } +func (jd *HandleT) getAdvisoryLockForOperation(operation string) int64 { + key := fmt.Sprintf("%s_%s", jd.tablePrefix, operation) + h := sha256.New() + h.Write([]byte(key)) + return int64(binary.BigEndian.Uint32(h.Sum(nil))) +} + func setReadonlyDsInTx(tx *Tx, latestDS dataSetT) error { sqlStatement := fmt.Sprintf( `CREATE TRIGGER readonlyTableTrg diff --git a/jobsdb/jobsdb_test.go b/jobsdb/jobsdb_test.go index fdfc9414c2..7f15f462b4 100644 --- a/jobsdb/jobsdb_test.go +++ b/jobsdb/jobsdb_test.go @@ -966,6 +966,21 @@ func Test_SortDnumList(t *testing.T) { require.Equal(t, []string{"-2", "0_1", "0_1_1", "1"}, l) } +func Test_GetAdvisoryLockForOperation_Unique(t *testing.T) { + calculated := map[int64]string{} + for _, operation := range []string{"add_ds", "migrate_ds"} { + for _, prefix := range []string{"gw", "rt", "batch_rt", "proc_error"} { + h := &HandleT{tablePrefix: prefix} + key := fmt.Sprintf("%s_%s", prefix, operation) + advLock := h.getAdvisoryLockForOperation(operation) + if dupKey, ok := calculated[advLock]; ok { + t.Errorf("Duplicate advisory lock calculated for different keys %s and %s: %d", key, dupKey, advLock) + } + calculated[advLock] = key + } + } +} + type testingT interface { Errorf(format string, args ...interface{}) FailNow() diff --git a/utils/misc/dbutils.go b/utils/misc/dbutils.go index 35675309ee..93dc899e81 100644 --- a/utils/misc/dbutils.go +++ b/utils/misc/dbutils.go @@ -10,12 +10,6 @@ import ( "github.com/rudderlabs/rudder-server/config" ) -type AdvisoryLock int - -const ( - JobsDBAddDsAdvisoryLock AdvisoryLock = 11 -) - // GetConnectionString Returns Jobs DB connection configuration func GetConnectionString() string { host := config.GetString("DB.host", "localhost")