Skip to content

Commit

Permalink
fix(pipelineloop): caching should include the params for making cachi…
Browse files Browse the repository at this point in the history
…ng key. (kubeflow#1056)

* fix(pipelineloop): caching should include the params for making caching key.

* Get params from run spec itself.

* Migrated cache for custom task controllers to gorm v2.

* code cleanup.

* Added retry for cache connect until timeout.

* improved tests to be able to detect config maps. Better error reporting.
  • Loading branch information
ScrapCodes authored Oct 24, 2022
1 parent 9b9b932 commit 98a2332
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 160 deletions.
7 changes: 3 additions & 4 deletions tekton-catalog/cache/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/cache
go 1.13

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/go-sql-driver/mysql v1.6.0
github.com/jinzhu/gorm v1.9.16
github.com/mattn/go-sqlite3 v1.14.0
gorm.io/driver/mysql v1.4.3
gorm.io/driver/sqlite v1.4.2
gorm.io/gorm v1.24.0
)
25 changes: 10 additions & 15 deletions tekton-catalog/cache/pkg/db/db_conn_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ import (
"fmt"
"time"

_ "github.com/go-sql-driver/mysql"
"github.com/jinzhu/gorm"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
"gorm.io/gorm"
)

type ConnectionParams struct {
Expand All @@ -32,36 +31,32 @@ type ConnectionParams struct {
DbPwd string
DbGroupConcatMaxLen string
DbExtraParams string
Timeout time.Duration
Timeout time.Duration
}

func InitDBClient(params ConnectionParams, initConnectionTimeout time.Duration) (*gorm.DB, error) {
driverName := params.DbDriver
var arg string
var db *gorm.DB
var err error

switch driverName {
case mysqlDBDriverDefault:
arg, err = initMysql(params, initConnectionTimeout)
if err != nil {
return nil, err
}
case sqliteDriverDefault:
arg = initSqlite(params.DbName)
case "mysql":
db, err = initMysql(params)
case "sqlite":
db, err = initSqlite(params.DbName)
default:
return nil, fmt.Errorf("driver %v is not supported", driverName)
return nil, fmt.Errorf("driver %s is not supported", driverName)
}

// db is safe for concurrent use by multiple goroutines
// and maintains its own pool of idle connections.
db, err := gorm.Open(driverName, arg)
if err != nil {
return nil, err
}
// Create table
response := db.AutoMigrate(&model.TaskCache{})
if response.Error != nil {
return nil, fmt.Errorf("failed to initialize the databases: Error: %w", response.Error)
if response != nil {
return nil, fmt.Errorf("failed to initialize the databases: Error: %v", response)
}
return db, nil
}
87 changes: 23 additions & 64 deletions tekton-catalog/cache/pkg/db/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
package db

import (
"database/sql"
"encoding/json"
"fmt"
"time"

"github.com/cenkalti/backoff"

"github.com/go-sql-driver/mysql"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

const (
Expand Down Expand Up @@ -52,72 +50,37 @@ func (params *ConnectionParams) LoadMySQLDefaults() {
}
}

func initMysql(params ConnectionParams, initConnectionTimeout time.Duration) (string, error) {
func initMysql(params ConnectionParams) (*gorm.DB, error) {
var mysqlExtraParams = map[string]string{}
data := []byte(params.DbExtraParams)
_ = json.Unmarshal(data, &mysqlExtraParams)
mysqlConfig := CreateMySQLConfig(
mysqlConfigDSN := CreateMySQLConfigDSN(
params.DbUser,
params.DbPwd,
params.DbHost,
params.DbPort,
"",
params.DbName,
params.DbGroupConcatMaxLen,
mysqlExtraParams,
)

var db *sql.DB
var err error
var operation = func() error {
db, err = sql.Open(params.DbDriver, mysqlConfig.FormatDSN())
if err != nil {
return err
}
return nil
}
b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err = backoff.Retry(operation, b)
if err != nil {
return "", err
}
defer db.Close()

// Create database if not exist
dbName := params.DbName
operation = func() error {
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName))
if err != nil {
return err
}
return nil
}
b = backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err = backoff.Retry(operation, b)

operation = func() error {
_, err = db.Exec(fmt.Sprintf("USE %s", dbName))
if err != nil {
return err
}
return nil
}
b = backoff.NewExponentialBackOff()
b.MaxElapsedTime = initConnectionTimeout
err = backoff.Retry(operation, b)

mysqlConfig.DBName = dbName
// Config reference: https://github.com/go-sql-driver/mysql#clientfoundrows
mysqlConfig.ClientFoundRows = true
return mysqlConfig.FormatDSN(), nil
db, err := gorm.Open(mysql.New(mysql.Config{
DSN: mysqlConfigDSN, // data source name, refer https://github.com/go-sql-driver/mysql#dsn-data-source-name
DefaultStringSize: 256, // add default size for string fields, by default, will use db type `longtext` for fields without size, not a primary key, no index defined and don't have default values
DontSupportRenameIndex: true, // drop & create index when rename index, rename index not supported before MySQL 5.7, MariaDB
DontSupportRenameColumn: true, // use change when rename column, rename column not supported before MySQL 8, MariaDB
SkipInitializeWithVersion: false, // smart configure based on used version
}), &gorm.Config{})

return db, err
}

func CreateMySQLConfig(user, password, mysqlServiceHost, mysqlServicePort, dbName, mysqlGroupConcatMaxLen string,
mysqlExtraParams map[string]string) *mysql.Config {
func CreateMySQLConfigDSN(user, password, mysqlServiceHost, mysqlServicePort, dbName, mysqlGroupConcatMaxLen string,
mysqlExtraParams map[string]string) string {

if mysqlGroupConcatMaxLen == "" {
mysqlGroupConcatMaxLen = "4194304"
}
params := map[string]string{
"charset": "utf8",
"parseTime": "True",
"loc": "Local",
"group_concat_max_len": mysqlGroupConcatMaxLen,
Expand All @@ -126,14 +89,10 @@ func CreateMySQLConfig(user, password, mysqlServiceHost, mysqlServicePort, dbNam
for k, v := range mysqlExtraParams {
params[k] = v
}
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4", user, password, mysqlServiceHost, mysqlServicePort, dbName)

return &mysql.Config{
User: user,
Passwd: password,
Net: "tcp",
Addr: fmt.Sprintf("%s:%s", mysqlServiceHost, mysqlServicePort),
Params: params,
DBName: dbName,
AllowNativePasswords: true,
for k, v := range params {
dsn = fmt.Sprintf("%s&%s=%s", dsn, k, v)
}
return dsn
}
21 changes: 12 additions & 9 deletions tekton-catalog/cache/pkg/db/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@

package db

const sqliteDriverDefault = "sqlite3"
import (
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)

func (params *ConnectionParams) LoadSqliteDefaults() {
setDefault(&params.DbDriver, sqliteDriverDefault)
setDefault(&params.DbName, ":memory:")
}

func initSqlite(dbName string) string {
func initSqlite(dbName string) (*gorm.DB, error) {
var db *gorm.DB
var err error
if dbName == "" {
dbName = ":memory:" // default db.
db, err = gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
} else {
db, err = gorm.Open(sqlite.Open(dbName), &gorm.Config{})
}
return dbName

return db, err
}
22 changes: 8 additions & 14 deletions tekton-catalog/cache/pkg/task_cache_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ import (
"fmt"
"time"

"github.com/jinzhu/gorm"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
"gorm.io/gorm"
)


type TaskCacheStore struct {
db *gorm.DB
Disabled bool
Params db.ConnectionParams
Params db.ConnectionParams
}

func (t *TaskCacheStore) Connect() error {
Expand All @@ -44,28 +43,23 @@ func (t *TaskCacheStore) Get(taskHashKey string) (*model.TaskCache, error) {
return nil, nil
}
entry := &model.TaskCache{}
d := t.db.Table("task_caches").Where("TaskHashKey = ?", taskHashKey).
d := t.db.Model(&model.TaskCache{}).Where("TaskHashKey = ?", taskHashKey).
Order("CreatedAt DESC").First(entry)
if d.Error != nil {
return nil, fmt.Errorf("failed to get entry from cache: %q. Error: %v", taskHashKey, d.Error)
}
return entry, nil
}

func (t *TaskCacheStore) Put(entry *model.TaskCache) (*model.TaskCache, error) {
func (t *TaskCacheStore) Put(entry *model.TaskCache) error {
if t.Disabled || t.db == nil {
return nil, nil
}
ok := t.db.NewRecord(entry)
if !ok {
return nil, fmt.Errorf("failed to create a new cache entry, %#v, Error: %v", entry, t.db.Error)
return nil
}
rowInsert := &model.TaskCache{}
d := t.db.Create(entry).Scan(rowInsert)
d := t.db.Create(entry)
if d.Error != nil {
return nil, d.Error
return fmt.Errorf("failed to create a new cache entry, %#v, Error: %v", entry, t.db.Error)
}
return rowInsert, nil
return nil
}

func (t *TaskCacheStore) Delete(id string) error {
Expand Down
33 changes: 14 additions & 19 deletions tekton-catalog/cache/pkg/task_cache_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import (

"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/db"
"github.com/kubeflow/kfp-tekton/tekton-catalog/cache/pkg/model"
_ "github.com/mattn/go-sqlite3"
)

func newTestingCacheStore(disabled bool) (*TaskCacheStore, error) {
t := TaskCacheStore{
Disabled: disabled,
Params: db.ConnectionParams{DbDriver: "sqlite3", DbName: ":memory:"},
// Params: db.ConnectionParams{DbDriver: "mysql", DbName: "testdb",
// DbHost: "127.0.0.1", DbPort: "3306", DbPwd: "", DbUser: "root",
// Timeout: 10 * time.Second,
// },
Params: db.ConnectionParams{DbDriver: "sqlite", DbName: ":memory:"},
}
err := t.Connect()
return &t, err
Expand All @@ -47,19 +50,11 @@ func TestPut(t *testing.T) {
t.Fatal(err)
}
entry := createTaskCache("x", "y")
taskCache, err := taskCacheStore.Put(entry)
err = taskCacheStore.Put(entry)
if err != nil {
t.Fatal(err)
}
if taskCache.TaskHashKey != entry.TaskHashKey {
t.Errorf("Mismatached key. Expected %s Found: %s", entry.TaskHashKey,
taskCache.TaskHashKey)
}
if taskCache.TaskOutput != entry.TaskOutput {
t.Errorf("Mismatached output. Expected : %s Found: %s",
entry.TaskOutput,
taskCache.TaskOutput)
}

}

func TestGet(t *testing.T) {
Expand All @@ -68,11 +63,11 @@ func TestGet(t *testing.T) {
t.Fatal(err)
}
entry := createTaskCache("x", "y")
taskCache, err := taskCacheStore.Put(entry)
err = taskCacheStore.Put(entry)
if err != nil {
t.Fatal(err)
}
cacheResult, err := taskCacheStore.Get(taskCache.TaskHashKey)
cacheResult, err := taskCacheStore.Get(entry.TaskHashKey)
if err != nil {
t.Error(err)
}
Expand All @@ -95,11 +90,11 @@ func TestGetLatest(t *testing.T) {
}
for i := 1; i < 10; i++ {
entry := createTaskCache("x", fmt.Sprintf("y%d", i))
taskCache, err := taskCacheStore.Put(entry)
err := taskCacheStore.Put(entry)
if err != nil {
t.Fatal(err)
}
cacheResult, err := taskCacheStore.Get(taskCache.TaskHashKey)
cacheResult, err := taskCacheStore.Get(entry.TaskHashKey)
if err != nil {
t.Error(err)
}
Expand All @@ -122,7 +117,7 @@ func TestDisabledCache(t *testing.T) {
}
taskCache, err := taskCacheStore.Get("random")
if err != nil {
t.Errorf("a disabled cache returned non nil error: %w", err)
t.Errorf("a disabled cache returned non nil error: %s", err)
}
if taskCache != nil {
t.Errorf("a disabled cache should return nil")
Expand All @@ -141,7 +136,7 @@ func TestPruneOlderThan(t *testing.T) {
TaskOutput: "cacheOutput",
CreatedAt: time.UnixMicro(int64(i * 100)),
}
_, err = taskCacheStore.Put(t1)
err = taskCacheStore.Put(t1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -157,7 +152,7 @@ func TestPruneOlderThan(t *testing.T) {
if err != nil {
t.Fatal(err)
}
taskCache, err = taskCacheStore.Get(hashKey)
_, err = taskCacheStore.Get(hashKey)
if err == nil {
t.Errorf("Expected error to be not nil")
}
Expand Down
2 changes: 1 addition & 1 deletion tekton-catalog/pipeline-loops/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops
go 1.13

require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/google/go-cmp v0.5.8
github.com/hashicorp/go-multierror v1.1.1
github.com/kubeflow/kfp-tekton/tekton-catalog/cache v0.0.0
github.com/kubeflow/kfp-tekton/tekton-catalog/objectstore v0.0.0
github.com/mattn/go-sqlite3 v1.14.0
github.com/tektoncd/pipeline v0.38.4
go.uber.org/zap v1.21.0
gomodules.xyz/jsonpatch/v2 v2.2.0
Expand Down
Loading

0 comments on commit 98a2332

Please sign in to comment.