Skip to content

Commit

Permalink
Cache Audit info
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandros Filios <[email protected]>
  • Loading branch information
alexandrosfilios committed Jul 18, 2024
1 parent 8f3f4f4 commit aa40343
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 77 deletions.
133 changes: 57 additions & 76 deletions token/services/db/sql/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"bytes"
"database/sql"
"fmt"
"sync"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/cache/secondcache"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/hash"
Expand All @@ -20,9 +19,10 @@ import (
"go.uber.org/zap/zapcore"
)

type cache interface {
Get(key string) (interface{}, bool)
Add(key string, value interface{})
type cache[T any] interface {
Get(key string) (T, bool)
GetOrLoad(key string, loader func() (T, error)) (T, bool, error)
Add(key string, value T)
Delete(key string)
}

Expand All @@ -36,33 +36,45 @@ type IdentityDB struct {
db *sql.DB
table identityTables

singerInfoCacheMutex sync.RWMutex
singerInfoCache cache
signerInfoCache cache[bool]
auditInfoCache cache[[]byte]
}

func newIdentityDB(db *sql.DB, tables identityTables, singerInfoCache cache) *IdentityDB {
func newIdentityDB(db *sql.DB, tables identityTables, singerInfoCache cache[bool], auditInfoCache cache[[]byte]) *IdentityDB {
return &IdentityDB{
db: db,
table: tables,
singerInfoCache: singerInfoCache,
signerInfoCache: singerInfoCache,
auditInfoCache: auditInfoCache,
}
}

func NewCachedIdentityDB(db *sql.DB, tablePrefix string, createSchema bool) (driver.IdentityDB, error) {
return NewIdentityDB(db, tablePrefix, createSchema, secondcache.New(1000))
return NewIdentityDB(
db,
tablePrefix,
createSchema,
secondcache.NewTyped[bool](1000),
secondcache.NewTyped[[]byte](1000),
)
}

func NewIdentityDB(db *sql.DB, tablePrefix string, createSchema bool, signerInfoCache cache) (*IdentityDB, error) {
func NewIdentityDB(db *sql.DB, tablePrefix string, createSchema bool, signerInfoCache cache[bool], auditInfoCache cache[[]byte]) (*IdentityDB, error) {
tables, err := getTableNames(tablePrefix)
if err != nil {
return nil, errors.Wrapf(err, "failed to get table names")
}

identityDB := newIdentityDB(db, identityTables{
IdentityConfigurations: tables.IdentityConfigurations,
IdentityInfo: tables.IdentityInfo,
Signers: tables.Signers,
}, signerInfoCache)
identityDB := newIdentityDB(
db,
identityTables{
IdentityConfigurations: tables.IdentityConfigurations,
IdentityInfo: tables.IdentityInfo,
Signers: tables.Signers,
},
signerInfoCache,
auditInfoCache,
)
if createSchema {
if err = initSchema(db, identityDB.GetSchema()); err != nil {
return nil, err
Expand Down Expand Up @@ -119,24 +131,31 @@ func (db *IdentityDB) StoreIdentityData(id []byte, identityAudit []byte, tokenMe
}
return nil
}

db.auditInfoCache.Add(h, identityAudit)

return err
}

func (db *IdentityDB) GetAuditInfo(id []byte) ([]byte, error) {
h := token.Identity(id).String()
//logger.Infof("get identity data for [%s] from [%s]", view.Identity(id), string(debug.Stack()))
query := fmt.Sprintf("SELECT identity_audit_info FROM %s WHERE identity_hash = $1", db.table.IdentityInfo)
logger.Debug(query)
row := db.db.QueryRow(query, h)
var info []byte
err := row.Scan(&info)
if err != nil {

value, _, err := db.auditInfoCache.GetOrLoad(h, func() ([]byte, error) {
//logger.Infof("get identity data for [%s] from [%s]", view.Identity(id), string(debug.Stack()))
query := fmt.Sprintf("SELECT identity_audit_info FROM %s WHERE identity_hash = $1", db.table.IdentityInfo)
logger.Debug(query)
row := db.db.QueryRow(query, h)
var info []byte
err := row.Scan(&info)
if err == nil {
return info, nil
}
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, errors.Wrapf(err, "error querying db")
}
return info, nil
})
return value, err
}

func (db *IdentityDB) GetTokenInfo(id []byte) ([]byte, []byte, error) {
Expand Down Expand Up @@ -172,51 +191,28 @@ func (db *IdentityDB) StoreSignerInfo(id, info []byte) error {
}
}

db.singerInfoCacheMutex.Lock()
db.singerInfoCache.Add(h, true)
db.singerInfoCacheMutex.Unlock()
db.signerInfoCache.Add(h, true)
return nil
}

func (db *IdentityDB) SignerInfoExists(id []byte) (bool, error) {
h := token.Identity(id).String()

// is in cache?
db.singerInfoCacheMutex.RLock()
v, ok := db.singerInfoCache.Get(h)
if ok {
db.singerInfoCacheMutex.RUnlock()
return v != nil && v.(bool), nil
}
db.singerInfoCacheMutex.RUnlock()

// get from store
db.singerInfoCacheMutex.Lock()
defer db.singerInfoCacheMutex.Unlock()

// is in cache, first?
v, ok = db.singerInfoCache.Get(h)
if ok {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("hit the cache, len state [%b]", v.(bool))
value, _, err := db.signerInfoCache.GetOrLoad(h, func() (bool, error) {
query := fmt.Sprintf("SELECT info FROM %s WHERE identity_hash = $1", db.table.Signers)
logger.Debug(query)
row := db.db.QueryRow(query, h)
var info []byte
err := row.Scan(&info)
if err == nil {
return true, nil
}
return v != nil && v.(bool), nil
}

// get from store and store in cache
exists, err := db.signerInfoExists(h)
if err != nil {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("failed getting state [%s]", h)
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
db.singerInfoCache.Delete(h)
return false, err
}
db.singerInfoCache.Add(h, exists)
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("signer info [%s] exists [%v]", h, exists)
}
return exists, nil
return false, errors.Wrapf(err, "error querying db")
})
return value, err
}

func (db *IdentityDB) GetSignerInfo(identity []byte) ([]byte, error) {
Expand All @@ -235,21 +231,6 @@ func (db *IdentityDB) GetSignerInfo(identity []byte) ([]byte, error) {
return info, nil
}

func (db *IdentityDB) signerInfoExists(h string) (bool, error) {
query := fmt.Sprintf("SELECT info FROM %s WHERE identity_hash = $1", db.table.Signers)
logger.Debug(query)
row := db.db.QueryRow(query, h)
var info []byte
err := row.Scan(&info)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return false, nil
}
return false, errors.Wrapf(err, "error querying db")
}
return true, nil
}

type IdentityConfigurationIterator struct {
rows *sql.Rows
configurationType string
Expand Down
2 changes: 1 addition & 1 deletion token/services/db/sql/identity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func initIdentityDB(driverName, dataSourceName, tablePrefix string, maxOpenConns
if err != nil {
return nil, err
}
return NewIdentityDB(sqlDB, tablePrefix, true, secondcache.New(1000))
return NewIdentityDB(sqlDB, tablePrefix, true, secondcache.NewTyped[bool](1000), secondcache.NewTyped[[]byte](1000))
}

func TestIdentitySqlite(t *testing.T) {
Expand Down

0 comments on commit aa40343

Please sign in to comment.