Skip to content

Commit

Permalink
feat(backup): check when Scylla backup API can be used
Browse files Browse the repository at this point in the history
Scylla backup API can be used when:
- node exposes Scylla backup API
- S3 is the provider in use
- backup won't create versioned files
  • Loading branch information
Michal-Leszczynski committed Dec 17, 2024
1 parent 46c246d commit 1229264
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 9 deletions.
23 changes: 23 additions & 0 deletions pkg/scyllaclient/client_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,29 @@ func (ni *NodeInfo) SupportsSafeDescribeSchemaWithInternals() (SafeDescribeMetho
return "", nil
}

// SupportsScyllaBackupRestoreAPI reports whether the node exposes the Scylla
// backup/restore API, which can be used instead of the Rclone API for
// backup/restore tasks.
//
// It returns true for master builds and for versions matching either the OSS
// or the ENT version constraint. An error is returned when the node's version
// cannot be checked against a constraint.
func (ni *NodeInfo) SupportsScyllaBackupRestoreAPI() (bool, error) {
	version := ni.ScyllaVersion

	// Master builds are reported as supporting the API without any constraint check.
	if scyllaversion.MasterVersion(version) {
		return true, nil
	}

	// Constraints are evaluated in order; the first match wins.
	for _, constraint := range []string{
		">= 6.3, < 2000", // OSS
		">= 2024.3",      // ENT
	} {
		matches, err := scyllaversion.CheckConstraint(version, constraint)
		if err != nil {
			return false, errors.Errorf("Unsupported Scylla version: %s", version)
		}
		if matches {
			return true, nil
		}
	}
	return false, nil
}

// FreeOSMemory calls debug.FreeOSMemory on the agent to return memory to OS.
func (c *Client) FreeOSMemory(ctx context.Context, host string) error {
p := operations.FreeOSMemoryParams{
Expand Down
6 changes: 6 additions & 0 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,11 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
return errors.Wrap(err, "invalid cluster")
}

nodeConfig, err := s.configCache.ReadAll(clusterID)
if err != nil {
return errors.Wrap(err, "read all nodes config")
}

// Create a worker
w := &worker{
workerTools: workerTools{
Expand All @@ -687,6 +692,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
SnapshotTag: run.SnapshotTag,
Config: s.config,
Client: client,
NodeConfig: nodeConfig,
},
PrevStage: run.Stage,
Metrics: s.metrics,
Expand Down
5 changes: 5 additions & 0 deletions pkg/service/backup/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/metrics"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
)
Expand Down Expand Up @@ -42,6 +43,9 @@ type snapshotDir struct {

SkippedBytesOffset int64
NewFilesSize int64
// willCreateVersioned is set to true when uploading the snapshot directory after
// the deduplication results in creating versioned SSTables.
willCreateVersioned bool
}

func (sd snapshotDir) String() string {
Expand All @@ -58,6 +62,7 @@ type workerTools struct {
SnapshotTag string
Config Config
Client *scyllaclient.Client
NodeConfig map[string]configcache.NodeConfig
Logger log.Logger
}

Expand Down
26 changes: 17 additions & 9 deletions pkg/service/backup/worker_deduplicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error {
}

deduplicatedUUIDSSTables := w.deduplicateUUIDSStables(remoteSSTableBundles, localSSTableBundles)
deduplicatedIntSSTables, err := w.deduplicateIntSSTables(ctx, h.IP, dataDst, d.Path, remoteSSTableBundles, localSSTableBundles)
deduplicatedIntSSTables, willCreateVersioned, err := w.deduplicateIntSSTables(ctx, h.IP, dataDst, d.Path, remoteSSTableBundles, localSSTableBundles)
if err != nil {
return errors.Wrap(err, "deduplication based on .crc32 content")
}
d.willCreateVersioned = willCreateVersioned
deduplicated := make([]string, 0, len(deduplicatedUUIDSSTables)+len(deduplicatedIntSSTables))

var totalSkipped int64
Expand Down Expand Up @@ -134,48 +135,55 @@ func (w *worker) deduplicateUUIDSStables(remoteSSTables, localSSTables *sstableB
return deduplicated
}

// willCreateVersioned corresponds to snapshotDir.willCreateVersioned.
func (w *worker) deduplicateIntSSTables(ctx context.Context, host string, remoteDir, localDir string,
remoteSSTables, localSSTables *sstableBundlesByID,
) (deduplicated []fileInfo, err error) {
) (deduplicated []fileInfo, willCreateVersioned bool, err error) {
// Reference to SSTables 3.0 Data File Format
// https://opensource.docs.scylladb.com/stable/architecture/sstable/sstable3/sstables-3-data-file-format.html

// Per every SSTable files group, compare local <ID>-Digest.crc32 content
// to the remote <ID>-Digest.crc32 content.
// The same content implies that SSTable can be deduplicated and removed from local directory.
for id, localBundle := range localSSTables.intID {
remoteBundle, ok := remoteSSTables.intID[id]
if !ok {
continue
}
// At this point analyzed SSTable ID is present in both local and remote dirs.
// Not being able to deduplicate it results in setting 'willCreateVersioned' to true.
crc32Idx := slices.IndexFunc(localBundle, func(fi fileInfo) bool {
return strings.HasSuffix(fi.Name, "Digest.crc32")
})
if crc32Idx == -1 {
willCreateVersioned = true
continue
}
crc32FileName := localBundle[crc32Idx].Name
remoteBundle, ok := remoteSSTables.intID[id]
if !ok {
continue
}
if !isSSTableBundleSizeEqual(localBundle, remoteBundle) {
willCreateVersioned = true
continue
}

remoteCRC32Path := path.Join(remoteDir, crc32FileName)
remoteCRC32, err := w.Client.RcloneCat(ctx, host, remoteCRC32Path)
if err != nil {
return deduplicated, errors.Wrapf(err, "get content of remote CRC32 %s", remoteCRC32Path)
return nil, true, errors.Wrapf(err, "get content of remote CRC32 %s", remoteCRC32Path)
}

localCRC32Path := path.Join(localDir, crc32FileName)
localCRC32, err := w.Client.RcloneCat(ctx, host, localCRC32Path)
if err != nil {
return deduplicated, errors.Wrapf(err, "get content of local CRC32 %s", localCRC32Path)
return nil, true, errors.Wrapf(err, "get content of local CRC32 %s", localCRC32Path)
}

if bytes.Equal(localCRC32, remoteCRC32) {
deduplicated = append(deduplicated, localBundle...)
} else {
willCreateVersioned = true
}
}
return deduplicated, nil
return deduplicated, willCreateVersioned, nil
}

type sstableBundlesByID struct {
Expand Down
29 changes: 29 additions & 0 deletions pkg/service/backup/worker_scylla_upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (C) 2024 ScyllaDB

package backup

import (
	"fmt"
	"slices"

	. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
)

// useScyllaBackupAPI decides whether Scylla backup API should be used for
// uploading the files of snapshot dir d from host hi.
//
// It returns false when uploading d would create versioned files or when the
// host's backup location uses a provider other than S3. Otherwise the answer
// comes from the node's config (SupportsScyllaBackupRestoreAPI).
// An error is returned when w.NodeConfig has no entry for hi.IP.
func (w *worker) useScyllaBackupAPI(d snapshotDir, hi hostInfo) (bool, error) {
	// Scylla backup API does not handle creation of versioned files.
	if d.willCreateVersioned {
		return false, nil
	}
	// List of object storage providers supported by Scylla backup API.
	scyllaSupportedProviders := []Provider{
		S3,
	}
	if !slices.Contains(scyllaSupportedProviders, hi.Location.Provider) {
		return false, nil
	}
	nc, ok := w.NodeConfig[hi.IP]
	if !ok {
		// This file does not import an errors package, so the error is built
		// with the standard library.
		return false, fmt.Errorf("no config for node %s", hi.IP)
	}
	return nc.SupportsScyllaBackupRestoreAPI()
}

0 comments on commit 1229264

Please sign in to comment.