diff --git a/go.mod b/go.mod index b44fdf1e..93defee9 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ toolchain go1.22.9 require ( github.com/Rican7/retry v0.3.1 - github.com/canonical/go-dqlite v1.5.1 github.com/go-sql-driver/mysql v1.8.1 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgx/v5 v5.7.1 @@ -39,7 +38,6 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect github.com/dustin/go-humanize v1.0.0 // indirect - github.com/ghodss/yaml v1.0.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -98,7 +96,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/go.sum b/go.sum index ef643ca0..dfda5f95 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= -github.com/canonical/go-dqlite v1.5.1 h1:1YjtIrFsC1A3XlgsX38ARAiKhvkZS63PqsEd8z3T4yU= -github.com/canonical/go-dqlite v1.5.1/go.mod h1:wp00vfMvPYgNCyxcPdHB5XExmDoCGoPUGymloAQT17Y= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -44,7 +42,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM= github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4= -github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= diff --git a/pkg/drivers/dqlite/dqlite.go b/pkg/drivers/dqlite/dqlite.go deleted file mode 100644 index f1c8bedd..00000000 --- a/pkg/drivers/dqlite/dqlite.go +++ /dev/null @@ -1,247 +0,0 @@ -//go:build dqlite -// +build dqlite - -package dqlite - -import ( - "context" - "database/sql" - "fmt" - "net/url" - "os" - "strconv" - "strings" - - "github.com/canonical/go-dqlite" - "github.com/canonical/go-dqlite/client" - "github.com/canonical/go-dqlite/driver" - "github.com/k3s-io/kine/pkg/drivers" - "github.com/k3s-io/kine/pkg/drivers/generic" - "github.com/k3s-io/kine/pkg/drivers/sqlite" - "github.com/k3s-io/kine/pkg/server" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -var ( - Dialer = client.DefaultDialFunc - Logger = client.DefaultLogFunc -) - -func init() { - generic.RegisterDriver("dqlite", New) - // We assume SQLite will be used multi-threaded - if err := dqlite.ConfigMultiThread(); err != nil { - panic(errors.Wrap(err, "failed to set dqlite multithreaded mode")) - } -} - -type opts struct { - peers []client.NodeInfo - peerFile string - dsn string -} - -func AddPeers(ctx context.Context, nodeStore client.NodeStore, additionalPeers ...client.NodeInfo) error { - existing, err := nodeStore.Get(ctx) - if err != nil { - return err - } - - var peers []client.NodeInfo - -outer: - for _, peer := range additionalPeers { - for _, check := range existing { - if check.Address == peer.Address { - continue outer - } - } - peers = append(peers, peer) - } - - if len(peers) > 0 { - err = nodeStore.Set(ctx, append(existing, peers...)) - if err != nil { - return err - } - } - - return nil -} - -func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error) { - dataSourceName = cfg.Address - opts, err := parseOpts(datasourceName) - if err != nil { - return nil, err - } - - var nodeStore client.NodeStore - if opts.peerFile != "" { - nodeStore, err = client.DefaultNodeStore(opts.peerFile) - if err != nil { - return nil, errors.Wrap(err, "opening peerfile") - } - } else { - nodeStore = client.NewInmemNodeStore() - } - - if err := AddPeers(ctx, nodeStore, opts.peers...); err != nil { - return nil, errors.Wrap(err, "add peers") - } - - d, err := driver.New(nodeStore, - driver.WithLogFunc(Logger), - driver.WithContext(ctx), - driver.WithDialFunc(Dialer)) - if err != nil { - return nil, errors.Wrap(err, "new dqlite driver") - } - - sql.Register("dqlite", d) - cfg.Address = opts.dsn - backend, generic, err := sqlite.NewVariant(ctx, "dqlite", cfg) - if err != nil { - return nil, errors.Wrap(err, "sqlite client") - } - if err := migrate(ctx, generic.DB); err != nil { - return nil, errors.Wrap(err, "failed to migrate DB from sqlite") - } - - generic.LockWrites = true - generic.Retry = func(err error) bool { - if err, ok := err.(driver.Error); ok { - return err.Code == driver.ErrBusy - } - return false - } - generic.TranslateErr = func(err error) error { - if strings.Contains(err.Error(), "UNIQUE constraint") { - return server.ErrKeyExists - } - return err - } - - return true, backend, nil -} - -func migrate(ctx context.Context, newDB *sql.DB) (exitErr error) { - row := newDB.QueryRowContext(ctx, "SELECT COUNT(*) FROM kine") - var count int64 - if err := row.Scan(&count); err != nil { - return err - } - if count > 0 { - return nil - } - - if _, err := os.Stat("./db/state.db"); err != nil { - return nil - } - - oldDB, err := sql.Open("sqlite3", "./db/state.db") - if err != nil { - return nil - } - defer oldDB.Close() - - oldData, err := oldDB.QueryContext(ctx, "SELECT id, name, created, deleted, create_revision, prev_revision, lease, value, old_value FROM kine") - if err != nil { - logrus.Errorf("failed to find old data to migrate: %v", err) - return nil - } - defer oldData.Close() - - tx, err := newDB.BeginTx(ctx, nil) - if err != nil { - return err - } - defer func() { - if exitErr == nil { - exitErr = tx.Commit() - } else { - tx.Rollback() - } - }() - - for oldData.Next() { - row := []interface{}{ - new(int), - new(string), - new(int), - new(int), - new(int), - new(int), - new(int), - new([]byte), - new([]byte), - } - if err := oldData.Scan(row...); err != nil { - return err - } - - if _, err := newDB.ExecContext(ctx, "INSERT INTO kine(id, name, created, deleted, create_revision, prev_revision, lease, value, old_value) values(?, ?, ?, ?, ?, ?, ?, ?, ?)", - row...); err != nil { - return err - } - } - - if err := oldData.Err(); err != nil { - return err - } - - return nil -} - -func parseOpts(dsn string) (opts, error) { - result := opts{ - dsn: dsn, - } - - parts := strings.SplitN(dsn, "?", 2) - if len(parts) == 1 { - return result, nil - } - - values, err := url.ParseQuery(parts[1]) - if err != nil { - return result, err - } - - for k, vs := range values { - if len(vs) == 0 { - continue - } - - switch k { - case "peer": - for _, v := range vs { - parts := strings.SplitN(v, ":", 3) - if len(parts) != 3 { - return result, fmt.Errorf("must be ID:IP:PORT format got: %s", v) - } - id, err := strconv.ParseUint(parts[0], 10, 64) - if err != nil { - return result, errors.Wrapf(err, "failed to parse %s", parts[0]) - } - result.peers = append(result.peers, client.NodeInfo{ - ID: id, - Address: parts[1] + ":" + parts[2], - }) - } - delete(values, k) - case "peer-file": - result.peerFile = vs[0] - delete(values, k) - } - } - - if len(values) == 0 { - result.dsn = parts[0] - } else { - result.dsn = fmt.Sprintf("%s?%s", parts[0], values.Encode()) - } - - return result, nil -} diff --git a/pkg/drivers/dqlite/no_dqlite.go b/pkg/drivers/dqlite/no_dqlite.go deleted file mode 100644 index 078ba06b..00000000 --- a/pkg/drivers/dqlite/no_dqlite.go +++ /dev/null @@ -1,20 +0,0 @@ -//go:build !dqlite -// +build !dqlite - -package dqlite - -import ( - "context" - "errors" - - "github.com/k3s-io/kine/pkg/drivers" - "github.com/k3s-io/kine/pkg/server" -) - -func New(ctx context.Context, cfg *drivers.Config) (bool, server.Backend, error) { - return false, nil, errors.New(`this binary is built without dqlite support, compile with "-tags dqlite"`) -} - -func init() { - drivers.Register("dqlite", New) -} diff --git a/pkg/drivers/sqlite/sqlite.go b/pkg/drivers/sqlite/sqlite.go index 13e86f3b..a642c52e 100644 --- a/pkg/drivers/sqlite/sqlite.go +++ b/pkg/drivers/sqlite/sqlite.go @@ -8,7 +8,6 @@ import ( "database/sql" "fmt" "os" - "time" "github.com/k3s-io/kine/pkg/drivers" "github.com/k3s-io/kine/pkg/drivers/generic" @@ -99,22 +98,7 @@ func NewVariant(ctx context.Context, driverName string, cfg *drivers.Config) (se return err.Error() } - // this is the first SQL that will be executed on a new DB conn so - // loop on failure here because in the case of dqlite it could still be initializing - for i := 0; i < 300; i++ { - err = setup(dialect.DB) - if err == nil { - break - } - logrus.Errorf("failed to setup db: %v", err) - select { - case <-ctx.Done(): - return nil, nil, ctx.Err() - case <-time.After(time.Second): - } - time.Sleep(time.Second) - } - if err != nil { + if err := setup(dialect.DB); err != nil { return nil, nil, errors.Wrap(err, "setup db") } diff --git a/pkg/endpoint/init.go b/pkg/endpoint/init.go index ade555e8..90658da5 100644 --- a/pkg/endpoint/init.go +++ b/pkg/endpoint/init.go @@ -2,7 +2,6 @@ package endpoint import ( // Import all the default drivers - _ "github.com/k3s-io/kine/pkg/drivers/dqlite" _ "github.com/k3s-io/kine/pkg/drivers/http" _ "github.com/k3s-io/kine/pkg/drivers/mysql" _ "github.com/k3s-io/kine/pkg/drivers/nats"