Skip to content

Commit

Permalink
feat: add support for RESTORE DATABASE
Browse files Browse the repository at this point in the history
  • Loading branch information
NoyException committed Dec 26, 2024
1 parent 377eadb commit 5e96845
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 24 deletions.
27 changes: 25 additions & 2 deletions .github/workflows/backup-restore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
push:
branches:
- main
- compatibility
- noy/feat-restore-database
- test
pull_request:
branches: [ "main" ]
Expand Down Expand Up @@ -147,7 +147,7 @@ jobs:
psql "postgres://postgres:password@localhost:15432/testdb" \
-c "INSERT INTO test_table VALUES (4, 'offline data 4');"
- name: Restore MyDuck
- name: Restore MyDuck at Startup
run: |
# Restart MyDuck
./myduckserver \
Expand All @@ -165,6 +165,29 @@ jobs:
# Kill MyDuck
pkill myduckserver
rm -f ./myduck.db
- name: Restore MyDuck at Runtime
run: |
# Start MyDuck
./myduckserver &
sleep 5
psql "postgres://postgres:@127.0.0.1:5432" <<-EOSQL
RESTORE DATABASE testdb2 FROM 's3c://myduck-backup/myduck/myduck.bak'
ENDPOINT = '127.0.0.1:9001'
ACCESS_KEY_ID = 'minioadmin'
SECRET_ACCESS_KEY = 'minioadmin';
EOSQL
sleep 10
- name: Test Replication
run: |
# Verify replication catches up
psql -h 127.0.0.1 -p 5432 -U postgres -d testdb2 -c "SELECT 1 FROM test_table WHERE id = 4 AND name = 'offline data 4';" | grep -q 1
# Kill MyDuck
pkill myduckserver
- name: Cleanup
if: always()
Expand Down
31 changes: 24 additions & 7 deletions catalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,33 @@ func (prov *DatabaseProvider) attachCatalogs() error {
return fmt.Errorf("failed to read data directory: %w", err)
}
for _, file := range files {
if file.IsDir() {
continue
err := prov.AttachCatalog(file, true)
if err != nil {
logrus.Error(err)
}
if !strings.HasSuffix(file.Name(), ".db") {
continue
}
return nil
}

func (prov *DatabaseProvider) AttachCatalog(file interface {
IsDir() bool
Name() string
}, ignoreNonDB bool) error {
if file.IsDir() {
if ignoreNonDB {
return nil
}
name := strings.TrimSuffix(file.Name(), ".db")
if _, err := prov.storage.ExecContext(context.Background(), "ATTACH IF NOT EXISTS '"+filepath.Join(prov.dataDir, file.Name())+"' AS "+name); err != nil {
logrus.WithError(err).Errorf("Failed to attach database %s", name)
return fmt.Errorf("file %s is a directory", file.Name())
}
if !strings.HasSuffix(file.Name(), ".db") {
if ignoreNonDB {
return nil
}
return fmt.Errorf("file %s is not a database file", file.Name())
}
name := strings.TrimSuffix(file.Name(), ".db")
if _, err := prov.storage.ExecContext(context.Background(), "ATTACH IF NOT EXISTS '"+filepath.Join(prov.dataDir, file.Name())+"' AS "+name); err != nil {
return fmt.Errorf("failed to attach database %s: %w", name, err)
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pgserver/connection_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type ConvertedStatement struct {
PgParsable bool
SubscriptionConfig *SubscriptionConfig
BackupConfig *BackupConfig
RestoreConfig *RestoreConfig
}

// copyFromStdinState tracks the metadata for an import of data into a table using a COPY FROM STDIN statement. When
Expand Down
18 changes: 17 additions & 1 deletion pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,14 @@ func (h *ConnectionHandler) run(statement ConvertedStatement) error {
return h.send(&pgproto3.ErrorResponse{
Message: msg,
})
} else if statement.RestoreConfig != nil {
msg, err := h.executeRestore(statement.RestoreConfig)
if err != nil {
return err
}
return h.send(&pgproto3.ErrorResponse{
Message: msg,
})
}

callback := h.spoolRowsCallback(statement.Tag, &rowsAffected, false)
Expand Down Expand Up @@ -1221,7 +1229,7 @@ func (h *ConnectionHandler) convertQuery(query string, modifiers ...QueryModifie
}}, nil
}

// Check if the query is a backup query, and if so, parse it as a backup query.
// Check if the query is a backup/restore query, and if so, parse it as a backup/restore query.
backupConfig, err := parseBackupSQL(query)
if backupConfig != nil && err == nil {
return []ConvertedStatement{{
Expand All @@ -1230,6 +1238,14 @@ func (h *ConnectionHandler) convertQuery(query string, modifiers ...QueryModifie
BackupConfig: backupConfig,
}}, nil
}
restoreConfig, err := parseRestoreSQL(query)
if restoreConfig != nil && err == nil {
return []ConvertedStatement{{
String: query,
PgParsable: true,
RestoreConfig: restoreConfig,
}}, nil
}

stmts, err := parser.Parse(query)
if err != nil {
Expand Down
104 changes: 90 additions & 14 deletions pgserver/restore_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,118 @@ package pgserver
import (
"fmt"
"github.com/apecloud/myduckserver/storage"
"os"
"path/filepath"
"regexp"
"strings"
)

// Since MyDuck Server currently supports only a single database (catalog),
// restore operations are performed only at startup. Once multiple databases
// are supported, we will implement restore as a SQL command.
// This file implements the logic for handling RESTORE SQL statements.
//
// Syntax:
// RESTORE DATABASE my_database FROM '<uri>'
// ENDPOINT = '<endpoint>'
// ACCESS_KEY_ID = '<access_key>'
// SECRET_ACCESS_KEY = '<secret_key>'
//
// Example Usage:
// RESTORE DATABASE my_database FROM 's3://my_bucket/my_database/my_database.bak'
// ENDPOINT = 's3.cn-northwest-1.amazonaws.com.cn'
// ACCESS_KEY_ID = 'xxxxxxxxxxxxx'
// SECRET_ACCESS_KEY = 'xxxxxxxxxxxx'

// RestoreConfig describes a single RESTORE DATABASE request: which database to
// restore, which remote file to fetch, and how to reach the object storage.
type RestoreConfig struct {
// DbName is the name of the database being restored; executeRestore saves
// the downloaded file as "<DbName>.db".
DbName string
// RemoteFile is the remote path passed to StorageConfig.DownloadFile.
RemoteFile string
// StorageConfig is the object-storage configuration used to download RemoteFile.
StorageConfig *storage.ObjectStorageConfig
}

func NewRestoreConfig(dbName, remoteUri, endpoint, accessKeyId, secretAccessKey string) (*RestoreConfig, error) {
// restoreRegex recognizes a RESTORE DATABASE statement. Keywords are matched
// case-insensitively, and the ENDPOINT, ACCESS_KEY_ID and SECRET_ACCESS_KEY
// clauses are each optional at the pattern level but must appear in that order.
// The pattern is not anchored, so a trailing ';' after the statement is tolerated.
var restoreRegex = regexp.MustCompile(
	`(?i)` +
		// Group 1: database name (any run of non-space characters).
		`RESTORE\s+DATABASE\s+(\S+)` +
		// Group 2: single-quoted remote URI using the s3:// or s3c:// scheme.
		`\s+FROM\s+'(s3c?://[^']+)'` +
		// Group 3: single-quoted endpoint.
		`(?:\s+ENDPOINT\s*=\s*'([^']+)')?` +
		// Group 4: single-quoted access key ID.
		`(?:\s+ACCESS_KEY_ID\s*=\s*'([^']+)')?` +
		// Group 5: single-quoted secret access key.
		`(?:\s+SECRET_ACCESS_KEY\s*=\s*'([^']+)')?`)

// NewRestoreConfig bundles the target database name, the remote file path and
// the object-storage configuration into a freshly allocated RestoreConfig.
func NewRestoreConfig(dbName, remotePath string, storageConfig *storage.ObjectStorageConfig) *RestoreConfig {
	cfg := new(RestoreConfig)
	cfg.DbName = dbName
	cfg.RemoteFile = remotePath
	cfg.StorageConfig = storageConfig
	return cfg
}

// parseRestoreSQL recognizes a RESTORE DATABASE statement and turns it into a
// RestoreConfig.
//
// It returns (nil, nil) when sql does not match restoreRegex, so the caller can
// fall through to other parsers. It returns a nil config and a non-nil error
// when the statement matches but a required clause is missing, when the
// storage configuration cannot be constructed, or when the remote path ends
// in "/" (i.e. names a directory rather than a file).
func parseRestoreSQL(sql string) (*RestoreConfig, error) {
	matches := restoreRegex.FindStringSubmatch(sql)
	if matches == nil {
		// No match means the SQL doesn't follow the expected pattern.
		return nil, nil
	}

	// Capture groups:
	//   [1] database name
	//   [2] remote URI
	//   [3] endpoint
	//   [4] access key ID
	//   [5] secret access key
	// Optional groups that did not participate in the match are "".
	dbName := strings.TrimSpace(matches[1])
	remoteUri := strings.TrimSpace(matches[2])
	endpoint := strings.TrimSpace(matches[3])
	accessKeyId := strings.TrimSpace(matches[4])
	secretAccessKey := strings.TrimSpace(matches[5])

	// Every clause is mandatory; report the first one that is missing, in
	// statement order. The URI clause is introduced by FROM for RESTORE.
	required := []struct{ value, clause string }{
		{dbName, "DATABASE"},
		{remoteUri, "FROM '<URI>'"},
		{endpoint, "ENDPOINT"},
		{accessKeyId, "ACCESS_KEY_ID"},
		{secretAccessKey, "SECRET_ACCESS_KEY"},
	}
	for _, r := range required {
		if r.value == "" {
			return nil, fmt.Errorf("missing required restore configuration: %s", r.clause)
		}
	}

	storageConfig, remotePath, err := storage.ConstructStorageConfig(remoteUri, endpoint, accessKeyId, secretAccessKey)
	if err != nil {
		return nil, fmt.Errorf("failed to construct storage configuration for restore: %w", err)
	}

	// A restore needs a single backup file; a trailing slash denotes a directory.
	if strings.HasSuffix(remotePath, "/") {
		return nil, fmt.Errorf("remote path must be a file, not a directory")
	}
	return NewRestoreConfig(dbName, remotePath, storageConfig), nil
}

return &RestoreConfig{
DbName: dbName,
RemoteFile: remotePath,
StorageConfig: storageConfig,
}, nil
// executeRestore carries out a RESTORE DATABASE statement at runtime: it
// downloads the remote backup into the provider's data directory as
// "<DbName>.db" and then attaches that file as a catalog.
//
// On success it returns the message produced by the download. On failure it
// returns an empty message and an error identifying the step that failed.
func (h *ConnectionHandler) executeRestore(restoreConfig *RestoreConfig) (string, error) {
	provider := h.server.Provider
	dataDir := provider.DataDir()
	fileName := restoreConfig.DbName + ".db"

	// DbName originates from client-supplied SQL. Refuse names that carry a
	// path component so the download cannot be written outside dataDir.
	if filepath.Base(fileName) != fileName {
		return "", fmt.Errorf("invalid database name %q: must not contain path separators", restoreConfig.DbName)
	}

	msg, err := restoreConfig.StorageConfig.DownloadFile(restoreConfig.RemoteFile, dataDir, fileName)
	if err != nil {
		return "", fmt.Errorf("failed to download file: %w", err)
	}

	// os.Stat yields a FileInfo, which provides the IsDir and Name methods
	// that AttachCatalog requires.
	info, err := os.Stat(filepath.Join(dataDir, fileName))
	if err != nil {
		return "", fmt.Errorf("failed to stat file: %w", err)
	}

	// ignoreNonDB=false: a directory or non-".db" file is reported as an
	// error instead of being silently skipped.
	if err := provider.AttachCatalog(info, false); err != nil {
		return "", fmt.Errorf("failed to attach catalog: %w", err)
	}
	return msg, nil
}

// ExecuteRestore downloads the specified file from the remote storage and restores it to the specified local directory.
// Note that this should only be called at startup, as this function does not attach the restored database to the catalog.
func ExecuteRestore(dbName, localDir, localFile, remoteUri, endpoint, accessKeyId, secretAccessKey string) (string, error) {
config, err := NewRestoreConfig(dbName, remoteUri, endpoint, accessKeyId, secretAccessKey)
storageConfig, remotePath, err := storage.ConstructStorageConfig(remoteUri, endpoint, accessKeyId, secretAccessKey)
if err != nil {
return "", fmt.Errorf("failed to create restore configuration: %w", err)
return "", fmt.Errorf("failed to construct storage configuration for restore: %w", err)
}

config := NewRestoreConfig(dbName, remotePath, storageConfig)

msg, err := config.StorageConfig.DownloadFile(config.RemoteFile, localDir, localFile)
if err != nil {
return "", fmt.Errorf("failed to download file: %w", err)
Expand Down
1 change: 1 addition & 0 deletions pgserver/stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var wellKnownStatementTags = map[string]struct{}{
"EXECUTE": {},
"ATTACH": {},
"DETACH": {},
"RESTORE": {},
}

func IsWellKnownStatementTag(tag string) bool {
Expand Down

0 comments on commit 5e96845

Please sign in to comment.