Skip to content

Commit

Permalink
chore(jobsdb): use a different advisory lock for different table pref…
Browse files Browse the repository at this point in the history
…ixes
  • Loading branch information
atzoum committed Oct 15, 2022
1 parent eb705df commit 51cd742
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 8 deletions.
14 changes: 12 additions & 2 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ package jobsdb
import (
"bytes"
"context"
"crypto/sha256"
"database/sql"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -2913,6 +2915,7 @@ Other functions are impacted by movement of data across DS in background
so take both the list and data lock
*/
func (jd *HandleT) addNewDSLoop(ctx context.Context) {
advisoryLock := jd.getAdvisoryLockForOperation("add_ds")
for {
select {
case <-ctx.Done():
Expand All @@ -2926,10 +2929,10 @@ func (jd *HandleT) addNewDSLoop(ctx context.Context) {
// start a transaction
err := jd.WithTx(func(tx *Tx) error {
// acquire a advisory transaction level blocking lock, which is released once the transaction ends.
sqlStatement := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, misc.JobsDBAddDsAdvisoryLock)
sqlStatement := fmt.Sprintf(`SELECT pg_advisory_xact_lock(%d);`, advisoryLock)
_, err := tx.ExecContext(context.TODO(), sqlStatement)
if err != nil {
return fmt.Errorf("error while acquiring advisory lock %d: %w", misc.JobsDBAddDsAdvisoryLock, err)
return fmt.Errorf("error while acquiring advisory lock %d: %w", advisoryLock, err)
}

// We acquire the list lock only after we have acquired the advisory lock.
Expand Down Expand Up @@ -2975,6 +2978,13 @@ func (jd *HandleT) addNewDSLoop(ctx context.Context) {
}
}

func (jd *HandleT) getAdvisoryLockForOperation(operation string) int64 {
key := fmt.Sprintf("%s_%s", jd.tablePrefix, operation)
h := sha256.New()
h.Write([]byte(key))
return int64(binary.BigEndian.Uint32(h.Sum(nil)))
}

func setReadonlyDsInTx(tx *Tx, latestDS dataSetT) error {
sqlStatement := fmt.Sprintf(
`CREATE TRIGGER readonlyTableTrg
Expand Down
15 changes: 15 additions & 0 deletions jobsdb/jobsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,6 +966,21 @@ func Test_SortDnumList(t *testing.T) {
require.Equal(t, []string{"-2", "0_1", "0_1_1", "1"}, l)
}

func Test_GetAdvisoryLockForOperation_Unique(t *testing.T) {
calculated := map[int64]string{}
for _, operation := range []string{"add_ds", "migrate_ds"} {
for _, prefix := range []string{"gw", "rt", "batch_rt", "proc_error"} {
h := &HandleT{tablePrefix: prefix}
key := fmt.Sprintf("%s_%s", prefix, operation)
advLock := h.getAdvisoryLockForOperation(operation)
if dupKey, ok := calculated[advLock]; ok {
t.Errorf("Duplicate advisory lock calculated for different keys %s and %s: %d", key, dupKey, advLock)
}
calculated[advLock] = key
}
}
}

type testingT interface {
Errorf(format string, args ...interface{})
FailNow()
Expand Down
6 changes: 0 additions & 6 deletions utils/misc/dbutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ import (
"github.com/rudderlabs/rudder-server/config"
)

type AdvisoryLock int

const (
JobsDBAddDsAdvisoryLock AdvisoryLock = 11
)

// GetConnectionString Returns Jobs DB connection configuration
func GetConnectionString() string {
host := config.GetString("DB.host", "localhost")
Expand Down

0 comments on commit 51cd742

Please sign in to comment.