Skip to content

Commit

Permalink
fix(jobsdb): update cache after transaction completes
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Oct 14, 2022
1 parent 8cba05e commit 8a6b638
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 101 deletions.
2 changes: 1 addition & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (gateway *HandleT) dbWriterWorkerProcess() {
// rsources stats
rsourcesStats := rsources.NewStatsCollector(gateway.rsourcesService)
rsourcesStats.JobsStoredWithErrors(jobList, errorMessagesMap)
return rsourcesStats.Publish(ctx, tx.Tx())
return rsourcesStats.Publish(ctx, tx.SqlTx())
})
cancel()
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package jobsdb

import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -1280,7 +1279,7 @@ func consume(t testing.TB, db *HandleT, count int) {
func getPayloadSize(t *testing.T, jobsDB JobsDB, job *JobT) (int64, error) {
var size int64
var tables []string
err := jobsDB.WithTx(func(tx *sql.Tx) error {
err := jobsDB.WithTx(func(tx *Tx) error {
rows, err := tx.Query(fmt.Sprintf("SELECT tablename FROM pg_catalog.pg_tables where tablename like '%s_jobs_%%'", jobsDB.Identifier()))
require.NoError(t, err)
for rows.Next() {
Expand Down
Loading

0 comments on commit 8a6b638

Please sign in to comment.