Skip to content

Commit

Permalink
feat: added restore/sql API endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
williamhbaker committed Jun 2, 2021
1 parent 2ec0d6f commit f5771f9
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 83 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ This release adds an embedded SQLite database for storing metadata required by t
1. [21543](https://github.com/influxdata/influxdb/pull/21543): Added `influxd` configuration flag `--sqlite-path` for specifying a user-defined path to the SQLite database file
1. [21543](https://github.com/influxdata/influxdb/pull/21543): Updated `influxd` configuration flag `--store` to work with string values `disk` or `memory`. Memory continues to store metadata in-memory for testing; `disk` will persist metadata to disk via bolt and SQLite
1. [21547](https://github.com/influxdata/influxdb/pull/21547): Allow hiding the tooltip independently of the static legend
1. [21584](https://github.com/influxdata/influxdb/pull/21584): Added the `api/v2/backup/metadata` endpoint for backing up both KV and SQL metadata, and the `api/v2/restore/sql` for restoring SQL metadata.

### Bug Fixes

Expand Down
14 changes: 10 additions & 4 deletions authorizer/sqlbackup.go → authorizer/sql_backup_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,22 @@ func NewSqlBackupRestoreService(s influxdb.SqlBackupRestoreService) *SqlBackupRe
}
}

func (b SqlBackupRestoreService) BackupSqlStore(ctx context.Context, w io.Writer) error {
func (s SqlBackupRestoreService) BackupSqlStore(ctx context.Context, w io.Writer) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil {
return err
}
return b.s.BackupSqlStore(ctx, w)
return s.s.BackupSqlStore(ctx, w)
}

func (b SqlBackupRestoreService) RestoreSqlStore(ctx context.Context, r io.Reader) error {
return nil
func (s SqlBackupRestoreService) RestoreSqlStore(ctx context.Context, r io.Reader) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()

if err := IsAllowedAll(ctx, influxdb.OperPermissions()); err != nil {
return err
}
return s.s.RestoreSqlStore(ctx, r)
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,47 @@ func Test_BackupSqlStore(t *testing.T) {
})
}
}

func Test_RestoreSqlStore(t *testing.T) {
t.Parallel()

tests := []struct {
name string
permList []influxdb.Permission
wantErr error
}{
{
"authorized to do the restore",
influxdb.OperPermissions(),
nil,
},
{
"not authorized to do the restore",
influxdb.ReadAllPermissions(),
&errors.Error{
Msg: "write:authorizations is unauthorized",
Code: errors.EUnauthorized,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrlr := gomock.NewController(t)
svc := mock.NewMockSqlBackupRestoreService(ctrlr)
s := authorizer.NewSqlBackupRestoreService(svc)

w := bytes.NewBuffer([]byte{})

if tt.wantErr == nil {
svc.EXPECT().
RestoreSqlStore(gomock.Any(), w).
Return(nil)
}

ctx := influxdbcontext.SetAuthorizer(context.Background(), mock.NewMockAuthorizer(false, tt.permList))
err := s.RestoreSqlStore(ctx, w)
require.Equal(t, tt.wantErr, err)
})
}
}
4 changes: 4 additions & 0 deletions backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ type BackupService interface {
BackupShard(ctx context.Context, w io.Writer, shardID uint64, since time.Time) error
}

// SqlBackupRestoreService represents the backup and restore functions for the sqlite database.
type SqlBackupRestoreService interface {
// BackupSqlStore creates a live backup copy of the sqlite database.
BackupSqlStore(ctx context.Context, w io.Writer) error

// RestoreSqlStore restores & replaces the sqlite database.
RestoreSqlStore(ctx context.Context, r io.Reader) error
}

Expand Down
1 change: 1 addition & 0 deletions http/api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func NewAPIHandler(b *APIBackend, opts ...APIHandlerOptFn) *APIHandler {

restoreBackend := NewRestoreBackend(b)
restoreBackend.RestoreService = authorizer.NewRestoreService(restoreBackend.RestoreService)
restoreBackend.SqlBackupRestoreService = authorizer.NewSqlBackupRestoreService(restoreBackend.SqlBackupRestoreService)
h.Mount(prefixRestore, NewRestoreHandler(restoreBackend))

h.Mount(dbrp.PrefixDBRP, dbrp.NewHTTPHandler(b.Logger, b.DBRPService, b.OrganizationService))
Expand Down
142 changes: 106 additions & 36 deletions sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"

"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/pkg/fs"
"github.com/jmoiron/sqlx"
"github.com/mattn/go-sqlite3"

Expand All @@ -25,30 +26,44 @@ const (
// SqlStore is a wrapper around the db and provides basic functionality for maintaining the db
// including flushing the data from the db during end-to-end testing.
type SqlStore struct {
Mu sync.Mutex
DB *sqlx.DB
log *zap.Logger
Mu sync.Mutex
DB *sqlx.DB
log *zap.Logger
path string
}

func NewSqlStore(path string, log *zap.Logger) (*SqlStore, error) {
db, err := sqlx.Open("sqlite3", path)
if err != nil {
s := &SqlStore{
log: log,
path: path,
}

if err := s.openDB(); err != nil {
return nil, err
}
log.Info("Resources opened", zap.String("path", path))

return s, nil
}

// open the file at the specified path
func (s *SqlStore) openDB() error {
db, err := sqlx.Open("sqlite3", s.path)
if err != nil {
return err
}
s.log.Info("Resources opened", zap.String("path", s.path))

// If using an in-memory database, don't allow more than 1 connection. Each connection
// is given a "new" database. We can't use a shared cache in-memory database because
// parallel tests that run multiple launchers in the same process will have issues doing
// concurrent writes to the database. See: https://sqlite.org/inmemorydb.html
if path == InmemPath {
if s.path == InmemPath {
db.SetMaxOpenConns(1)
}

return &SqlStore{
DB: db,
log: log,
}, nil
s.DB = db

return nil
}

// Close the connection to the sqlite database
Expand Down Expand Up @@ -106,6 +121,23 @@ func (s *SqlStore) BackupSqlStore(ctx context.Context, w io.Writer) error {
return err
}

if err := backup(ctx, dest, s); err != nil {
return err
}

// open the backup file so it can be copied to the destination writer
f, err := os.Open(destPath)
if err != nil {
return err
}
defer f.Close()

// copy the backup to the destination writer
_, err = io.Copy(w, f)
return err
}

func backup(ctx context.Context, dest, src *SqlStore) error {
// get the connection for the destination so we can get the underlying sqlite connection
destConn, err := dest.DB.Conn(ctx)
if err != nil {
Expand All @@ -120,7 +152,7 @@ func (s *SqlStore) BackupSqlStore(ctx context.Context, w io.Writer) error {
}

// get the connection for the source database so we can get the underlying sqlite connection
srcConn, err := s.DB.Conn(ctx)
srcConn, err := src.DB.Conn(ctx)
if err != nil {
return err
}
Expand All @@ -145,25 +177,77 @@ func (s *SqlStore) BackupSqlStore(ctx context.Context, w io.Writer) error {
}

// close the backup once it's done
err = bk.Finish()
return bk.Finish()
}

// sqliteFromSqlConn returns the underlying sqlite3 connection from an sql connection
func sqliteFromSqlConn(c *sql.Conn) (*sqlite3.SQLiteConn, error) {
var sqliteConn *sqlite3.SQLiteConn
err := c.Raw(func(driverConn interface{}) error {
sqliteConn = driverConn.(*sqlite3.SQLiteConn)
return nil
})
if err != nil {
return nil, err
}

return sqliteConn, nil
}

// RestoreSqlStore replaces the underlying database with the data from r.
func (s *SqlStore) RestoreSqlStore(ctx context.Context, r io.Reader) error {
tempDir, err := ioutil.TempDir("", "")
if err != nil {
return err
}
defer os.RemoveAll(tempDir)

// open the backup file so it can be copied to the destination writer
f, err := os.Open(destPath)
tempFileName := fmt.Sprintf("%s/%s", tempDir, DefaultFilename)

f, err := os.Create(tempFileName)
if err != nil {
return err
}
defer f.Close()

// copy the backup to the destination writer
_, err = io.Copy(w, f)
return err
}
// Copy the contents of r to the temporary file
if _, err := io.Copy(f, r); err != nil {
return err
} else if err := f.Sync(); err != nil {
return err
} else if err := f.Close(); err != nil {
return err
}

func (s *SqlStore) RestoreSqlStore(ctx context.Context, r io.Reader) error {
return nil
// Close the current DB.
if err := s.Close(); err != nil {
return err
}

// If we're using a :memory: database, we need to open a new DB (which will be completely empty),
// and then use the sqlite backup API to copy the data from the restored db file into the database.
// Otherwise, we can just atomically swap the file and re-open the DB.
if s.path == InmemPath {
if err := s.openDB(); err != nil {
return err
}
// Open the temporary file - this is the "source" DB for doing the restore
tempDB, err := NewSqlStore(tempFileName, s.log.With(zap.String("service", "temp backup sqlite")))
if err != nil {
return err
}
defer tempDB.Close()

// Copy the data from the temporary restored DB into the currently open DB
return backup(ctx, s, tempDB)
}

// Atomically swap the temporary file with the current DB file.
if err := fs.RenameFileWithReplacement(tempFileName, s.path); err != nil {
return err
}

// Reopen the new database file
return s.openDB()
}

func (s *SqlStore) execTrans(ctx context.Context, stmt string) error {
Expand Down Expand Up @@ -233,17 +317,3 @@ func (s *SqlStore) queryToStrings(stmt string) ([]string, error) {

return output, nil
}

// sqliteFromSqlConn returns the underlying sqlite3 connection from an sql connection
func sqliteFromSqlConn(c *sql.Conn) (*sqlite3.SQLiteConn, error) {
var sqliteConn *sqlite3.SQLiteConn
err := c.Raw(func(driverConn interface{}) error {
sqliteConn = driverConn.(*sqlite3.SQLiteConn)
return nil
})
if err != nil {
return nil, err
}

return sqliteConn, nil
}
Loading

0 comments on commit f5771f9

Please sign in to comment.