Skip to content

Commit

Permalink
DB Access: OnChange cache (sonic-net#82)
Browse files Browse the repository at this point in the history
* DB Access: OnChange cache

1) Introduced the OnChange cache in the DB object. Cache is maintained
as map[string]db.Value, where map key is the redis key.

OnChange cache is not enabled by default. DB instance should be inited
with IsEnableOnChange=true option and caching should be enabled for
specific tables by calling DB.RegisterTableForOnChangeCaching().
DB.OnChangeCacheUpdate() and DB.OnChangeCacheDelete() functions should
be used to refresh the cache contents.

2) Enhanced DB.GetEntry() to return value from the OnChnage cahce if
the key exists in the cache. Otherwise the value will be read from
redis and cache will be updated.

Signed-off-by: Arun Barboza <[email protected]>
Signed-off-by: Sachin Holla <[email protected]>

* Rename IsEnableOnChange option as IsOnChangeEnabled

---------

Signed-off-by: Arun Barboza <[email protected]>
Signed-off-by: Sachin Holla <[email protected]>
  • Loading branch information
sachinholla authored May 24, 2023
1 parent cc214fb commit cf40f46
Show file tree
Hide file tree
Showing 4 changed files with 429 additions and 14 deletions.
101 changes: 87 additions & 14 deletions translib/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ type Options struct {
InitIndicator string
TableNameSeparator string //Overriden by the DB config file's separator.
KeySeparator string //Overriden by the DB config file's separator.
IsWriteDisabled bool //Indicated if write is allowed
IsWriteDisabled bool //Indicated if write is allowed
IsOnChangeEnabled bool // whether OnChange cache enabled

DisableCVLCheck bool
}
Expand Down Expand Up @@ -249,6 +250,15 @@ type Table struct {
db *DB
}

type dbCache struct {
Tables map[string]Table
}

const (
ConnectionClosed = tlerr.TranslibDBInvalidState("connection closed")
OnChangeDisabled = tlerr.TranslibDBInvalidState("OnChange disabled")
)

type _txOp int

const (
Expand All @@ -275,6 +285,9 @@ type DB struct {
cv *cvl.CVL
cvlEditConfigData []cvl.CVLEditConfigData

onCReg dbOnChangeReg // holds OnChange enabled table names
cache dbCache // holds OnChange cache

/*
sKeys []*SKey // Subscribe Key array
sHandler HFunc // Handler Function
Expand Down Expand Up @@ -341,6 +354,7 @@ func GetdbNameToIndex(dbName string) DBNum {
// NewDB is the factory method to create new DB's.
func NewDB(opt Options) (*DB, error) {

var d DB
var e error

if glog.V(3) {
Expand Down Expand Up @@ -370,8 +384,14 @@ func NewDB(opt Options) (*DB, error) {
} else {
glog.Error(fmt.Errorf("Invalid database number %d", dbId))
}

d := DB{client: redis.NewClient(&redis.Options{

if opt.IsOnChangeEnabled && !opt.IsWriteDisabled {
glog.Errorf("NewDB: IsEnableOnChange cannot be set on write enabled DB")
e = tlerr.TranslibDBCannotOpen{}
goto NewDBExit
}

d = DB{client: redis.NewClient(&redis.Options{
Network: "tcp",
Addr: ipAddr,
//Addr: DefaultRedisRemoteTCPEP,
Expand All @@ -395,6 +415,10 @@ func NewDB(opt Options) (*DB, error) {
goto NewDBExit
}

if opt.IsOnChangeEnabled {
d.onCReg = dbOnChangeReg{CacheTables: make(map[string]bool)}
}

if opt.DBNo != ConfigDB {
if glog.V(3) {
glog.Info("NewDB: ! ConfigDB. Skip init. check.")
Expand Down Expand Up @@ -438,6 +462,17 @@ func (d *DB) DeleteDB() error {
return d.client.Close()
}

func (d *DB) Name() string {
if d == nil {
return ""
}
return getDBInstName(d.Opts.DBNo)
}

func (d *DB) IsOpen() bool {
return d != nil && d.client != nil
}

func (d *DB) key2redis(ts *TableSpec, key Key) string {

if glog.V(5) {
Expand Down Expand Up @@ -482,31 +517,61 @@ func (d *DB) ts2redisUpdated(ts *TableSpec) string {

// GetEntry retrieves an entry(row) from the table.
func (d *DB) GetEntry(ts *TableSpec, key Key) (Value, error) {
if !d.IsOpen() {
return Value{}, ConnectionClosed
}
return d.getEntry(ts, key, false)
}

func (d *DB) getEntry(ts *TableSpec, key Key, forceReadDB bool) (Value, error) {

if glog.V(3) {
glog.Info("GetEntry: Begin: ", "ts: ", ts, " key: ", key)
}

var value Value
var cacheHit bool
var e error

/*
m := make(map[string]string)
m["f0.0"] = "v0.0"
m["f0.1"] = "v0.1"
m["f0.2"] = "v0.2"
v := Value{Field: m}
*/
entry := d.key2redis(ts, key)
useCache := d.Opts.IsOnChangeEnabled && d.onCReg.isCacheTable(ts.Name)

if !forceReadDB && useCache {
if table, ok := d.cache.Tables[ts.Name]; ok {
if value, ok = table.entry[entry]; ok {
value = value.Copy()
cacheHit = true
}
}
}

v, e := d.client.HGetAll(d.key2redis(ts, key)).Result()
if !cacheHit {
value.Field, e = d.client.HGetAll(d.key2redis(ts, key)).Result()
}

if len(v) != 0 {
value = Value{Field: v}
} else {
if e != nil {
glog.V(2).Infof("GetEntry: %s: HGetAll(%q) error: %v", d.Name(), entry, e)
value = Value{}

} else if !value.IsPopulated() {
if glog.V(4) {
glog.Info("GetEntry: HGetAll(): empty map")
}
// e = errors.New("Entry does not exist")
e = tlerr.TranslibRedisClientEntryNotExist{Entry: d.key2redis(ts, key)}

} else if !cacheHit && useCache {
if _, ok := d.cache.Tables[ts.Name]; !ok {
if d.cache.Tables == nil {
d.cache.Tables = make(map[string]Table, d.onCReg.size())
}
d.cache.Tables[ts.Name] = Table{
ts: ts,
entry: make(map[string]Value),
db: d,
}
}
d.cache.Tables[ts.Name].entry[entry] = value.Copy()
}

if glog.V(3) {
Expand Down Expand Up @@ -1097,6 +1162,14 @@ func (t *Table) GetEntry(key Key) (Value, error) {

//===== Functions for db.Value =====

func (v Value) Copy() (rV Value) {
rV = Value{Field: make(map[string]string, len(v.Field))}
for k, v1 := range v.Field {
rV.Field[k] = v1
}
return
}

func (v *Value) IsPopulated() bool {
return len(v.Field) > 0
}
Expand Down
123 changes: 123 additions & 0 deletions translib/db/db_onchangecache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
////////////////////////////////////////////////////////////////////////////////
// //
// Copyright 2021 Broadcom. The term Broadcom refers to Broadcom Inc. and/or //
// its subsidiaries. //
// //
// Licensed under the Apache License, Version 2.0 (the "License"); //
// you may not use this file except in compliance with the License. //
// You may obtain a copy of the License at //
// //
// http://www.apache.org/licenses/LICENSE-2.0 //
// //
// Unless required by applicable law or agreed to in writing, software //
// distributed under the License is distributed on an "AS IS" BASIS, //
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. //
// See the License for the specific language governing permissions and //
// limitations under the License. //
// //
////////////////////////////////////////////////////////////////////////////////

package db

import (
"github.com/golang/glog"
)

////////////////////////////////////////////////////////////////////////////////
// Internal Types //
////////////////////////////////////////////////////////////////////////////////

type dbOnChangeReg struct {
CacheTables map[string]bool // Only cache these tables.
}

////////////////////////////////////////////////////////////////////////////////
// Exported Functions //
////////////////////////////////////////////////////////////////////////////////

func (d *DB) RegisterTableForOnChangeCaching(ts *TableSpec) error {
if glog.V(3) {
glog.Info("RegisterTableForOnChange: ts:", ts)
}
if !d.IsOpen() {
return ConnectionClosed
}
if !d.Opts.IsOnChangeEnabled {
return OnChangeDisabled
}

d.onCReg.CacheTables[ts.Name] = true
return nil
}

// OnChangeCacheUpdate reads a db entry from redis and updates the on_change cache.
// Returns both previously cached Value and current Value. Previous Value will be
// empty if there was no such cache entry earlier. Returns an error if DB entry
// does not exists or could not be read.
func (d *DB) OnChangeCacheUpdate(ts *TableSpec, key Key) (Value, Value, error) {
if glog.V(3) {
glog.Info("OnChangeCacheUpdate: Begin: ", "ts: ", ts, " key: ", key)
}
if !d.IsOpen() {
return Value{}, Value{}, ConnectionClosed
}
if !d.Opts.IsOnChangeEnabled {
return Value{}, Value{}, OnChangeDisabled
}

var valueOrig Value
if _, ok := d.cache.Tables[ts.Name]; ok {
valueOrig = d.cache.Tables[ts.Name].entry[d.key2redis(ts, key)]
}

// Get New Value from the DB
value, e := d.getEntry(ts, key, true)

return valueOrig, value, e
}

// OnChangeCacheDelete deletes an entry from the on_change cache.
// Returns the previously cached Value object; or an empty Value if there was
// no such cache entry.
func (d *DB) OnChangeCacheDelete(ts *TableSpec, key Key) (Value, error) {
if glog.V(3) {
glog.Info("OnChangeCacheDelete: Begin: ", "ts:", ts, " key:", key)
}
if !d.IsOpen() {
return Value{}, ConnectionClosed
}
if !d.Opts.IsOnChangeEnabled {
return Value{}, OnChangeDisabled
}

redisKey := d.key2redis(ts, key)
var valueOrig Value
_, ok := d.cache.Tables[ts.Name]
if ok {
valueOrig, ok = d.cache.Tables[ts.Name].entry[redisKey]
}

if ok {
glog.V(2).Info("OnChangeCacheDelete: Delete ts:", ts, " key:", key)
delete(d.cache.Tables[ts.Name].entry, redisKey)
} else {
glog.V(2).Info("OnChangeCacheDelete: Not found; ts:", ts, " key:", key)
}

return valueOrig, nil
}

////////////////////////////////////////////////////////////////////////////////
// Internal Functions //
////////////////////////////////////////////////////////////////////////////////

func init() {
}

func (reg *dbOnChangeReg) isCacheTable(name string) bool {
return reg.CacheTables[name]
}

func (reg *dbOnChangeReg) size() int {
return len(reg.CacheTables)
}
Loading

0 comments on commit cf40f46

Please sign in to comment.