Skip to content

Commit

Permalink
pkg/cache: Update last_accessed_at less frequently (#41)
Browse files Browse the repository at this point in the history
To avoid many of the `database locked` errors I am seeing, lower the
update frequency to once every 5 minutes per record.

closes #38
  • Loading branch information
kalbasit authored Dec 6, 2024
1 parent 98cef5a commit 39aad14
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 10 deletions.
55 changes: 47 additions & 8 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var (
ErrNotFound = errors.New("not found")
)

const recordAgeIgnoreTouch = 5 * time.Minute

// Cache represents the main cache service.
type Cache struct {
hostName string
Expand All @@ -62,16 +64,23 @@ type Cache struct {
upstreamCaches []upstream.Cache
db *database.DB

// recordAgeIgnoreTouch represents the duration at which a record is
// considered up to date and a touch is not invoked. This helps avoid
// repetitive touching of records in the database which are causing `database
// is locked` errors
recordAgeIgnoreTouch time.Duration

mu sync.Mutex
upstreamJobs map[string]chan struct{}
}

// New returns a new Cache.
func New(logger log15.Logger, hostName, cachePath string, ucs []upstream.Cache) (*Cache, error) {
c := &Cache{
logger: logger,
upstreamCaches: ucs,
upstreamJobs: make(map[string]chan struct{}),
logger: logger,
upstreamCaches: ucs,
upstreamJobs: make(map[string]chan struct{}),
recordAgeIgnoreTouch: recordAgeIgnoreTouch,
}

if err := c.validateHostname(hostName); err != nil {
Expand Down Expand Up @@ -106,6 +115,10 @@ func New(logger log15.Logger, hostName, cachePath string, ucs []upstream.Cache)
return c, c.setup()
}

// SetRecordAgeIgnoreTouch changes the duration at which a record is considered
// up to date and a touch is not invoked.
func (c *Cache) SetRecordAgeIgnoreTouch(d time.Duration) { c.recordAgeIgnoreTouch = d }

// GetHostname returns the hostname.
func (c *Cache) GetHostname() string { return c.hostName }

Expand Down Expand Up @@ -237,8 +250,20 @@ func (c *Cache) getNarFromStore(hash, compression string) (int64, io.ReadCloser,
}
}()

if _, err := c.db.TouchNarRecord(tx, hash); err != nil {
return 0, nil, fmt.Errorf("error touching the nar record: %w", err)
nr, err := c.db.GetNarRecord(tx, hash)
if err != nil {
// TODO: If record not found, record it instead!
if errors.Is(err, database.ErrNotFound) {
return size, r, nil
}

return 0, nil, fmt.Errorf("error fetching the nar record: %w", err)
}

if time.Since(nr.LastAccessedAt) > c.recordAgeIgnoreTouch {
if _, err := c.db.TouchNarRecord(tx, hash); err != nil {
return 0, nil, fmt.Errorf("error touching the nar record: %w", err)
}
}

if err := tx.Commit(); err != nil {
Expand Down Expand Up @@ -427,8 +452,20 @@ func (c *Cache) getNarInfoFromStore(hash string) (*narinfo.NarInfo, error) {
}
}()

if _, err := c.db.TouchNarInfoRecord(tx, hash); err != nil {
return nil, fmt.Errorf("error touching the narinfo record: %w", err)
nir, err := c.db.GetNarInfoRecord(tx, hash)
if err != nil {
// TODO: If record not found, record it instead!
if errors.Is(err, database.ErrNotFound) {
return ni, nil
}

return nil, fmt.Errorf("error fetching the narinfo record: %w", err)
}

if time.Since(nir.LastAccessedAt) > c.recordAgeIgnoreTouch {
if _, err := c.db.TouchNarInfoRecord(tx, hash); err != nil {
return nil, fmt.Errorf("error touching the narinfo record: %w", err)
}
}

if err := tx.Commit(); err != nil {
Expand Down Expand Up @@ -482,7 +519,9 @@ func (c *Cache) putNarInfoInStore(hash string, narInfo *narinfo.NarInfo) error {
}

func (c *Cache) storeInDatabase(hash string, narInfo *narinfo.NarInfo) error {
c.logger.Info("storing narinfo and nar record in the database", "hash", hash, "nar-url", narInfo.URL)
log := c.logger.New("hash", hash, "nar-url", narInfo.URL)

log.Info("storing narinfo and nar record in the database")

tx, err := c.db.Begin()
if err != nil {
Expand Down
137 changes: 135 additions & 2 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,8 @@ func TestGetNarInfo(t *testing.T) {
t.Errorf("expected no error, got %q", err)
}

c.SetRecordAgeIgnoreTouch(0)

db, err := sql.Open("sqlite3", filepath.Join(dir, "var", "ncps", "db", "db.sqlite"))
if err != nil {
t.Fatalf("error opening the database: %s", err)
Expand Down Expand Up @@ -519,6 +521,61 @@ func TestGetNarInfo(t *testing.T) {
}
})

t.Run("pulling it another time within recordAgeIgnoreTouch should not update last_accessed_at", func(t *testing.T) {
time.Sleep(time.Second)

c.SetRecordAgeIgnoreTouch(time.Hour)

defer func() {
c.SetRecordAgeIgnoreTouch(0)
}()

_, err := c.GetNarInfo(context.Background(), narInfoHash2)
if err != nil {
t.Fatalf("no error expected, got: %s", err)
}

t.Run("narinfo does exist in the database with the same last_accessed_at", func(t *testing.T) {
const query = `
SELECT hash, created_at, last_accessed_at
FROM narinfos
`

rows, err := db.Query(query)
if err != nil {
t.Fatalf("error selecting narinfos: %s", err)
}

nims := make([]database.NarInfoModel, 0)

for rows.Next() {
var nim database.NarInfoModel

if err := rows.Scan(&nim.Hash, &nim.CreatedAt, &nim.LastAccessedAt); err != nil {
t.Fatalf("expected no error got: %s", err)
}

nims = append(nims, nim)
}

if err := rows.Err(); err != nil {
t.Errorf("not expecting an error got: %s", err)
}

if want, got := 1, len(nims); want != got {
t.Fatalf("want %d got %d", want, got)
}

if want, got := narInfoHash2, nims[0].Hash; want != got {
t.Errorf("want %q got %q", want, got)
}

if want, got := nims[0].CreatedAt, nims[0].LastAccessedAt; want.Unix() != got.Unix() {
t.Errorf("expected created_at == last_accessed_at got: %q == %q", want, got)
}
})
})

t.Run("pulling it another time should update last_accessed_at only for narinfo", func(t *testing.T) {
time.Sleep(time.Second)

Expand Down Expand Up @@ -583,6 +640,8 @@ func TestPutNarInfo(t *testing.T) {
t.Errorf("expected no error, got %q", err)
}

c.SetRecordAgeIgnoreTouch(0)

db, err := sql.Open("sqlite3", filepath.Join(dir, "var", "ncps", "db", "db.sqlite"))
if err != nil {
t.Fatalf("error opening the database: %s", err)
Expand Down Expand Up @@ -776,6 +835,8 @@ func TestDeleteNarInfo(t *testing.T) {
t.Errorf("expected no error, got %q", err)
}

c.SetRecordAgeIgnoreTouch(0)

t.Run("file does not exist in the store", func(t *testing.T) {
storePath := filepath.Join(dir, "store", narInfoHash1+".narinfo")

Expand Down Expand Up @@ -865,6 +926,8 @@ func TestGetNar(t *testing.T) {
t.Errorf("expected no error, got %q", err)
}

c.SetRecordAgeIgnoreTouch(0)

db, err := sql.Open("sqlite3", filepath.Join(dir, "var", "ncps", "db", "db.sqlite"))
if err != nil {
t.Fatalf("error opening the database: %s", err)
Expand Down Expand Up @@ -914,6 +977,13 @@ func TestGetNar(t *testing.T) {
}
})

t.Run("getting the narinfo so the record in the database now exists", func(t *testing.T) {
_, err := c.GetNarInfo(context.Background(), narInfoHash1)
if err != nil {
t.Fatalf("no error expected, got: %s", err)
}
})

size, r, err := c.GetNar(narHash1, "")
if err != nil {
t.Fatalf("no error expected, got: %s", err)
Expand Down Expand Up @@ -949,8 +1019,6 @@ func TestGetNar(t *testing.T) {
if err != nil {
t.Fatalf("no error expected, got: %s", err)
}

defer r.Close()
})

t.Run("nar does exist in the database, and has initial last_accessed_at", func(t *testing.T) {
Expand Down Expand Up @@ -998,6 +1066,67 @@ func TestGetNar(t *testing.T) {
}
})

t.Run("pulling it another time within recordAgeIgnoreTouch should not update last_accessed_at", func(t *testing.T) {
time.Sleep(time.Second)

c.SetRecordAgeIgnoreTouch(time.Hour)

defer func() {
c.SetRecordAgeIgnoreTouch(0)
}()

_, r, err := c.GetNar(narHash1, "")
if err != nil {
t.Fatalf("no error expected, got: %s", err)
}
defer r.Close()

t.Run("narinfo does exist in the database with the same last_accessed_at", func(t *testing.T) {
const query = `
SELECT hash, created_at, last_accessed_at
FROM nars
`

rows, err := db.Query(query)
if err != nil {
t.Fatalf("error selecting narinfos: %s", err)
}

nims := make([]database.NarModel, 0)

for rows.Next() {
var nim database.NarModel

err := rows.Scan(
&nim.Hash,
&nim.CreatedAt,
&nim.LastAccessedAt,
)
if err != nil {
t.Fatalf("expected no error got: %s", err)
}

nims = append(nims, nim)
}

if err := rows.Err(); err != nil {
t.Errorf("not expecting an error got: %s", err)
}

if want, got := 1, len(nims); want != got {
t.Fatalf("want %d got %d", want, got)
}

if want, got := narHash1, nims[0].Hash; want != got {
t.Errorf("want %q got %q", want, got)
}

if want, got := nims[0].CreatedAt, nims[0].LastAccessedAt; want.Unix() != got.Unix() {
t.Errorf("expected created_at == last_accessed_at got: %q != %q", want, got)
}
})
})

t.Run("pulling it another time should update last_accessed_at", func(t *testing.T) {
time.Sleep(time.Second)

Expand Down Expand Up @@ -1068,6 +1197,8 @@ func TestPutNar(t *testing.T) {
t.Errorf("expected no error, got %q", err)
}

c.SetRecordAgeIgnoreTouch(0)

t.Run("without compression", func(t *testing.T) {
storePath := filepath.Join(dir, "store", "nar", narHash1+".nar")

Expand Down Expand Up @@ -1154,6 +1285,8 @@ func TestDeleteNar(t *testing.T) {
t.Errorf("expected no error, got %q", err)
}

c.SetRecordAgeIgnoreTouch(0)

t.Run("without compression", func(t *testing.T) {
storePath := filepath.Join(dir, "store", "nar", narHash1+".nar")

Expand Down
Loading

0 comments on commit 39aad14

Please sign in to comment.