Skip to content

Commit

Permalink
Adding db transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
mchavez committed Aug 28, 2024
1 parent 041395f commit 40c5957
Show file tree
Hide file tree
Showing 53 changed files with 277,072 additions and 18 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/klauspost/compress v1.17.8
github.com/mattn/go-sqlite3 v1.14.22
github.com/pquerna/xjwt v0.2.0
github.com/segmentio/ksuid v1.0.4
github.com/shirou/gopsutil/v3 v3.24.4
Expand Down Expand Up @@ -74,7 +75,6 @@ require (
github.com/lufia/plan9stats v0.0.0-20240408141607-282e7b5d6b74 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
Expand Down
82 changes: 65 additions & 17 deletions pkg/uhttp/dbcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"strings"
"time"

_ "github.com/doug-martin/goqu/v9/dialect/sqlite3"
_ "github.com/glebarez/go-sqlite"
"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
_ "github.com/mattn/go-sqlite3"
"go.uber.org/zap"
)

Expand All @@ -32,6 +32,14 @@ type DBCache struct {
defaultExpiration time.Duration
}

const (
failStartTransaction = "Failed to start a transaction"
nilConnection = "Database connection is nil"
errQueryingTable = "Error querying cache table"
failRollback = "Failed to rollback transaction"
failInsert = "Failed to insert data into cache table"
)

func NewDBCache(ctx context.Context, cfg CacheConfig) (*DBCache, error) {
l := ctxzap.Extract(ctx)
cacheDir, err := os.UserCacheDir()
Expand All @@ -41,7 +49,7 @@ func NewDBCache(ctx context.Context, cfg CacheConfig) (*DBCache, error) {
}

// Connect to db
db, err := sql.Open("sqlite", filepath.Join(cacheDir, "lcache.db"))
db, err := sql.Open("sqlite3", filepath.Join(cacheDir, "lcache.db"))
if err != nil {
l.Debug("Failed to open database", zap.Error(err))
return &DBCache{}, err
Expand Down Expand Up @@ -120,7 +128,7 @@ func (d *DBCache) CreateCacheKey(req *http.Request) (string, error) {
// Get returns cached response (if exists).
func (d *DBCache) Get(ctx context.Context, key string) (*http.Response, error) {
if d.IsNilConnection() {
return nil, fmt.Errorf("database connection is nil")
return nil, fmt.Errorf("%s", nilConnection)
}

entry, err := d.Select(ctx, key)
Expand All @@ -140,7 +148,7 @@ func (d *DBCache) Get(ctx context.Context, key string) (*http.Response, error) {
// Set stores and save response in the db.
func (d *DBCache) Set(ctx context.Context, key string, value *http.Response) error {
if d.IsNilConnection() {
return fmt.Errorf("database connection is nil")
return fmt.Errorf("%s", nilConnection)
}

cacheableResponse, err := httputil.DumpResponse(value, true)
Expand All @@ -159,7 +167,7 @@ func (d *DBCache) Set(ctx context.Context, key string, value *http.Response) err
// Remove stored keys.
func (d *DBCache) Delete(ctx context.Context, key string) error {
if d.IsNilConnection() {
return fmt.Errorf("database connection is nil")
return fmt.Errorf("%s", nilConnection)
}

err := d.Remove(ctx, key)
Expand All @@ -172,7 +180,7 @@ func (d *DBCache) Delete(ctx context.Context, key string) error {

func (d *DBCache) Clear(ctx context.Context) error {
if d.IsNilConnection() {
return fmt.Errorf("database connection is nil")
return fmt.Errorf("%s", nilConnection)
}

err := d.close(ctx)
Expand All @@ -191,7 +199,7 @@ func (d *DBCache) Insert(ctx context.Context, key string, value any) error {
ok bool
)
if d.IsNilConnection() {
return fmt.Errorf("database connection is nil")
return fmt.Errorf("%s", nilConnection)
}

l := ctxzap.Extract(ctx)
Expand All @@ -204,13 +212,33 @@ func (d *DBCache) Insert(ctx context.Context, key string, value any) error {
}

if ok, _ := d.Has(ctx, key); !ok {
_, err := d.db.Exec("INSERT INTO http_cache(key, data, expiration) values(?, ?, ?)",
tx, err := d.db.Begin()
if err != nil {
l.Debug(failStartTransaction, zap.Error(err))
return err
}

_, err = tx.Exec("INSERT INTO http_cache(key, data, expiration) values(?, ?, ?)",
key,
bytes,
time.Now().UnixNano(),
)
if err != nil {
l.Debug("Failed to insert data into cache table", zap.Error(err))
if errtx := tx.Rollback(); errtx != nil {
l.Debug(failRollback, zap.Error(errtx))
}

l.Debug(failInsert, zap.Error(err))
return err
}

err = tx.Commit()
if err != nil {
if errtx := tx.Rollback(); errtx != nil {
l.Debug(failRollback, zap.Error(errtx))
}

l.Debug(failInsert, zap.Error(err))
return err
}
}
Expand All @@ -221,7 +249,7 @@ func (d *DBCache) Insert(ctx context.Context, key string, value any) error {
// Has query for cached keys.
func (d *DBCache) Has(ctx context.Context, key string) (bool, error) {
if d.IsNilConnection() {
return false, fmt.Errorf("database connection is nil")
return false, fmt.Errorf("%s", nilConnection)
}

l := ctxzap.Extract(ctx)
Expand All @@ -248,13 +276,13 @@ func (d *DBCache) IsNilConnection() bool {
func (d *DBCache) Select(ctx context.Context, key string) ([]byte, error) {
var data []byte
if d.IsNilConnection() {
return nil, fmt.Errorf("database connection is nil")
return nil, fmt.Errorf("%s", nilConnection)
}

l := ctxzap.Extract(ctx)
rows, err := d.db.Query("SELECT data FROM http_cache where key = ?", key)
if err != nil {
l.Debug("error querying datatable", zap.Error(err))
l.Debug(errQueryingTable, zap.Error(err))
return nil, err
}

Expand All @@ -272,24 +300,44 @@ func (d *DBCache) Select(ctx context.Context, key string) ([]byte, error) {

func (d *DBCache) Remove(ctx context.Context, key string) error {
if d.IsNilConnection() {
return fmt.Errorf("database connection is nil")
return fmt.Errorf("%s", nilConnection)
}

l := ctxzap.Extract(ctx)
if ok, _ := d.Has(ctx, key); ok {
_, err := d.db.Exec("DELETE FROM http_cache WHERE key = ?", key)
tx, err := d.db.Begin()
if err != nil {
l.Debug(failStartTransaction, zap.Error(err))
return err
}

_, err = d.db.Exec("DELETE FROM http_cache WHERE key = ?", key)
if err != nil {
if errtx := tx.Rollback(); errtx != nil {
l.Debug(failRollback, zap.Error(errtx))
}

l.Debug("error deleting key", zap.Error(err))
return err
}

err = tx.Commit()
if err != nil {
if errtx := tx.Rollback(); errtx != nil {
l.Debug(failRollback, zap.Error(errtx))
}

l.Debug("Failed to remove cache value", zap.Error(err))
return err
}
}

return nil
}

func (d *DBCache) close(ctx context.Context) error {
if d.IsNilConnection() {
return fmt.Errorf("database connection is nil")
return fmt.Errorf("%s", nilConnection)
}

err := d.db.Close()
Expand All @@ -314,13 +362,13 @@ func (d *DBCache) DeleteExpired(ctx context.Context) error {
)

if d.IsNilConnection() {
return fmt.Errorf("database connection is nil")
return fmt.Errorf("%s", nilConnection)
}

l := ctxzap.Extract(ctx)
rows, err := d.db.Query("SELECT key, expiration FROM http_cache")
if err != nil {
l.Debug("error querying datatable", zap.Error(err))
l.Debug(errQueryingTable, zap.Error(err))
return err
}

Expand Down
4 changes: 4 additions & 0 deletions vendor/github.com/mattn/go-sqlite3/.codecov.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions vendor/github.com/mattn/go-sqlite3/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions vendor/github.com/mattn/go-sqlite3/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 40c5957

Please sign in to comment.