diff --git a/lib/backend/backend.go b/lib/backend/backend.go index 41115efebf15a..f33a75bb77bac 100644 --- a/lib/backend/backend.go +++ b/lib/backend/backend.go @@ -25,10 +25,10 @@ import ( "strings" "time" - "github.com/gravitational/teleport/api/types" "github.com/gravitational/trace" - "github.com/jonboulle/clockwork" + + "github.com/gravitational/teleport/api/types" ) // Forever means that object TTL will not expire unless deleted @@ -232,36 +232,6 @@ func (p Params) GetString(key string) string { return s } -// Cleanse fixes an issue with yamlv2 decoding nested sections to -// map[interface{}]interface{} rather than map[string]interface{}. -// ObjectToStruct will fail on the former. yamlv3 corrects this behavior. -// All non-string keys are dropped. -func (p Params) Cleanse() { - for key, value := range p { - if mapValue, ok := value.(map[interface{}]interface{}); ok { - p[key] = convertParams(mapValue) - } - } -} - -// convertParams converts from a map[interface{}]interface{} to -// map[string]interface{} recursively. All non-string keys are dropped. -// This function is called by Params.Cleanse. -func convertParams(from map[interface{}]interface{}) (to map[string]interface{}) { - to = make(map[string]interface{}, len(from)) - for key, value := range from { - strKey, ok := key.(string) - if !ok { - continue - } - if mapValue, ok := value.(map[interface{}]interface{}); ok { - value = convertParams(mapValue) - } - to[strKey] = value - } - return to -} - // NoLimit specifies no limits const NoLimit = 0 diff --git a/lib/backend/backend_test.go b/lib/backend/backend_test.go index 61b09a297d8c8..80a5f0b7b9818 100644 --- a/lib/backend/backend_test.go +++ b/lib/backend/backend_test.go @@ -55,28 +55,3 @@ func TestRangeEnd(t *testing.T) { }) } } - -func TestParamsCleanse(t *testing.T) { - source := Params{ - "Addr": "localhost:345", - "TLS": map[interface{}]interface{}{ - "CAFile": "/path/to/file", - "Certs": map[interface{}]interface{}{ - "Cert": "cert.crt", - "Key": "key.crt", - }, - }, - } - expect := Params{ - "Addr": "localhost:345", - "TLS": map[string]interface{}{ - "CAFile": "/path/to/file", - "Certs": map[string]interface{}{ - "Cert": "cert.crt", - "Key": "key.crt", - }, - }, - } - source.Cleanse() - require.Equal(t, source, expect) -} diff --git a/lib/backend/postgres/backend.go b/lib/backend/postgres/backend.go deleted file mode 100644 index d690c01bd1c11..0000000000000 --- a/lib/backend/postgres/backend.go +++ /dev/null @@ -1,165 +0,0 @@ -/* -Copyright 2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package postgres - -import ( - "context" - "time" - - "github.com/gravitational/teleport/api/utils" - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/backend/sqlbk" - "github.com/gravitational/trace" - "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" - - // Ensure pgx driver is registered. - _ "github.com/jackc/pgx/v4/stdlib" -) - -const ( - // BackendName is the name of this backend. - BackendName = "postgres" - // AlternativeName is another name of this backend. - AlternativeName = "cockroachdb" -) - -// GetName returns BackendName (postgres). -func GetName() string { - return BackendName -} - -// New returns a Backend that speaks the PostgreSQL protocol when communicating -// with the database. The connection pool is ready and the database has been -// migrated to the most recent version upon return without an error. -func New(ctx context.Context, params backend.Params) (*sqlbk.Backend, error) { - var cfg *Config - err := utils.ObjectToStruct(params, &cfg) - if err != nil { - return nil, trace.BadParameter("invalid configuration: %v", err) - } - err = cfg.CheckAndSetDefaults() - if err != nil { - return nil, trace.Wrap(err) - } - return sqlbk.New(ctx, &pgDriver{cfg: cfg}) -} - -// Config defines a configuration for the postgres backend. -type Config struct { - sqlbk.Config - - // ConnMaxIdleTime sets the maximum amount of time a connection may be idle. - // https://pkg.go.dev/database/sql#DB.SetConnMaxIdleTime - ConnMaxIdleTime time.Duration `json:"conn_max_idle_time,omitempty"` - - // ConnMaxLifetime sets the maximum amount of time a connection may be reused. - // https://pkg.go.dev/database/sql#DB.SetConnMaxLifetime - ConnMaxLifetime time.Duration `json:"conn_max_lifetime,omitempty"` - - // MaxIdleConns sets the maximum number of connections in the idle connection pool. - // https://pkg.go.dev/database/sql#DB.SetMaxIdleConns - MaxIdleConns int `json:"max_idle_conns,omitempty"` - - // SetMaxOpenConns sets the maximum number of open connections to the database. - // https://pkg.go.dev/database/sql#DB.SetMaxOpenConns - MaxOpenConns int `json:"max_open_conns,omitempty"` - - // Add configurations specific to this backend. - // - // AfterConnect pgconn.AfterConnectFunc `json:"-"` - // DialFunc pgconn.DialFunc `json:"-"` - // RuntimeParams struct { - // SearchPath string `json:"search_path"` - // } `json:"runtime_params"` -} - -// CheckAndSetDefaults validates required fields and sets default -// values for fields that have not been set. -func (c *Config) CheckAndSetDefaults() error { - if c.MaxOpenConns == 0 { - c.MaxOpenConns = DefaultMaxOpenConns - } - if c.ConnMaxIdleTime == 0 { - c.ConnMaxIdleTime = DefaultConnMaxIdleTime - } - if c.ConnMaxLifetime == 0 { - c.ConnMaxLifetime = DefaultConnMaxLifetime - } - if c.MaxIdleConns == 0 { - c.MaxIdleConns = DefaultMaxIdleConns - } - if c.Log == nil { - c.Log = logrus.WithFields(logrus.Fields{trace.Component: BackendName}) - } - if c.Clock == nil { - c.Clock = clockwork.NewRealClock() - } - - err := c.Config.CheckAndSetDefaults() - if err != nil { - return trace.Wrap(err) - } - - err = validateDatabaseName(c.Database) - if err != nil { - return trace.Wrap(err) - } - - return nil -} - -// validateDatabaseName returns true when name contains only alphanumeric and/or -// underscore/dollar characters, the first character is not a digit, and the -// name's length is less than MaxDatabaseNameLength (63 bytes). -func validateDatabaseName(name string) error { - if MaxDatabaseNameLength <= len(name) { - return trace.BadParameter("invalid PostgreSQL database name, length exceeds %d bytes. See https://www.postgresql.org/docs/14/sql-syntax-lexical.html.", MaxDatabaseNameLength) - } - for i, r := range name { - switch { - case 'A' <= r && r <= 'Z', 'a' <= r && r <= 'z', r == '_': - case i > 0 && (r == '$' || '0' <= r && r <= '9'): - default: - return trace.BadParameter("invalid PostgreSQL database name: %v. See https://www.postgresql.org/docs/14/sql-syntax-lexical.html.", name) - } - } - return nil -} - -const ( - // DefaultConnMaxIdleTime means connections are not closed due to a - // connection's idle time. - DefaultConnMaxIdleTime = 0 - - // DefaultConnMaxLifetime means connections are not closed due to a - // connection's age. - DefaultConnMaxLifetime = 0 - - // DefaultMaxIdleConns means 2 idle connections are retained in the pool (same - // configuration as the standard library). If MaxIdleConns <= 0, no idle - // connections are retained. - DefaultMaxIdleConns = 2 - - // DefaultMaxOpenConns means the maximum number of open database connections - // is 50. - DefaultMaxOpenConns = 50 - - // MaxDatabaseNameLength is the maximum PostgreSQL identifier length. - // https://www.postgresql.org/docs/14/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS - MaxDatabaseNameLength = 63 -) diff --git a/lib/backend/postgres/backend_test.go b/lib/backend/postgres/backend_test.go deleted file mode 100644 index 5d3fc5517fef5..0000000000000 --- a/lib/backend/postgres/backend_test.go +++ /dev/null @@ -1,437 +0,0 @@ -/* -Copyright 2018-2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package postgres - -import ( - "bytes" - "context" - "fmt" - "io" - "net/url" - "os" - "os/exec" - "path" - "strings" - "testing" - "time" - - "github.com/gravitational/teleport/api/utils" - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/backend/sqlbk" - "github.com/gravitational/trace" - "github.com/sirupsen/logrus" - "gopkg.in/yaml.v2" - - "github.com/jackc/pgx/v4" - "github.com/stretchr/testify/require" -) - -var ( - // DatabaseURL is the connection string the SQL backend uses to connect to a test - // database instance. The database must not already exist (it is created by the - // test suite). Set the URL using the environment variable - // TELEPORT_TEST_BACKEND_POSTGRES_URL. - // - // # Use in-memory CockroachDB server (if cockroach is on PATH). - // $ go test -v - // - // # Connect to local PostgreSQL socket: - // $ TELEPORT_TEST_BACKEND_POSTGRES_URL='postgres:///teleport?sslmode=disable' \ - // go test -v - // - // # Connect to PostgreSQL server with mTLS: - // $ postgres://postgres.example.com:5432/teleport?sslmode=verify-full&sslrootcert=postgres.cas&sslcert=client.crt&sslkey=client.key \ - // go test -v - // - DatabaseURL *url.URL - - // NoDrop prevents the test teleport database from being dropped at the end - // of the test. This setting has no impact for the in-memory database test. - // Enable the setting using the environment variable - // TELEPORT_TEST_BACKEND_POSTGRES_NODROP=y. - NoDrop bool - - // LogSQL logs all SQL statements executed during the test. Logging SQL - // statements may require setting the logrus standard logger's log level. - // Enable the setting using the environment variable - // TELEPORT_TEST_BACKEND_POSTGRES_LOGSQL=y. - LogSQL bool -) - -const ( - // envDatabaseURL is the environment variable that sets DatabaseURL. - envDatabaseURL = "TELEPORT_TEST_BACKEND_POSTGRES_URL" - - // envDatabaseURL is the environment variable that sets NoDrop. - envNoDrop = "TELEPORT_TEST_BACKEND_POSTGRES_NODROP" - - // envDatabaseURL is the environment variable that sets LogSQL. - envLogSQL = "TELEPORT_TEST_BACKEND_POSTGRES_LOGSQL" -) - -// TestMain attempts to start a CockroachDB server if it is available -// and no DatabaseURL has been set by an environment variable. -func TestMain(m *testing.M) { - initTestConfig() - stopServerFn := maybeStartRoachServer() - code := m.Run() - stopServerFn() - os.Exit(code) -} - -// TestBackend runs the backend test suite for the postgres driver. -func TestBackend(t *testing.T) { - if DatabaseURL == nil { - t.Skip("Postgres backend test suite is disabled. Set TELEPORT_TEST_BACKEND_POSTGRES_URL to enable or ensure the CockroachDB binary is on PATH.") - } else { - t.Logf("NoDrop=%t LogSQL=%t URL=%q", NoDrop, LogSQL, DatabaseURL) - } - - cfg := &Config{} - cfg.Log = logrus.WithFields(logrus.Fields{trace.Component: BackendName}) - cfg.Addr = "-" - cfg.TLS.CAFile = "-" - cfg.TLS.ClientKeyFile = "-" - cfg.TLS.ClientCertFile = "-" - require.NoError(t, cfg.CheckAndSetDefaults()) - - sqlbk.TestDriver(t, &testDriver{ - t: t, - pgDriver: pgDriver{cfg: cfg}, - }) -} - -// TestConfig verifies the storage section of the YAML configuration file -// supports nested sections. -func TestConfig(t *testing.T) { - const tmpl = `--- -storage: - type: postgres - addr: %q - database: %q - tls: - ca_file: %q - client_cert_file: %q - client_key_file: %q` - - expect := &Config{} - expect.Addr = "postgres.example.com:5432" - expect.Database = "teleport" - expect.TLS.CAFile = "postgres.cas" - expect.TLS.ClientCertFile = "root.crt" - expect.TLS.ClientKeyFile = "root.key" - - source := fmt.Sprintf(tmpl, - expect.Addr, - expect.Database, - expect.TLS.CAFile, - expect.TLS.ClientCertFile, - expect.TLS.ClientKeyFile) - - var doc struct { - Storage struct { - Params backend.Params `yaml:",inline"` - } `yaml:"storage"` - } - err := yaml.UnmarshalStrict([]byte(source), &doc) - require.NoError(t, err) - - doc.Storage.Params.Cleanse() - - var cfg *Config - err = utils.ObjectToStruct(doc.Storage.Params, &cfg) - require.NoError(t, err) - require.Equal(t, expect, cfg) -} - -// TestDriverURL verifies the correct connection string URL -// is created from a Config. -func TestDriverURL(t *testing.T) { - driver := pgDriver{cfg: &Config{}} - driver.cfg.Addr = "host:123" - driver.cfg.Database = "database" - driver.cfg.TLS.CAFile = "cafile" - driver.cfg.TLS.ClientCertFile = "certfile" - driver.cfg.TLS.ClientKeyFile = "keyfile" - - expect, err := url.Parse("postgres://host:123/database?sslmode=verify-full&sslrootcert=cafile&sslcert=certfile&sslkey=keyfile") - require.NoError(t, err) - expectQuery := expect.Query() - expect.RawQuery = "" - - got := driver.url() - gotQuery := got.Query() - got.RawQuery = "" - - require.Equal(t, expect, got) - require.Equal(t, expectQuery, gotQuery) -} - -func TestValidateDatabaseName(t *testing.T) { - testCases := []struct { - valid bool - name string - }{ - {valid: true, name: "a"}, - {valid: true, name: "A"}, - {valid: true, name: "_"}, - {valid: true, name: "aa"}, - {valid: true, name: "aA"}, - {valid: true, name: "a_"}, - {valid: true, name: "a$"}, - {valid: false, name: "0"}, - {valid: false, name: "0a"}, - {valid: false, name: "$a"}, - {valid: false, name: "a*"}, - {valid: false, name: "a%"}, - {valid: false, name: "a;"}, - {valid: false, name: "; drop database postgres;"}, - {valid: false, name: "This_table_name_is_one_more_byte_than_the_63_byte_maximum_limit"}, - {valid: true, name: "This_table_name_is_exactly_the_63_byte_maximum_limit__________"}, - } - for i, test := range testCases { - err := validateDatabaseName(test.name) - require.True(t, test.valid == (err == nil), "Test case %d: %q", i, test.name) - } -} - -// testDriver wraps pgDriver with a new Open method that creates a test database -// and applies test configurations. -type testDriver struct { - pgDriver - t *testing.T -} - -// Open the test database. -func (d *testDriver) Open(ctx context.Context) (sqlbk.DB, error) { - t := d.t - - // Verify test URL. - require.NotNil(t, DatabaseURL) - require.Greaterf(t, len(DatabaseURL.Path), 1, DatabaseURL.Path) - require.Equal(t, byte('/'), DatabaseURL.Path[0]) - dbName := DatabaseURL.Path[1:] - - // Connect to the postgres database to create the test database. Create a - // connection string for the postgres database by copying DatabaseURL and - // changing the path (database). Leave the connection open to delete the - // test database after the test suite completes. - pgURL := *DatabaseURL - pgURL.Path = "/postgres" - pgConn, err := pgx.Connect(ctx, pgURL.String()) - require.NoError(t, err) - t.Cleanup(func() { pgConn.Close(ctx) }) - - // Make sure the test database does not alread exist. - dbExists, err := databaseExists(ctx, pgConn, dbName) - require.NoError(t, err) - require.False(t, dbExists, "Database %v already exists. Tests will not use an existing database.", dbName) - - if LogSQL { - d.sqlLogger = maybeSQLLogger(t) - } - if !NoDrop { - t.Cleanup(func() { - _, err := pgConn.Exec(ctx, fmt.Sprintf("DROP DATABASE %v", dbName)) - require.NoError(t, err, "Failed to drop %v database", dbName) - }) - } - - return d.open(ctx, DatabaseURL) -} - -// maybeSQLLogger returns a new logger when log levels are supported (logrus -// and pgx have different log levels). -func maybeSQLLogger(t *testing.T) pgx.Logger { - level := logrus.GetLevel() - if level >= logrus.DebugLevel { - return &pgxLogger{level: logrus.DebugLevel} - } else if level == logrus.InfoLevel { - return &pgxLogger{level: logrus.InfoLevel} - } - t.Logf("SQL logging is disabled. Logging level must be greater than 'info' but is set to %q", level) - return nil -} - -// maybeStartRoachServer will attempt to search a single-node CockroachDB -// server if the test URL is empty. The returned function should be called after -// all tests have executed to stop the server. -func maybeStartRoachServer() (stopServerFn func()) { - if DatabaseURL != nil { - return func() {} - } - - // Don't start server unless executing TestBackend. - // Or, don't start when -bench flag exists or -run != TestBackend. - for _, arg := range os.Args[1:] { - if strings.HasPrefix(arg, "-test.bench=") { - return func() {} - } - if strings.HasPrefix(arg, "-test.run=") { - if strings.HasSuffix(arg, "=TestBackend") { - break - } - return func() {} - } - } - - var roach roachServer - err := roach.Start() - if err != nil { - if !trace.IsNotFound(err) { - logrus.Warnf("Failed to start cockroach test server: %v", err) - } - return func() {} - } - return func() { <-roach.Stop() } -} - -// roachServer wraps a CockroachDB subprocess. -type roachServer struct { - Stop func() <-chan struct{} -} - -// Start a CockroachDB single-node server for testing. It returns a NotFound -// error if the cockroach executable is not in PATH. Stop field is set when -// a non-nil error is returned. -func (r *roachServer) Start() error { - cockroachPath, err := exec.LookPath("cockroach") - if err != nil { - return trace.NotFound("cockroach executable not found") - } - - // Create io.Writer that will read log messages from the cockroach - // server to determine when it is ready to accept connections and - // extract the client connection string (we need the port). - started := false - startErr := make(chan error) - writer := &peekWriter{ - Writer: io.Discard, // Change to os.Stdout to see log messages. - Peek: func(b []byte) { - if started { - return - } - // I220310 16:23:02.762587 11 1@cli/start.go:759 [-] 83 node startup completed: - // ... - // I220310 16:23:02.762587 11 1@cli/start.go:759 [-] 83 +sql: postgresql://root@name.local:26257/defaultdb?sslmode=disable - if !bytes.Contains(b, []byte("node startup completed:")) { - return - } - const left = " +sql: " - const right = "sslmode=disable" - i := bytes.Index(b, []byte(left)) - if i == -1 { - return - } - j := bytes.Index(b[i:], []byte(right)) - if j == -1 { - return - } - connStr := string(bytes.TrimSpace(b[i+len(left) : i+j+len(right)])) - u, err := url.Parse(connStr) - if err != nil { - err = trace.BadParameter("failed to parse client connection string for CockroachDB %q: %v", connStr, err) - } - DatabaseURL = u - DatabaseURL.Path = "/teleport" - startErr <- err - started = true - }, - } - - logrus.Info("Starting CockroachDB in-memory server") - cmd := exec.Command( - cockroachPath, - "start-single-node", - "--insecure", - "--store=type=mem,size=1G", // Size must be greater than 640 MiB - "--listen-addr=localhost:0") - cmd.Stderr = writer - cmd.Stdout = io.Discard - err = cmd.Start() - if err != nil { - return trace.Wrap(err) - } - - shutdownComplete := make(chan struct{}) - r.Stop = func() <-chan struct{} { - cmd.Process.Signal(os.Interrupt) - return shutdownComplete - } - - go func() { - cmd.Wait() - r.cleanup() - close(shutdownComplete) - }() - - // Wait for cockroach server to be ready for connections. - select { - case err = <-startErr: - case <-time.After(time.Second * 5): - return trace.LimitExceeded("Timeout waiting for the CockroachDB server to accept connections.") - } - return err -} - -// cleanup removes empty directories cockroach leaves behind. -func (r *roachServer) cleanup() { - wd, err := os.Getwd() - if err != nil { - logrus.Error(err) - } - for _, dir := range []string{"goroutine_dump", "inflight_trace_dump", "heap_profiler"} { - err = os.RemoveAll(path.Join(wd, dir)) - if err != nil { - logrus.Error(err) - } - } -} - -// peekWriter wraps an io.Writer and calls peek on each write. -type peekWriter struct { - Writer io.Writer - Peek func([]byte) -} - -// Write implements io.Writer. -func (s *peekWriter) Write(b []byte) (n int, err error) { - s.Peek(b) - return s.Writer.Write(b) -} - -// initTestConfig sets configuration variables based on environment variable -// settings. -func initTestConfig() { - NoDrop = os.Getenv(envNoDrop) == "y" - LogSQL = os.Getenv(envLogSQL) == "y" - - // init DatabaseURL - if envURL := os.Getenv(envDatabaseURL); envURL != "" { - u, err := url.Parse(envURL) - if err != nil { - logrus.Errorf("Failed to parse %v=%q: %v", envDatabaseURL, envURL, err) - } - DatabaseURL = u - } -} - -var ( - _ sqlbk.Driver = (*pgDriver)(nil) - _ sqlbk.DB = (*pgDB)(nil) - _ sqlbk.Tx = (*pgTx)(nil) -) diff --git a/lib/backend/postgres/doc.go b/lib/backend/postgres/doc.go deleted file mode 100644 index e4258e6afb762..0000000000000 --- a/lib/backend/postgres/doc.go +++ /dev/null @@ -1,49 +0,0 @@ -/* -Copyright 2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -Package postgres implements a SQL backend for PostgreSQL and CockroachDB. - -# Schema - -The database schema consists of three tables: item, lease, and event. - - ┌──────────┐ ┌──────────┐ ┌──────────┐ - │ item │ │ lease │ │ event │ - ├──────────┤ ├──────────┤ ├──────────┤ - │* key │ │* key │ │* eventid │ - │* id │ │ id │ │ created │ - │ value │ │ expires │ │ key │ - │ │ │ │ │ id │ - │ │ │ │ │ type │ - └──────────┘ └──────────┘ └──────────┘ - -The item table contains the backend item's value and is insert-only. The table -supports multiple items per key. Updates to an item's value creates a new -record with an ID greater than the most recent record. - -The lease table contains the backend item's active record, which may have already -expired. Active leases have a null expires value or expires is greater than the -current time. - -The event table contains events for all changes to backend items and is keyed by an -autoincrementing integer (may not be a sequence/will contain gaps). The event's -type represents the value of types.OpType. - -The design allows for items to be updated before an event for previous item has -been emitted without duplicating storage for value. -*/ -package postgres diff --git a/lib/backend/postgres/driver.go b/lib/backend/postgres/driver.go deleted file mode 100644 index 150e556513738..0000000000000 --- a/lib/backend/postgres/driver.go +++ /dev/null @@ -1,264 +0,0 @@ -/* -Copyright 2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package postgres - -import ( - "context" - "crypto/tls" - "crypto/x509" - "database/sql" - "errors" - "fmt" - "net/url" - "time" - - "github.com/gravitational/teleport/lib/backend/sqlbk" - "github.com/gravitational/trace" - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/stdlib" -) - -// pgDriver implements backend.Driver for a PostgreSQL or CockroachDB database. -type pgDriver struct { - cfg *Config - sqlLogger pgx.Logger // testing only -} - -// BackendName returns the name of the backend that created the driver. -func (d *pgDriver) BackendName() string { - return BackendName -} - -// Config returns the SQL backend configuration. -func (d *pgDriver) Config() *sqlbk.Config { - return &d.cfg.Config -} - -// Open the database. The returned DB is a *pgDB instance. -func (d *pgDriver) Open(ctx context.Context) (sqlbk.DB, error) { - return d.open(ctx, d.url()) -} - -// open the database by connecting to a URL. An error is returned when the URL -// has an invalid configuration or connecting to the database fails. -func (d *pgDriver) open(ctx context.Context, u *url.URL) (sqlbk.DB, error) { - connConfig, err := pgx.ParseConfig(u.String()) - if err != nil { - return nil, trace.Wrap(err) - } - connConfig.Logger = d.sqlLogger - - // extract the user from the first client certificate in TLSConfig. - if connConfig.TLSConfig != nil { - connConfig.User, err = tlsConfigUser(connConfig.TLSConfig) - if err != nil { - return nil, trace.Wrap(err) - } - if connConfig.User == "" { - return nil, trace.BadParameter("storage backend certificate CommonName field is blank; database username is required") - } - } - - // Attempt to create backend database if it does not exist. - err = d.maybeCreateDatabase(ctx, connConfig) - if err != nil { - return nil, trace.Wrap(err) - } - - // Open connection/pool for backend database. - db, err := sql.Open("pgx", stdlib.RegisterConnConfig(connConfig)) - if err != nil { - return nil, trace.Wrap(err) - } - - // Configure the connection pool. - db.SetConnMaxIdleTime(d.cfg.ConnMaxIdleTime) - db.SetConnMaxLifetime(d.cfg.ConnMaxLifetime) - db.SetMaxIdleConns(d.cfg.MaxIdleConns) - db.SetMaxOpenConns(d.cfg.MaxOpenConns) - - pgdb := &pgDB{ - DB: db, - pgDriver: d, - readOnlyOpts: &sql.TxOptions{ReadOnly: true}, - readWriteOpts: &sql.TxOptions{}, - } - - err = pgdb.migrate(ctx) - if err != nil { - return nil, trace.Wrap(err) - } - - return pgdb, nil -} - -// maybeCreateDatabase creates the backend database if it does not exist. -func (d *pgDriver) maybeCreateDatabase(ctx context.Context, connConfig *pgx.ConnConfig) error { - // Verify the database name is valid to prevent SQL injection. This - // should've already been done in CheckAndSetDefaults of the Config, - // but check again to be sure. - err := validateDatabaseName(connConfig.Database) - if err != nil { - return trace.Wrap(err) - } - - // Copy config and connect to postgres database instead. - pgConnConfig := *connConfig - pgConnConfig.Database = "postgres" - pgConn, err := pgx.ConnectConfig(ctx, &pgConnConfig) - if err != nil { - return trace.BadParameter("failed to verify %q database exists: %v", connConfig.Database, err) - } - defer func() { - if err := pgConn.Close(ctx); err != nil { - d.cfg.Log.Errorf("Failed to close connection to postgres database: %v", err) - } - }() - - // Create database if it doesn't exist. - exists, err := databaseExists(ctx, pgConn, connConfig.Database) - if err != nil { - return trace.BadParameter("failed to verify %q database exists: %v", connConfig.Database, err) - } - if !exists { - _, err = pgConn.Exec(ctx, fmt.Sprintf("CREATE DATABASE %v", connConfig.Database)) - if err != nil { - return trace.BadParameter("failed to create %q database: %v", connConfig.Database, err) - } - d.cfg.Log.Infof("Created storage database %q", connConfig.Database) - } - - return nil -} - -// url returns a connection string URL created from pgDriver's config. -func (d *pgDriver) url() *url.URL { - u := url.URL{ - Scheme: "postgres", - Host: d.cfg.Addr, - Path: "/" + d.cfg.Database, - } - q := u.Query() - q.Set("sslmode", "verify-full") - q.Set("sslrootcert", d.cfg.TLS.CAFile) - q.Set("sslcert", d.cfg.TLS.ClientCertFile) - q.Set("sslkey", d.cfg.TLS.ClientKeyFile) - u.RawQuery = q.Encode() - return &u -} - -// databaseExists returns true if a database exits with a specific name. -func databaseExists(ctx context.Context, pgConn *pgx.Conn, dbName string) (exists bool, err error) { - const query = "SELECT EXISTS (SELECT 1 FROM pg_database WHERE datname=$1)" - err = pgConn.QueryRow(ctx, query, dbName).Scan(&exists) - if err != nil { - return false, trace.Wrap(err) - } - return exists, nil -} - -// pgDB implements sqlbk.DB. It is returned from pgDriver.open. -type pgDB struct { - *sql.DB - *pgDriver - readOnlyOpts *sql.TxOptions - readWriteOpts *sql.TxOptions -} - -// Begin a read/write transaction. -func (db *pgDB) Begin(ctx context.Context) sqlbk.Tx { - return db.begin(ctx, db.readWriteOpts) -} - -// ReadOnly begins a read-only transaction. Calling a mutating Tx method -// will result in a failed transaction. -func (db *pgDB) ReadOnly(ctx context.Context) sqlbk.Tx { - return db.begin(ctx, db.readOnlyOpts) -} - -// begin a transaction with options (read/write or read-only). -func (db *pgDB) begin(ctx context.Context, opts *sql.TxOptions) *pgTx { - tx, err := db.DB.BeginTx(ctx, opts) - return &pgTx{ - opts: opts, - pgDB: db, - sqlTx: tx, - ctx: ctx, - err: convertError(err), - } -} - -// sqlNullTime converts a time to a nullable sql time, which is required when -// passing time parameters for nullable SQL database columns such as expires. -func sqlNullTime(t time.Time) sql.NullTime { - if t.IsZero() { - return sql.NullTime{} - } - return sql.NullTime{Time: t, Valid: true} -} - -// newID returns a new backend item ID. A backend item's ID is unique per key. -// -// It returns the current UnixNano time. A clockwork.Clock is not used here -// because it would not be unique for tests using a fake clock. The number -// returned can be anything that has a high probability of being unique per key -// and is incremental. -func newID() int64 { - return time.Now().UnixNano() -} - -// convertError to a trace.Error. -func convertError(err error) error { - if err == nil { - return nil - } - if errors.Is(err, sql.ErrNoRows) { - return trace.Wrap(sqlbk.ErrNotFound) - } - if pgErr, ok := err.(*pgconn.PgError); ok { - switch pgErr.Code { - case errCodeUniqueConstraint: - return trace.Wrap(sqlbk.ErrAlreadyExists) - case errCodeNotSerializable: - return trace.Wrap(sqlbk.ErrRetry) - } - } - return trace.Wrap(err) -} - -// tlsConfigUser returns the user defined in the CommonName field of the first -// client certificate in tlsConfig. -func tlsConfigUser(tlsConfig *tls.Config) (user string, err error) { - if tlsConfig == nil || len(tlsConfig.Certificates) == 0 || len(tlsConfig.Certificates[0].Certificate) == 0 { - return "", trace.BadParameter("unable to extract user from TLS Config") - } - cert, err := x509.ParseCertificate(tlsConfig.Certificates[0].Certificate[0]) - if err != nil { - return "", trace.Wrap(err) - } - return cert.Subject.CommonName, nil -} - -const ( - // errCodeUniqueConstraint means a duplicate key value violated a unique constraint. - errCodeUniqueConstraint = "23505" - - // errCodeNotSerializable means the server could not serialize access due to - // read/write dependencies among transactions. - errCodeNotSerializable = "40001" -) diff --git a/lib/backend/postgres/migrate.go b/lib/backend/postgres/migrate.go deleted file mode 100644 index 5dda06ac19f18..0000000000000 --- a/lib/backend/postgres/migrate.go +++ /dev/null @@ -1,171 +0,0 @@ -/* -Copyright 2018-2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package postgres - -import ( - "context" - "database/sql" - "errors" - "fmt" - - "github.com/gravitational/trace" -) - -// migrate the database to the most recent schema version. -func (db *pgDB) migrate(ctx context.Context) error { - tx := db.begin(ctx, db.readWriteOpts) - - tx.setTxIsolationLevel(serializableTxIsolationLevel) - if !tx.migrateTableExists() { - tx.createMigrateTable() - } - - dbVersion := tx.getSchemaVersion() - if tx.err != nil || dbVersion == schemaVersion { - return tx.err - } - - // Can't migrate backwards. - if dbVersion > schemaVersion { - tx.rollback(trace.BadParameter("cannot migrate from database schema version %v to %v", dbVersion, schemaVersion)) - return tx.err - } - - // Prepare statement to insert migration records for each new version. - const query = `INSERT INTO migrate (version) VALUES ($1)` - insertVersionStmt, err := tx.sqlTx.PrepareContext(tx.ctx, query) - if tx.rollback(err) { - return tx.err - } - defer insertVersionStmt.Close() - - // Migrate from dbVersion to schemaVersion. - for version := dbVersion + 1; version <= schemaVersion; version++ { - _, err = tx.sqlTx.ExecContext(tx.ctx, getMigration(version)) - if tx.rollback(err) { - return tx.err - } - _, err = insertVersionStmt.ExecContext(tx.ctx, version) - if tx.rollback(err) { - return tx.err - } - } - - return tx.Commit() -} - -// setTxIsolationLevel sets the current database's target isolation level to targetLevel. -func (tx *pgTx) setTxIsolationLevel(targetLevel string) { - if tx.err != nil { - return - } - - // Query for database's default isolation level. - const levelQuery = `SELECT setting FROM pg_settings WHERE name='default_transaction_isolation'` - var level string - err := tx.sqlTx.QueryRowContext(tx.ctx, levelQuery).Scan(&level) - if tx.rollback(err) { - return - } - - // Return if we're already set to serializable. - if level == targetLevel { - return - } - - // Query for the database name (needed for the next alter db call). We - // could've passed the name in from the backend's configuration, but we - // only need the name once when initializing a new db. - var dbName string - err = tx.sqlTx.QueryRowContext(tx.ctx, "SELECT current_catalog").Scan(&dbName) - if tx.rollback(err) { - return - } - - // Change default isolation level for the database. - const alterQuery = "ALTER DATABASE %s SET DEFAULT_TRANSACTION_ISOLATION TO '%s'" - _, err = tx.sqlTx.ExecContext(tx.ctx, fmt.Sprintf(alterQuery, dbName, targetLevel)) - if tx.rollback(err) { - return - } - - // Change isolation level for the current connection. Changing the database - // level doesn't change the level for the current connection. This connection - // is part of a pool, so it may be reused again. This connection should be - // the only one in the pool at this point. - const setQuery = "SET DEFAULT_TRANSACTION_ISOLATION = '%s'" - _, err = tx.sqlTx.ExecContext(tx.ctx, fmt.Sprintf(setQuery, targetLevel)) - tx.rollback(err) -} - -// migrateTableExists returns true if the migrate table exists. -// It returns false if the transaction in in an error state. -func (tx *pgTx) migrateTableExists() bool { - if tx.err != nil { - return false - } - - // Using EXISTS always returns a boolean result. - const query = ` - SELECT EXISTS ( - SELECT 1 FROM information_schema.tables - WHERE table_schema='public' - AND table_name='migrate' - )` - - var exists bool - err := tx.sqlTx.QueryRowContext(tx.ctx, query).Scan(&exists) - if tx.rollback(err) { - return false - } - return exists -} - -// createMigrateTable creates a new migrate table. -func (tx *pgTx) createMigrateTable() { - if tx.err != nil { - return - } - - const query = ` - CREATE TABLE migrate ( - version INT PRIMARY KEY NOT NULL, - created TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP - )` - - _, err := tx.sqlTx.ExecContext(tx.ctx, query) - tx.rollback(err) -} - -// getSchemaVersion returns the database's version of the schema. -// Zero is returned if there are no version records. -func (tx *pgTx) getSchemaVersion() int { - if tx.err != nil { - return 0 - } - - const query = `SELECT version FROM migrate ORDER BY version DESC LIMIT 1` - var version int - err := tx.sqlTx.QueryRowContext(tx.ctx, query).Scan(&version) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - tx.rollback(err) - return 0 - } - return version -} - -const serializableTxIsolationLevel = "serializable" diff --git a/lib/backend/postgres/schema.go b/lib/backend/postgres/schema.go deleted file mode 100644 index 246bb2c700a67..0000000000000 --- a/lib/backend/postgres/schema.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -Copyright 2018-2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package postgres - -import "fmt" - -// schemaVersion defines the current schema version. -// Increment this value when adding a new migration. -const schemaVersion = 1 - -// getMigration returns migration SQL for a schema version. -func getMigration(version int) string { - switch version { - case 1: - return migrateV1 - // case 2: - // return migrateV2 - } - panic(fmt.Sprintf("migration version not implemented: %v", version)) -} - -// migrateV1 is the baseline schema. -// -// Keys are stored as BYTEA to avoid collation ordering. -// When debugging, convert the key to a readable value using: -// -// SELECT encode(key, 'escape') FROM lease; -const migrateV1 = ` - CREATE TABLE item ( - key BYTEA NOT NULL, - id BIGINT NOT NULL, - value BYTEA NOT NULL, - CONSTRAINT item_pk PRIMARY KEY (key,id) - ); - - CREATE TABLE lease ( - key BYTEA NOT NULL, - id BIGINT NOT NULL, - expires TIMESTAMPTZ, - CONSTRAINT lease_pk PRIMARY KEY (key) - ); - CREATE INDEX lease_expires ON lease (expires); - - CREATE TABLE event ( - eventid BIGSERIAL NOT NULL, - created TIMESTAMPTZ NOT NULL, - key BYTEA NOT NULL, - id BIGINT NOT NULL, - type SMALLINT NOT NULL, - CONSTRAINT event_pk PRIMARY KEY (eventid) - ); -` diff --git a/lib/backend/postgres/tx.go b/lib/backend/postgres/tx.go deleted file mode 100644 index 8bda0e188f83e..0000000000000 --- a/lib/backend/postgres/tx.go +++ /dev/null @@ -1,485 +0,0 @@ -/* -Copyright 2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package postgres - -import ( - "context" - "database/sql" - "errors" - "time" - - "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/backend/sqlbk" - "github.com/gravitational/trace" - "github.com/jackc/pgx/v4" - "github.com/sirupsen/logrus" -) - -// pgTx implements sqlbk.Tx for postgres. -type pgTx struct { - *pgDB - opts *sql.TxOptions - sqlTx *sql.Tx - ctx context.Context - err error // must be set by calling rollback() -} - -// Err returns a transaction error. An error does not change once the -// transaction is in an error state. Calling other Tx methods has no effect -// on the state of the transaction. -func (tx *pgTx) Err() error { - return tx.err -} - -// Commit the transaction. The same error returned from the Err method is -// returned from Commit when the transaction is in an error state. -func (tx *pgTx) Commit() error { - if tx.err != nil { - return tx.err - } - tx.err = convertError(tx.sqlTx.Commit()) - return tx.err -} - -// Rollback the transaction with an error. The error passed to Rollback is -// converted to a trace error and set as the transaction error returned from -// Err. If the transaction is already in an error state, the error is -// overridden by the error passed. Passing a nil error is considered a bug, -// but the rollback will continue with a generated error if the transaction -// is not already in an error state. -func (tx *pgTx) Rollback(err error) error { - switch { - case err != nil && tx.err == nil: - // Standard path. Rollback with error. - tx.rollback(err) - - case err != nil && tx.err != nil: - // Tx in failed state. Override error (e.g. CompareAndSwap). - tx.err = err - - case err == nil && tx.err == nil: - // Tx OK. Rollback called with nil err. Rollback since that was the intent. - tx.rollback(trace.BadParameter("[BUG] Rollback called with nil error.")) - tx.cfg.Log.Debug(tx.err) - - case err == nil && tx.err != nil: - // Tx in failed state. Rollback called with nil error. Override error to signal issue. - tx.err = trace.BadParameter("[BUG] Rollback called with nil error on failed transaction.") - tx.cfg.Log.Debug(tx.err) - } - return trace.Wrap(tx.err) -} - -// rollback and return true when err is not nil. -// -// This method is a control-flow helper for the other pgTx methods. Accepting -// nil errors and returning true or false improves readability in the same -// manner as trace.Wrap. Its name is purposefully short and discreet. -func (tx *pgTx) rollback(err error) bool { - if err == nil { - return false - } - if e := tx.sqlTx.Rollback(); e != nil { - tx.cfg.Log.WithError(err).Errorf("Failed to rollback: %v", e) - } - tx.err = convertError(err) - return true -} - -// DeleteEvents created before expiryTime. -func (tx *pgTx) DeleteEvents(expiryTime time.Time) { - if tx.err != nil { - return - } - - const query = `DELETE FROM event WHERE created < $1` - _, err := tx.sqlTx.ExecContext(tx.ctx, query, expiryTime) - tx.rollback(err) -} - -// DeleteExpiredLeases removes leases whose expires column is not null and is -// less than the current time. -func (tx *pgTx) DeleteExpiredLeases() { - if tx.err != nil { - return - } - - const query = `DELETE FROM lease WHERE (expires IS NOT NULL AND expires < $1)` - _, err := tx.sqlTx.ExecContext(tx.ctx, query, tx.now()) - tx.rollback(err) -} - -// DeleteItems not referencing an event or a valid lease. -func (tx *pgTx) DeleteItems() { - if tx.err != nil { - return - } - - const query = ` - DELETE FROM item WHERE (key, id) IN ( - SELECT key, id - FROM item - LEFT JOIN lease USING (key, id) - LEFT JOIN event USING (key, id) - WHERE event.key IS NULL - AND (lease.key IS NULL OR lease.expires < $1) - )` - _, err := tx.sqlTx.ExecContext(tx.ctx, query, tx.now()) - tx.rollback(err) -} - -// DeleteLease by key returning the backend item ID from the deleted lease. -// Zero is returned when the delete fails. -func (tx *pgTx) DeleteLease(key []byte) int64 { - if tx.err != nil { - return 0 - } - - var id int64 - const query = `DELETE FROM lease WHERE key=$1 RETURNING id` - err := tx.sqlTx.QueryRowContext(tx.ctx, query, key).Scan(&id) - if tx.rollback(err) { - return 0 - } - return id -} - -// DeleteLeaseRange removes all leases inclusively between startKey -// and endKey. It returns the set of backend items deleted. The returned -// items include only Key and ID. -func (tx *pgTx) DeleteLeaseRange(startKey, endKey []byte) []backend.Item { - if tx.err != nil { - return nil - } - - const query = `DELETE FROM lease WHERE key >= $1 AND key <= $2 RETURNING key, id` - rows, err := tx.sqlTx.QueryContext(tx.ctx, query, startKey, endKey) - if tx.rollback(err) { - return nil - } - - var items []backend.Item - for rows.Next() { - var item backend.Item - err = rows.Scan(&item.Key, &item.ID) - if tx.rollback(err) { - return nil - } - items = append(items, item) - } - if tx.rollback(rows.Err()) { - return nil - } - - return items -} - -// GetEvents returns an ordered set of events up to limit whose ID is -// greater than fromEventID. -func (tx *pgTx) GetEvents(fromEventID int64, limit int) sqlbk.Events { - events := sqlbk.Events{LastID: fromEventID} - if tx.err != nil { - return events - } - - const query = ` - SELECT event.eventid, event.key, event.id, event.type, item.value - FROM event JOIN item USING (key, id) - WHERE event.eventid > $1 - ORDER BY event.eventid LIMIT $2` - rows, err := tx.sqlTx.QueryContext(tx.ctx, query, fromEventID, limit) - if tx.rollback(err) { - return events - } - - lastEventID := fromEventID - var backendEvents []backend.Event - for rows.Next() { - var event backend.Event - err = rows.Scan(&lastEventID, &event.Item.Key, &event.Item.ID, &event.Type, &event.Item.Value) - if tx.rollback(err) { - return events - } - backendEvents = append(backendEvents, event) - } - if tx.rollback(rows.Err()) { - return events - } - - events.LastID = lastEventID - events.BackendEvents = backendEvents - if len(events.BackendEvents) == limit { - events.Remaining = tx.getEventsRemaining(lastEventID) - } - - return events -} - -// getEventsRemaining returns the number of events that exist whose ID is -// greater than fromEventID. -func (tx *pgTx) getEventsRemaining(fromEventID int64) int { - if tx.err != nil { - return 0 - } - - const query = `SELECT count(*) FROM event WHERE event.eventid > $1` - var eventsRemaining int - err := tx.sqlTx.QueryRowContext(tx.ctx, query, fromEventID).Scan(&eventsRemaining) - if tx.rollback(err) { - return 0 - } - return eventsRemaining -} - -// GetExpiredLeases returns all leases whose expires field is less than -// or equal to the current time. -func (tx *pgTx) GetExpiredLeases() []backend.Lease { - if tx.err != nil { - return nil - } - - const query = `SELECT key, id FROM lease WHERE expires <= $1` - rows, err := tx.sqlTx.QueryContext(tx.ctx, query, tx.now()) - if tx.rollback(err) { - return nil - } - - var leases []backend.Lease - for rows.Next() { - var lease backend.Lease - err = rows.Scan(&lease.Key, &lease.ID) - if tx.rollback(err) { - return nil - } - leases = append(leases, lease) - } - if tx.rollback(rows.Err()) { - return nil - } - - return leases -} - -// GetItem by key. Nil is returned if the item has expired. -func (tx *pgTx) GetItem(key []byte) *backend.Item { - if tx.err != nil { - return nil - } - - const query = ` - SELECT item.id, lease.expires, item.value - FROM item JOIN lease USING (key, id) - WHERE item.key = $1 - AND (lease.expires IS NULL OR lease.expires > $2)` - - var expires sql.NullTime - item := &backend.Item{Key: key} - row := tx.sqlTx.QueryRowContext(tx.ctx, query, key, tx.now()) - err := row.Scan(&item.ID, &expires, &item.Value) - if tx.rollback(err) { - return nil - } - if expires.Valid { - item.Expires = expires.Time - } - return item -} - -// GetItemRange returns a set of backend items whose key is inclusively between -// startKey and endKey. The returned items are ordered by key, will not exceed -// limit, and does not include expired items. -func (tx *pgTx) GetItemRange(startKey, endKey []byte, limit int) []backend.Item { - if tx.err != nil { - return nil - } - - const query = ` - SELECT lease.key, lease.id, lease.expires, item.value - FROM lease JOIN item USING (key, id) - WHERE (lease.key >= $1 and lease.key <= $2) - AND (lease.expires IS NULL OR lease.expires > $3) - ORDER BY lease.key LIMIT $4` - - rows, err := tx.sqlTx.QueryContext(tx.ctx, query, startKey, endKey, tx.now(), limit) - if tx.rollback(err) { - return nil - } - - var items []backend.Item - for rows.Next() { - var item backend.Item - var expires sql.NullTime - err = rows.Scan(&item.Key, &item.ID, &expires, &item.Value) - if tx.rollback(err) { - return nil - } - if expires.Valid { - item.Expires = expires.Time - } - items = append(items, item) - } - if tx.rollback(rows.Err()) { - return nil - } - - return items -} - -// GetItemValue returns an item's value by key if the item has not expired. -func (tx *pgTx) GetItemValue(key []byte) []byte { - if tx.err != nil { - return nil - } - - const query = ` - SELECT item.value - FROM lease JOIN item USING (key, id) - WHERE lease.key = $1 - AND (lease.expires IS NULL OR lease.expires > $2)` - - var value []byte - err := tx.sqlTx.QueryRowContext(tx.ctx, query, key, tx.now()).Scan(&value) - if tx.rollback(err) { - return nil - } - return value -} - -// GetLastEventID returns the most recent eventid. Zero is returned when the -// event table is empty. -func (tx *pgTx) GetLastEventID() int64 { - if tx.err != nil { - return 0 - } - - const query = `SELECT eventid FROM event ORDER BY eventid DESC LIMIT 1` - - var eventID int64 - err := tx.sqlTx.QueryRowContext(tx.ctx, query).Scan(&eventID) - if err != nil { - if err = convertError(err); !errors.Is(err, sqlbk.ErrNotFound) { - tx.rollback(err) - return 0 - } - } - return eventID -} - -// InsertEvent for backend item with evenType. -func (tx *pgTx) InsertEvent(eventType types.OpType, item backend.Item) { - if tx.err != nil { - return - } - const query = `INSERT INTO event (created, key, id, type) VALUES ($1,$2,$3,$4)` - _, err := tx.sqlTx.ExecContext(tx.ctx, query, tx.now(), item.Key, item.ID, eventType) - tx.rollback(err) -} - -// InsertItem creates a new backend item ID, inserts the item, and returns the -// new ID. The transaction will be set to an ErrRetry failed state if the ID -// generated is already taken, which can happen when multiple transactions -// are attempting to add the same item (the test suite's concurrent test -// produces this scenario). -func (tx *pgTx) InsertItem(item backend.Item) int64 { - if tx.err != nil { - return 0 - } - - item.ID = newID() - const query = `INSERT INTO item (key, id, value) VALUES ($1,$2,$3)` - _, err := tx.sqlTx.ExecContext(tx.ctx, query, item.Key, item.ID, item.Value) - if tx.rollback(err) && errors.Is(tx.err, sqlbk.ErrAlreadyExists) { - tx.err = sqlbk.ErrRetry - } - return item.ID -} - -// LeaseExists returns true if a lease exists for key that has not expired. -func (tx *pgTx) LeaseExists(key []byte) bool { - if tx.err != nil { - return false - } - - // Using EXISTS means we don't have to check for no rows. - const query = ` - SELECT EXISTS ( - SELECT 1 FROM lease - WHERE key = $1 - AND (expires IS NULL OR expires > $2) - )` - - var exists bool - err := tx.sqlTx.QueryRowContext(tx.ctx, query, key, tx.now()).Scan(&exists) - if tx.rollback(err) { - return false - } - return exists -} - -// UpdateLease for backend item. The transaction is set to a NotFound error -// state if the backend item does not exist. -func (tx *pgTx) UpdateLease(item backend.Item) { - if tx.err != nil { - return - } - - const query = `UPDATE lease SET id=$2, expires=$3 WHERE key=$1` - result, err := tx.sqlTx.ExecContext(tx.ctx, query, item.Key, item.ID, sqlNullTime(item.Expires)) - if tx.rollback(err) { - return - } - rowsAffected, err := result.RowsAffected() - if tx.rollback(err) { - return - } - if rowsAffected == 0 { - tx.rollback(sqlbk.ErrNotFound) - } -} - -// UpsertLease creates or updates a backend item. -func (tx *pgTx) UpsertLease(item backend.Item) { - if tx.err != nil { - return - } - - // CockroachDB has UPSERT, but use ON CONFLICT to support PostgreSQL. - const query = ` - INSERT INTO lease (key, id, expires) VALUES ($1,$2,$3) - ON CONFLICT (key) DO UPDATE SET - id=EXCLUDED.id, - expires=EXCLUDED.expires` - - _, err := tx.sqlTx.ExecContext(tx.ctx, query, item.Key, item.ID, sqlNullTime(item.Expires)) - tx.rollback(err) -} - -// now returns the current clock time. -func (tx *pgTx) now() time.Time { - return tx.cfg.Clock.Now() -} - -// pgxLogger implements pgx.Logger to allow tests to log all SQL queries. -type pgxLogger struct { - level logrus.Level -} - -// Log to the logrus standard logger. -func (l *pgxLogger) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) { - logrus.WithFields(data).Log(l.level, msg) -} diff --git a/lib/backend/sqlbk/backend.go b/lib/backend/sqlbk/backend.go deleted file mode 100644 index 983adb18a0963..0000000000000 --- a/lib/backend/sqlbk/backend.go +++ /dev/null @@ -1,415 +0,0 @@ -/* -Copyright 2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sqlbk - -import ( - "bytes" - "context" - "errors" - "sync/atomic" - "time" - - "github.com/gravitational/trace" - - "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/api/utils/retryutils" - "github.com/gravitational/teleport/lib/backend" - - "github.com/jonboulle/clockwork" -) - -// New returns a Backend that uses a driver to communicate with the database. -// A non-nil error means the connection pool is ready and the database has been -// migrated to the most recent version. -func New(ctx context.Context, driver Driver) (*Backend, error) { - bk, err := newWithConfig(ctx, driver, driver.Config()) - if err != nil { - return nil, trace.Wrap(err) - } - err = bk.start(ctx) - if err != nil { - return nil, trace.Wrap(err) - } - return bk, nil -} - -// newWithConfig opens a connection to the database and returns an initialized -// Backend instance. Background processes have not been started. -func newWithConfig(ctx context.Context, driver Driver, cfg *Config) (*Backend, error) { - db, err := driver.Open(ctx) - if err != nil { - return nil, trace.Wrap(err) - } - bk := &Backend{ - Config: cfg, - db: db, - buf: backend.NewCircularBuffer(backend.BufferCapacity(cfg.BufferSize)), - bgDone: make(chan struct{}), - } - bk.closeCtx, bk.closeFn = context.WithCancel(context.Background()) - return bk, nil -} - -// Backend implements a storage backend for SQL databases. -type Backend struct { - *Config - db DB - buf *backend.CircularBuffer - - closed int32 // atomic - closeCtx context.Context - closeFn context.CancelFunc - bgDone chan struct{} -} - -// Close the backend. -func (b *Backend) Close() error { - if !atomic.CompareAndSwapInt32(&b.closed, 0, 1) { - return nil - } - b.closeFn() - select { - case <-b.bgDone: - case <-time.After(time.Second * 10): - } - return trace.NewAggregate(b.buf.Close(), b.db.Close()) -} - -// NewWatcher returns a new event watcher. -func (b *Backend) NewWatcher(ctx context.Context, watch backend.Watch) (backend.Watcher, error) { - return b.buf.NewWatcher(ctx, watch) -} - -// Clock returns the clock used by this backend. -func (b *Backend) Clock() clockwork.Clock { - return b.Config.Clock -} - -// CloseWatchers closes all event watchers without closing the backend. -func (b *Backend) CloseWatchers() { - b.buf.Clear() -} - -// retryTx retries a transaction when it results in an ErrRetry error. -// Failed transactions are more likely to occur when the transaction isolation -// level of the database is serializable. -// -// Callers supply a begin function to create a new transaction, which creates -// either a read/write or read-only transaction. Delays between retries is -// controlled by setting the RetryDelayPeriod configuration variable. The -// amount of time delayed is passed through a jitter algorithm. And the total -// amount of time allocated for retries is defined by RetryTimeout. -// -// Returning an error from txFn will rollback the transaction and stop retries. -func (b *Backend) retryTx(ctx context.Context, begin func(context.Context) Tx, txFn func(tx Tx) error) error { - ctx, cancel := context.WithTimeout(ctx, b.RetryTimeout) - defer cancel() - - var delay *retryutils.Linear - tx := begin(ctx) - for { - if tx.Err() != nil { - return tx.Err() - } - - err := txFn(tx) - switch { - case err != nil: - return tx.Rollback(err) - - case tx.Commit() == nil: - return nil - - case !errors.Is(tx.Err(), ErrRetry): - return tx.Err() - } - - // Retry transaction after delay. - if delay == nil { - retryDelayPeriod := b.RetryDelayPeriod - if retryDelayPeriod == 0 { // sanity check (0 produces an error in NewLinear) - retryDelayPeriod = DefaultRetryDelayPeriod - } - delay, err = retryutils.NewLinear(retryutils.LinearConfig{ - First: retryDelayPeriod, - Step: retryDelayPeriod, - Max: retryDelayPeriod, - Jitter: retryutils.NewJitter(), - }) - if err != nil { - return trace.BadParameter("[BUG] invalid retry delay configuration: %v", err) - } - } - select { - case <-delay.After(): - tx = begin(ctx) - delay.Inc() - - case <-ctx.Done(): - return trace.Wrap(ctx.Err()) - } - } -} - -// Create backend item if it does not exist. A put event is emitted if the item -// is created without error. -func (b *Backend) Create(ctx context.Context, item backend.Item) (*backend.Lease, error) { - if len(item.Key) == 0 { - return nil, trace.BadParameter("missing parameter key") - } - var lease backend.Lease - err := b.retryTx(ctx, b.db.Begin, func(tx Tx) error { - if tx.LeaseExists(item.Key) { - return trace.AlreadyExists("backend item already exists for %v", string(item.Key)) - } - item.ID = tx.InsertItem(item) - tx.UpsertLease(item) - tx.InsertEvent(types.OpPut, item) - lease = newLease(item) - return nil - }) - if err != nil { - return nil, trace.Wrap(err) - } - return &lease, nil -} - -// Put creates or updates a backend item. A put event is emitted if the item is -// created without error. -func (b *Backend) Put(ctx context.Context, item backend.Item) (*backend.Lease, error) { - if len(item.Key) == 0 { - return nil, trace.BadParameter("missing parameter key") - } - var lease backend.Lease - err := b.retryTx(ctx, b.db.Begin, func(tx Tx) error { - item.ID = tx.InsertItem(item) - tx.UpsertLease(item) - tx.InsertEvent(types.OpPut, item) - lease = newLease(item) - return nil - }) - if err != nil { - return nil, trace.Wrap(err) - } - return &lease, nil -} - -// PutRange creates or updates a list of backend items. The batch operation is -// all-or-none. A put event is emitted for each item if the entire batch is successful. -func (b *Backend) PutRange(ctx context.Context, items []backend.Item) error { - if len(items) == 0 { - return nil - } - return b.retryTx(ctx, b.db.Begin, func(tx Tx) error { - for _, item := range items { - item.ID = tx.InsertItem(item) - tx.UpsertLease(item) - tx.InsertEvent(types.OpPut, item) - if tx.Err() != nil { - return nil - } - } - return nil - }) -} - -// CompareAndSwap replaces a backend item if the existing item has an expected -// value. A trace.CompareFailed error is returned when the item does not exist -// or the current item's value is not equal to the expected value. A put event -// is emitted if the operation succeeds without error. -func (b *Backend) CompareAndSwap(ctx context.Context, expected, replaceWith backend.Item) (*backend.Lease, error) { - if len(expected.Key) == 0 { - return nil, trace.BadParameter("missing parameter Key") - } - if len(replaceWith.Key) == 0 { - return nil, trace.BadParameter("missing parameter Key") - } - if !bytes.Equal(expected.Key, replaceWith.Key) { - return nil, trace.BadParameter("expected and replaceWith keys should match") - } - - var lease backend.Lease - err := b.retryTx(ctx, b.db.Begin, func(tx Tx) error { - value := tx.GetItemValue(expected.Key) - if tx.Err() != nil { - if errors.Is(tx.Err(), ErrNotFound) { - return trace.CompareFailed("backend item does not exist for key %q", string(expected.Key)) - } - return nil - } - if !bytes.Equal(value, expected.Value) { - return trace.CompareFailed("current value does not match expected for %v", string(expected.Key)) - } - replaceWith.ID = tx.InsertItem(replaceWith) - tx.UpsertLease(replaceWith) - tx.InsertEvent(types.OpPut, replaceWith) - lease = newLease(replaceWith) - return nil - }) - if err != nil { - return nil, trace.Wrap(err) - } - return &lease, nil -} - -// Update an existing backend item. A put event is emitted if the item is -// updated without error. -func (b *Backend) Update(ctx context.Context, item backend.Item) (*backend.Lease, error) { - if len(item.Key) == 0 { - return nil, trace.BadParameter("missing parameter key") - } - var lease backend.Lease - err := b.retryTx(ctx, b.db.Begin, func(tx Tx) error { - item.ID = tx.InsertItem(item) - tx.UpdateLease(item) - tx.InsertEvent(types.OpPut, item) - lease = newLease(item) - return nil - }) - if err != nil { - if errors.Is(err, ErrNotFound) { - return nil, trace.NotFound("backend item does not exist for key %q", string(item.Key)) - } - return nil, trace.Wrap(err) - } - return &lease, nil -} - -// Get a backend item. -func (b *Backend) Get(ctx context.Context, key []byte) (*backend.Item, error) { - if len(key) == 0 { - return nil, trace.BadParameter("missing parameter key") - } - - var item *backend.Item - err := b.retryTx(ctx, b.db.ReadOnly, func(tx Tx) error { - item = tx.GetItem(key) - return nil - }) - if err != nil { - if errors.Is(err, ErrNotFound) { - return nil, trace.NotFound("backend item does not exist for key %q", string(key)) - } - return nil, trace.Wrap(err) - } - return item, nil -} - -// GetRange returns a list of backend items whose key is inclusively between startKey and endKey. -// DefaultRangeLimit is used when limit is zero. -func (b *Backend) GetRange(ctx context.Context, startKey, endKey []byte, limit int) (*backend.GetResult, error) { - if len(startKey) == 0 { - return nil, trace.BadParameter("missing parameter startKey") - } - if len(endKey) == 0 { - return nil, trace.BadParameter("missing parameter endKey") - } - if limit <= 0 { - limit = backend.DefaultRangeLimit - } - - var items []backend.Item - err := b.retryTx(ctx, b.db.ReadOnly, func(tx Tx) error { - items = tx.GetItemRange(startKey, endKey, limit) - return nil - }) - if err != nil { - if errors.Is(err, ErrNotFound) { - return nil, trace.NotFound("backend items do not exist for key range: %q - %q", string(startKey), string(endKey)) - } - return nil, trace.Wrap(err) - } - return &backend.GetResult{Items: items}, nil -} - -// Delete a backend item. A delete event is emitted if the item existed and -// was deleted without error. -func (b *Backend) Delete(ctx context.Context, key []byte) error { - if len(key) == 0 { - return trace.BadParameter("missing parameter key") - } - - err := b.retryTx(ctx, b.db.Begin, func(tx Tx) error { - id := tx.DeleteLease(key) - tx.InsertEvent(types.OpDelete, backend.Item{Key: key, ID: id}) - return nil - }) - if errors.Is(err, ErrNotFound) { - return trace.NotFound("backend item does not exist for key %q", string(key)) - } - return trace.Wrap(err) -} - -// DeleteRange deletes all backend items whose key is inclusively between -// startKey and endKey. Delete events are emitted for all deleted items. -func (b *Backend) DeleteRange(ctx context.Context, startKey, endKey []byte) error { - if len(startKey) == 0 { - return trace.BadParameter("missing parameter startKey") - } - if len(endKey) == 0 { - return trace.BadParameter("missing parameter endKey") - } - - err := b.retryTx(ctx, b.db.Begin, func(tx Tx) error { - items := tx.DeleteLeaseRange(startKey, endKey) - for _, item := range items { - tx.InsertEvent(types.OpDelete, item) - } - return nil - }) - if errors.Is(err, ErrNotFound) { - return trace.NotFound("backend items do not exist for key range: %q - %q", string(startKey), string(endKey)) - } - return trace.Wrap(err) -} - -// KeepAlive updates expiry for a backend item. A put event is emitted if the -// backend item was updated without error. -func (b *Backend) KeepAlive(ctx context.Context, lease backend.Lease, expires time.Time) error { - if len(lease.Key) == 0 { - return trace.BadParameter("lease key is not specified") - } - - item := backend.Item{ - Key: lease.Key, - ID: lease.ID, - Expires: expires, - } - err := b.retryTx(ctx, b.db.Begin, func(tx Tx) error { - tx.UpdateLease(item) - tx.InsertEvent(types.OpPut, item) - return nil - }) - if errors.Is(err, ErrNotFound) { - return trace.NotFound("backend item does not exist for key %q", string(item.Key)) - } - return trace.Wrap(err) -} - -// now returns the current clock time. -func (b *Backend) now() time.Time { - return b.Config.Clock.Now() -} - -// newLease returns a backend lease for the backend item. -// An empty lease is returned when the backend item never expires. -func newLease(item backend.Item) backend.Lease { - if item.Expires.IsZero() { - return backend.Lease{} - } - return backend.Lease{Key: item.Key, ID: item.ID} -} diff --git a/lib/backend/sqlbk/backend_test.go b/lib/backend/sqlbk/backend_test.go deleted file mode 100644 index f21345c7ba0f5..0000000000000 --- a/lib/backend/sqlbk/backend_test.go +++ /dev/null @@ -1,142 +0,0 @@ -/* -Copyright 2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sqlbk - -import ( - "context" - "testing" - "time" - - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/trace" - "github.com/stretchr/testify/require" -) - -func TestRetryTx(t *testing.T) { - ctx := context.Background() - b := &Backend{Config: &Config{RetryTimeout: time.Minute}} - t.Run("Return without calling txFn when begin() returns a failed Tx", func(t *testing.T) { - begin := func(context.Context) Tx { - return &testTx{err: errFailedTx} - } - err := b.retryTx(ctx, begin, nil) - require.ErrorIs(t, err, errFailedTx) - }) - t.Run("Commit when txFn returns nil", func(t *testing.T) { - var tx testTx - begin := func(context.Context) Tx { - return &tx - } - txFn := func(Tx) error { - return nil - } - err := b.retryTx(ctx, begin, txFn) - require.Nil(t, err) - require.Equal(t, tx.committed, 1) - }) - t.Run("Rollback when txFn returns an error", func(t *testing.T) { - var tx testTx - begin := func(context.Context) Tx { - return &tx - } - txFn := func(Tx) error { - return errFailedTx - } - err := b.retryTx(ctx, begin, txFn) - require.ErrorIs(t, err, errFailedTx) - require.ErrorIs(t, tx.rollbackErr, errFailedTx) - }) - t.Run("Return Tx error when not ErrRetry", func(t *testing.T) { - var tx testTx - begin := func(context.Context) Tx { - return &tx - } - txFn := func(Tx) error { - tx.err = errFailedTx - return nil - } - err := b.retryTx(ctx, begin, txFn) - require.ErrorIs(t, err, errFailedTx) - require.Nil(t, tx.rollbackErr) - }) - t.Run("Rollback when context is canceled during delay", func(t *testing.T) { - var tx testTx - ctx, cancel := context.WithCancel(ctx) - begin := func(context.Context) Tx { - return &tx - } - txFn := func(Tx) error { - tx.err = ErrRetry - cancel() - return nil - } - b.Config.RetryDelayPeriod = time.Minute - err := b.retryTx(ctx, begin, txFn) - require.ErrorIs(t, err, context.Canceled) - }) - t.Run("fnTx is retried", func(t *testing.T) { - var i int - var txns [2]testTx - begin := func(context.Context) Tx { - return &txns[i] - } - txFn := func(Tx) error { - if i == 0 { - txns[i].err = ErrRetry - } - i++ - return nil - } - b.Config.RetryDelayPeriod = time.Millisecond - err := b.retryTx(ctx, begin, txFn) - require.Nil(t, err) - require.Equal(t, i, 2) - require.Equal(t, ErrRetry, txns[0].err) - require.Nil(t, txns[1].rollbackErr) - require.Equal(t, txns[0].committed, 0) - require.Equal(t, txns[1].committed, 1) - }) -} - -var errFailedTx = trace.BadParameter("failedTx") - -// testTx is a Tx that exposes the transaction err -// and tracks calls to Commit and Rollback. -type testTx struct { - Tx - err error // Transaction error - committed int // Incremented each time Commit is called. - rollbackErr error // Set with err passed to Rollback. -} - -func (tx *testTx) Err() error { - return tx.err -} - -func (tx *testTx) Commit() error { - if tx.err == nil { - tx.committed++ - } - return tx.err -} - -func (tx *testTx) Rollback(err error) error { - tx.rollbackErr = err - return err -} - -var _ backend.Backend = (*Backend)(nil) diff --git a/lib/backend/sqlbk/background.go b/lib/backend/sqlbk/background.go deleted file mode 100644 index 355ed426e602b..0000000000000 --- a/lib/backend/sqlbk/background.go +++ /dev/null @@ -1,197 +0,0 @@ -/* -Copyright 2018-2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sqlbk - -import ( - "context" - "errors" - "time" - - "github.com/gravitational/trace" - - "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/api/utils/retryutils" - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/utils" - "github.com/gravitational/teleport/lib/utils/interval" -) - -// start background goroutine to track expired leases, emit events, and purge records. -func (b *Backend) start(ctx context.Context) error { - lastEventID, err := b.initLastEventID(ctx) - if err != nil { - return trace.Wrap(err) - } - b.buf.SetInit() - go b.run(lastEventID) - return nil -} - -// initLastEventID returns the ID of the most recent event stored in the -// database. It will continue to retry on error until the context is canceled. -// -// No background processing can continue until this routine succeeds, so there -// is no internal timeout. Typically, errors will occur when the database is -// down, so this routine will keep trying until the context is canceled or the -// database is up and responds to the query. On startup, the context is the one -// passed to New; after startup it is the backend's close context. -func (b *Backend) initLastEventID(ctx context.Context) (lastEventID int64, err error) { - var periodic *interval.Interval - var logged bool - for { - tx := b.db.ReadOnly(ctx) - lastEventID = tx.GetLastEventID() - if tx.Commit() == nil { - break - } - if !logged { - b.Log.Errorf("Failed to query for last event ID: %v. Background routine is paused.", tx.Err()) - logged = true - } - - // Retry after a short delay. - if periodic == nil { - periodic = interval.New(interval.Config{ - Duration: b.PollStreamPeriod, - FirstDuration: utils.HalfJitter(b.PollStreamPeriod), - Jitter: retryutils.NewSeventhJitter(), - }) - defer periodic.Stop() - } - select { - case <-periodic.Next(): - case <-ctx.Done(): - return 0, trace.Wrap(ctx.Err()) - } - } - - if logged { - b.Log.Info("Successfully queried for last event ID. Background routine has started.") - } - - return lastEventID, nil -} - -// run background process. -// - Poll the database to delete expired leases and emit events every PollStreamPeriod (1s). -// - Purge expired backend items and emitted events every PurgePeriod (20s). -func (b *Backend) run(eventID int64) { - defer close(b.bgDone) - - pollPeriodic := interval.New(interval.Config{ - Duration: b.PollStreamPeriod, - FirstDuration: utils.HalfJitter(b.PollStreamPeriod), - Jitter: retryutils.NewSeventhJitter(), - }) - defer pollPeriodic.Stop() - - purgePeriodic := interval.New(interval.Config{ - Duration: b.PurgePeriod, - FirstDuration: utils.HalfJitter(b.PurgePeriod), - Jitter: retryutils.NewSeventhJitter(), - }) - defer purgePeriodic.Stop() - - var err error - var loggedError bool // don't spam logs - for { - select { - case <-b.closeCtx.Done(): - return - - case <-pollPeriodic.Next(): - eventID, err = b.poll(eventID) - - case <-purgePeriodic.Next(): - err = b.purge() - } - - if err == nil { - loggedError = false - continue - } - - if !loggedError { - // Downgrade log level on timeout. Operation will try again. - if errors.Is(err, context.Canceled) { - b.Log.Warn(err) - } else { - b.Log.Error(err) - } - loggedError = true - } - } -} - -// purge events and expired items. -func (b *Backend) purge() error { - ctx, cancel := context.WithTimeout(b.closeCtx, b.PollStreamPeriod) - defer cancel() - tx := b.db.Begin(ctx) - tx.DeleteExpiredLeases() - tx.DeleteEvents(b.now().Add(-backend.DefaultEventsTTL)) - tx.DeleteItems() - return tx.Commit() -} - -// poll for expired leases and create delete events. Then emit events whose ID -// is greater than fromEventID. Events are emitted in the order they were -// created. Return the event ID of the last event emitted. -// -// This function also resets the buffer when it detects latency emitting events. -// The buffer is reset when the number of events remaining to emit combined with -// the maximum number of events emitted each poll period exceeds EventsTTL. Or -// simply, there are too many events to emit before they will be deleted, so we -// need to start over to prevent missing events and corrupting downstream caches. -func (b *Backend) poll(fromEventID int64) (lastEventID int64, err error) { - ctx, cancel := context.WithTimeout(b.closeCtx, b.PollStreamPeriod) - defer cancel() - - tx := b.db.Begin(ctx) - - var item backend.Item - for _, lease := range tx.GetExpiredLeases() { - item.ID = lease.ID - item.Key = lease.Key - tx.InsertEvent(types.OpDelete, item) - if tx.Err() != nil { - return fromEventID, tx.Err() - } - } - - limit := b.Config.BufferSize / 2 - events := tx.GetEvents(fromEventID, limit) - if tx.Commit() != nil { - return fromEventID, tx.Err() - } - - // Latency check. - timeNeeded := time.Duration(events.Remaining/limit) * b.PollStreamPeriod - if timeNeeded > b.EventsTTL { - b.buf.Reset() - lastEventID, err := b.initLastEventID(b.closeCtx) - if err != nil { // err = closeCtx.Err() - return 0, trace.Wrap(err) - } - b.buf.SetInit() - return lastEventID, nil - } - - b.buf.Emit(events.BackendEvents...) - - return events.LastID, nil -} diff --git a/lib/backend/sqlbk/config.go b/lib/backend/sqlbk/config.go deleted file mode 100644 index dd30863cce9b2..0000000000000 --- a/lib/backend/sqlbk/config.go +++ /dev/null @@ -1,140 +0,0 @@ -/* -Copyright 2018-2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sqlbk - -import ( - "time" - - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/trace" - "github.com/jonboulle/clockwork" - "github.com/sirupsen/logrus" -) - -const ( - // DefaultPurgePeriod is the default frequency for purging database records. - DefaultPurgePeriod = 20 * time.Second - - // DefaultDatabase is default name of the backend database. - DefaultDatabase = "teleport" - - // DefaultRetryDelayPeriod is the default delay before a transaction will retry on - // serialization failure. - DefaultRetryDelayPeriod = 250 * time.Millisecond - - // DefaultRetryTimeout is the default amount time allocated to retrying transactions. - DefaultRetryTimeout = 10 * time.Second -) - -// Config defines a configuration for the Backend. -type Config struct { - // Addr defines the host:port of the database instance. - Addr string `json:"addr,omitempty"` - - // Database is the database where teleport will store its data. - Database string `json:"database,omitempty"` - - // TLS defines configurations for validating server certificates - // and mutual authentication. - TLS struct { - // ClientKeyFile is the path to the database user's private - // key file used for authentication. - ClientKeyFile string `json:"client_key_file,omitempty"` - - // ClientCertFile is the path to the database user's certificate - // file used for authentication. - ClientCertFile string `json:"client_cert_file,omitempty"` - - // TLSCAFile is the trusted certificate authority used to generate the - // client certificates. - CAFile string `json:"ca_file,omitempty"` - } `json:"tls"` - - // BufferSize is a default buffer size used to emit events. - BufferSize int `json:"buffer_size,omitempty"` - - // EventsTTL is amount of time before an event is purged. - EventsTTL time.Duration `json:"events_ttl,omitempty"` - - // PollStreamPeriod is the polling period for the event stream. - PollStreamPeriod time.Duration `json:"poll_stream_period,omitempty"` - - // PurgePeriod is the frequency for purging database records. - PurgePeriod time.Duration `json:"purge_period,omitempty"` - - // RetryDelayPeriod is the frequency a transaction is retried due to - // serialization conflict. - RetryDelayPeriod time.Duration `json:"retry_period,omitempty"` - - // RetryTimeout is the amount of time allocated to retrying transactions. - // Setting a value less than RetryDelayPeriod disables retries. - RetryTimeout time.Duration `json:"retry_timeout,omitempty"` - - // Clock overrides the clock used by the backend. - Clock clockwork.Clock `json:"-"` - - // Log defines the log entry used by the backend. - Log *logrus.Entry `json:"-"` -} - -// CheckAndSetDefaults validates required fields and sets default -// values for fields that have not been set. -func (c *Config) CheckAndSetDefaults() error { - if c.Database == "" { - c.Database = DefaultDatabase - } - if c.BufferSize <= 0 { - c.BufferSize = backend.DefaultBufferCapacity - } - if c.EventsTTL == 0 { - c.EventsTTL = backend.DefaultEventsTTL - } - if c.PollStreamPeriod <= 0 { - c.PollStreamPeriod = backend.DefaultPollStreamPeriod - } - if c.PurgePeriod <= 0 { - c.PurgePeriod = DefaultPurgePeriod - } - if c.RetryDelayPeriod == 0 { - c.RetryDelayPeriod = DefaultRetryDelayPeriod - } - if c.RetryTimeout == 0 { - c.RetryTimeout = DefaultRetryTimeout - } - if c.EventsTTL < c.PollStreamPeriod { - return trace.BadParameter("PollStreamPeriod must be greater than EventsTTL to emit storage events") - } - if c.Log == nil { - return trace.BadParameter("Log is required") - } - if c.Clock == nil { - return trace.BadParameter("Clock is required") - } - if c.Addr == "" { - return trace.BadParameter("Addr is required") - } - if c.TLS.CAFile == "" { - return trace.BadParameter("TLS.CAFile is required") - } - if c.TLS.ClientKeyFile == "" { - return trace.BadParameter("TLS.ClientKeyFile is required") - } - if c.TLS.ClientCertFile == "" { - return trace.BadParameter("TLS.ClientCertFile is required") - } - return nil -} diff --git a/lib/backend/sqlbk/doc.go b/lib/backend/sqlbk/doc.go deleted file mode 100644 index b44a9db4afac6..0000000000000 --- a/lib/backend/sqlbk/doc.go +++ /dev/null @@ -1,33 +0,0 @@ -/* -Copyright 2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -Package sqlbk implements a storage backend SQL databases. - -The backend requires a Driver, which is an abstraction for communicating with a -specific database platforms such as PostgreSQL. A Driver opens a connection pool -that communicates with a database instance through a DB interface. A DB exposes -an interface to create transactions with cancellation through a Tx interface. - - Driver -> DB -> Tx - -# Testing - -Test a Driver implementation using the TestDriver package function. The test -will configure the driver for use with a test backend and execute the backend -test suite. See driver implementations for details about configuring tests. -*/ -package sqlbk diff --git a/lib/backend/sqlbk/driver.go b/lib/backend/sqlbk/driver.go deleted file mode 100644 index 88a7e8eee493d..0000000000000 --- a/lib/backend/sqlbk/driver.go +++ /dev/null @@ -1,166 +0,0 @@ -/* -Copyright 2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sqlbk - -import ( - "context" - "errors" - "io" - "time" - - "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/backend" -) - -// The following errors are used as signals returned by driver implementations to -// a backend instance. It is important to not return trace errors such as -// trace.AlreadyExists and trace.NotFound from driver implementations because -// they have a specific meaning when returned from the backend. It is the -// responsibility of the backend to return the correct type of error, not the -// driver. -var ( - // ErrRetry is set as a transaction error when the transaction should be retried - // due to serialization failure. - ErrRetry = errors.New("retry") - - // ErrNotFound is returned by a transaction when a SQL query returns sql.ErrNoRows. - ErrNotFound = errors.New("not found") - - // ErrAlreadyExists is returned by a transaction when a SQL query returns a - // unique constraint violation. - ErrAlreadyExists = errors.New("already exists") -) - -// Driver defines the interface implemented by specific SQL backend -// implementations such as postgres. -type Driver interface { - // BackendName returns the name of the backend that created the driver. - BackendName() string - - // Config returns the SQL backend configuration. - Config() *Config - - // Open the database. The returned DB represents a database connection pool - // referencing a specific database instance. - Open(context.Context) (DB, error) -} - -// DB defines an interface to a database instance backed by a connection pool. -type DB interface { - io.Closer - - // Begin a read/write transaction. Canceling context will rollback the - // transaction. - Begin(context.Context) Tx - - // ReadOnly begins a read-only transaction. Canceling context will rollback - // the transaction. Calling a mutating Tx method will result in a failed - // transaction. - ReadOnly(context.Context) Tx -} - -// Tx defines a database transaction. A transaction can be in one of three -// states: committed, error, or active. New transactions begin in an active -// state until either Commit or Rollback is called or another method call -// places it in an error state. Calling any method other than Err after Commit -// is called is an undefined operation. -type Tx interface { - // Err returns a transaction error. Calling other Tx methods has no effect - // on the state of the transaction. - Err() error - - // Commit the transaction. The same error returned from the Err method is - // returned from Commit when the transaction is in an error state. - Commit() error - - // Rollback the transaction with an error. The error passed to Rollback is - // converted to a trace error and set as the transaction error returned from - // Err. If the transaction is already in an error state, the error is - // overridden by the error passed. Passing a nil error is considered a bug, - // but the rollback will continue with a generated error if the transaction - // is not already in an error state. - Rollback(error) error - - // DeleteEvents created before expiryTime. - DeleteEvents(expiryTime time.Time) - - // DeleteExpiredLeases removes leases whose expires column is not null and is - // less than the current time. - DeleteExpiredLeases() - - // DeleteItems not referencing an event or a valid lease. - DeleteItems() - - // DeleteLease by key returning the backend item ID from the deleted lease. - // Zero is returned when the delete fails. - DeleteLease(key []byte) (id int64) - - // DeleteLeaseRange removes all leases inclusively between startKey - // and endKey. It returns the set of backend items deleted. The returned - // items include only Key and ID. - DeleteLeaseRange(startKey, endKey []byte) []backend.Item - - // GetEvents returns an ordered set of events up to limit whose ID is - // greater than fromEventID. - GetEvents(fromEventID int64, limit int) Events - - // GetExpiredLeases returns all leases whose expires field is less than - // or equal to the current time. - GetExpiredLeases() []backend.Lease - - // GetItem by key. Nil is returned if the item has expired. - GetItem(key []byte) *backend.Item - - // GetItemRange returns a set of backend items whose key is inclusively between - // startKey and endKey. The returned items are ordered by key, will not exceed - // limit, and does not include expired items. - GetItemRange(startKey, endKey []byte, limit int) []backend.Item - - // GetItemValue returns an item's value by key if the item has not expired. - GetItemValue(key []byte) []byte - - // GetLastEventID returns the most recent eventid. Zero is returned when the - // event table is empty. - GetLastEventID() int64 - - // InsertEvent for backend item with evenType. - InsertEvent(types.OpType, backend.Item) - - // InsertItem creates a new backend item ID, inserts the item, and returns the - // new ID. The transaction will be set to an ErrRetry failed state if the ID - // generated is already taken, which can happen when multiple transactions - // are attempting to add the same item (the test suite's concurrent test - // produces this scenario). - InsertItem(item backend.Item) (id int64) - - // LeaseExists returns true if a lease exists for key that has not expired. - LeaseExists(key []byte) bool - - // UpdateLease creates or updates a backend item. - UpdateLease(backend.Item) - - // UpsertLease for backend item. The transaction is set to a NotFound error - // state if the backend item does not exist. - UpsertLease(backend.Item) -} - -// Events is returned from the GetEvents Tx method. -type Events struct { - LastID int64 // ID of the most recent event in BackendEvents. - Remaining int // Number of events whose ID is greater than LastID. - BackendEvents []backend.Event // Set of backend events. -} diff --git a/lib/backend/sqlbk/test.go b/lib/backend/sqlbk/test.go deleted file mode 100644 index 511c1a839adac..0000000000000 --- a/lib/backend/sqlbk/test.go +++ /dev/null @@ -1,232 +0,0 @@ -/* -Copyright 2018-2022 Gravitational, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package sqlbk - -import ( - "context" - "testing" - "time" - - "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/lib/backend" - "github.com/gravitational/teleport/lib/backend/test" - "github.com/gravitational/trace" - - "github.com/jonboulle/clockwork" - "github.com/stretchr/testify/require" -) - -// TestDriver executes the backend compliance suite for a driver. A single -// backend is created so connections remain open for all subtests. -func TestDriver(t *testing.T, driver Driver) { - // Create test configuration. - fakeClock := clockwork.NewFakeClock() - cfg := driver.Config() - cfg.Clock = fakeClock - cfg.PurgePeriod = time.Minute - cfg.RetryTimeout = time.Minute - cfg.PollStreamPeriod = time.Millisecond * 300 - - // Init Backend - bk, err := newWithConfig(context.Background(), driver, cfg) - require.NoError(t, err) - t.Cleanup(func() { bk.Close() }) - - // Start background process. - err = bk.start(context.Background()) - require.NoError(t, err) - - // Run test suite. - t.Run("Backend Compliance Suite", func(t *testing.T) { - newBackend := func(options ...test.ConstructionOption) (backend.Backend, clockwork.FakeClock, error) { - opts, err := test.ApplyOptions(options) - if err != nil { - return nil, nil, trace.Wrap(err) - } - - if opts.MirrorMode { - return nil, nil, test.ErrMirrorNotSupported - } - - bk := &testBackend{Backend: bk} - bk.buf = backend.NewCircularBuffer(backend.BufferCapacity(bk.BufferSize)) - bk.buf.SetInit() - return bk, fakeClock, nil - } - test.RunBackendComplianceSuite(t, newBackend) - }) - - // Stop background routine for the remaining tests. - bk.closeFn() - <-bk.bgDone - bk.closeCtx, bk.closeFn = context.WithCancel(context.Background()) - - // Purge tests the background's purge function. - t.Run("Purge", func(t *testing.T) { - // - Create 4 items (a, b, c, d) - // - a/c are active - // - b/d have expired - // - Call purge with d-1 event ID - // - Confirm: - // - b item removed (no event or lease) - // - b/d leases removed (expired) - // - a event removed (before DefaultEventsTTL) - - // Create items - createItem := func(tx Tx, key string, expires time.Time) backend.Item { - item := backend.Item{Key: []byte(key), Expires: expires, Value: []byte("value")} - item.ID = tx.InsertItem(item) - tx.UpsertLease(item) - tx.InsertEvent(types.OpPut, item) - return item - } - tx := bk.db.Begin(context.Background()) - a := createItem(tx, "/purgetest/a", time.Time{}) // active - bk.Config.Clock.(clockwork.FakeClock).Advance(backend.DefaultEventsTTL + time.Second) - b := createItem(tx, "/purgetest/b", fakeClock.Now().Add(-time.Second)) // expired - c := createItem(tx, "/purgetest/c", fakeClock.Now().Add(backend.DefaultEventsTTL*2)) // active - d := createItem(tx, "/purgetest/d", fakeClock.Now().Add(-time.Second)) // expired with event - require.Greater(t, tx.GetLastEventID(), int64(0)) - require.NoError(t, tx.Commit()) - - // Purge - require.NoError(t, bk.purge()) - - // Validate results. - tx = bk.db.ReadOnly(context.Background()) - t.Cleanup(func() { tx.Commit() }) - - // Get a single event so we can cover getEventsRemaining. - var fromEventID int64 - events := tx.GetEvents(fromEventID, 1) - require.Greater(t, events.LastID, fromEventID) - require.Equal(t, 2, events.Remaining) - require.Equal(t, 1, len(events.BackendEvents)) - require.Equal(t, b.Key, events.BackendEvents[0].Item.Key) - - // Get the rest of the events. - fromEventID = events.LastID - events = tx.GetEvents(fromEventID, 10) - require.Greater(t, events.LastID, fromEventID) - require.Equal(t, 0, events.Remaining) - require.Equal(t, 2, len(events.BackendEvents)) - require.Equal(t, c.Key, events.BackendEvents[0].Item.Key) - require.Equal(t, d.Key, events.BackendEvents[1].Item.Key) - - // Assert leases exist or not. - require.True(t, tx.LeaseExists(a.Key)) - require.False(t, tx.LeaseExists(b.Key)) - require.True(t, tx.LeaseExists(c.Key)) - require.False(t, tx.LeaseExists(d.Key)) - - // Validate a range query returns the correct items. This joins the item - // and lease tables so we can test both at the same time. - items := tx.GetItemRange(a.Key, d.Key, 10) - require.Equal(t, 2, len(items)) - require.Equal(t, items[0].Key, a.Key) - require.Equal(t, items[1].Key, c.Key) - }) - - // Poll tests the backend poll function's ability to reset the buffer when it - // falls behind emitting events due to latency. - t.Run("Poll", func(t *testing.T) { - // - Configure backend so a single event is emitted at a time and the - // buffer is reset when there are two or more events remaining. - // - Create three items/events and detect that the watcher is closed. - // - Add a fourth item and detect that the buffer emits it and skips all - // previous items. - - backupConfig := *bk.Config - t.Cleanup(func() { *bk.Config = backupConfig }) - - bk.buf = backend.NewCircularBuffer(backend.BufferCapacity(bk.BufferSize)) - bk.buf.SetInit() - - // Setup watcher to receive events. - createWatcher := func() backend.Watcher { - watcher, err := bk.NewWatcher(context.Background(), backend.Watch{Name: "PollTest"}) - require.NoError(t, err) - select { - case event := <-watcher.Events(): - require.Equal(t, types.OpInit, event.Type) - case <-watcher.Done(): - t.Fatal("watcher done unexpectedly") - } - return watcher - } - watcher := createWatcher() - - // Update config to trigger buffer reset due to latency emitting events. - // Formula: /(BufferSize/2)*PollStreamPeriod > EventsTTL - bk.BufferSize = 2 // emit 1 event at a time - bk.EventsTTL = time.Second - bk.PurgePeriod = time.Second - bk.PollStreamPeriod = time.Second - - // Insert three events. Poll will get first event and detect 2 remaining. - createEvent := func(tx Tx, key string) backend.Item { - item := backend.Item{Key: []byte(key), Value: []byte("value")} - item.ID = tx.InsertItem(item) - tx.InsertEvent(types.OpPut, item) - return item - } - tx := bk.db.Begin(context.Background()) - createEvent(tx, "/polltest/a") - createEvent(tx, "/polltest/b") - createEvent(tx, "/polltest/c") - require.NoError(t, tx.Commit()) - - // First poll call should detect latency and reset the buffer. - lastEventID, err := bk.poll(0) - require.NoError(t, err) - require.Greater(t, lastEventID, int64(0)) // points to "c" event - select { - case <-watcher.Done(): - // OK: buffer reset closed watcher. - case event := <-watcher.Events(): - require.Failf(t, "expected watcher to close", "received %+v", event) - } - - // lastEventID should now be set to "c" event. - // Adding a new "d" item should emit an event for "d" and not "b". - watcher = createWatcher() - fromEventID := lastEventID - tx = bk.db.Begin(context.Background()) - d := createEvent(tx, "/polltest/d") - require.NoError(t, tx.Commit()) - lastEventID, err = bk.poll(fromEventID) - require.NoError(t, err) - require.Greater(t, lastEventID, fromEventID) - select { - case event := <-watcher.Events(): - require.Equal(t, types.OpPut, event.Type) - require.Equal(t, d.Key, event.Item.Key) - case <-watcher.Done(): - require.Fail(t, "watcher done unexpectedly") - } - }) -} - -// testBackend wraps Backend overriding Close. -type testBackend struct { - *Backend -} - -// Close only the buffer so buffer watchers are notified of close events. -func (b *testBackend) Close() error { - return b.buf.Close() -} diff --git a/lib/config/configuration.go b/lib/config/configuration.go index d42f2805c493b..06f48b7f7637c 100644 --- a/lib/config/configuration.go +++ b/lib/config/configuration.go @@ -35,10 +35,11 @@ import ( "time" "unicode" - "golang.org/x/crypto/ssh" - "github.com/go-ldap/ldap/v3" "github.com/gravitational/trace" + log "github.com/sirupsen/logrus" + "golang.org/x/crypto/ssh" + kyaml "k8s.io/apimachinery/pkg/util/yaml" "github.com/gravitational/teleport" "github.com/gravitational/teleport/api/constants" @@ -48,7 +49,6 @@ import ( "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/backend/lite" "github.com/gravitational/teleport/lib/backend/memory" - "github.com/gravitational/teleport/lib/backend/postgres" "github.com/gravitational/teleport/lib/client" "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/limiter" @@ -57,9 +57,6 @@ import ( "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/tlsca" "github.com/gravitational/teleport/lib/utils" - - log "github.com/sirupsen/logrus" - kyaml "k8s.io/apimachinery/pkg/util/yaml" ) // CommandLineFlags stores command line flag values, it's a much simplified subset @@ -275,13 +272,6 @@ func ApplyFileConfig(fc *FileConfig, cfg *service.Config) error { if fc.Storage.Type == lite.AlternativeName { fc.Storage.Type = lite.GetName() } - // If the alternative name "cockroachdb" is given, update it to "postgres". - if fc.Storage.Type == postgres.AlternativeName { - fc.Storage.Type = postgres.GetName() - } - - // Fix yamlv2 issue with nested storage sections. - fc.Storage.Params.Cleanse() cfg.Auth.StorageConfig = fc.Storage // backend is specified, but no path is set, set a reasonable default diff --git a/lib/service/service.go b/lib/service/service.go index c63e90b3c84b1..74d3627a15be2 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -71,7 +71,6 @@ import ( "github.com/gravitational/teleport/lib/backend/firestore" "github.com/gravitational/teleport/lib/backend/lite" "github.com/gravitational/teleport/lib/backend/memory" - "github.com/gravitational/teleport/lib/backend/postgres" "github.com/gravitational/teleport/lib/bpf" "github.com/gravitational/teleport/lib/cache" "github.com/gravitational/teleport/lib/cloud" @@ -4539,9 +4538,6 @@ func (process *TeleportProcess) initAuthStorage() (bk backend.Backend, err error // etcd backend. case etcdbk.GetName(): bk, err = etcdbk.New(ctx, bc.Params) - // PostgreSQL backend - case postgres.GetName(): - bk, err = postgres.New(ctx, bc.Params) default: err = trace.BadParameter("unsupported secrets storage type: %q", bc.Type) }