diff --git a/pkg/scyllaclient/client_agent.go b/pkg/scyllaclient/client_agent.go
index 67ccef330..f7b65df1f 100644
--- a/pkg/scyllaclient/client_agent.go
+++ b/pkg/scyllaclient/client_agent.go
@@ -254,6 +254,29 @@ func (ni *NodeInfo) SupportsSafeDescribeSchemaWithInternals() (SafeDescribeMetho
 	return "", nil
 }
 
+// SupportsScyllaBackupRestoreAPI returns whether node exposes backup/restore API
+// that can be used instead of the Rclone API for backup/restore tasks.
+func (ni *NodeInfo) SupportsScyllaBackupRestoreAPI() (bool, error) {
+	// Check master builds
+	if scyllaversion.MasterVersion(ni.ScyllaVersion) {
+		return true, nil
+	}
+	// Check OSS
+	supports, err := scyllaversion.CheckConstraint(ni.ScyllaVersion, ">= 6.3, < 2000")
+	if err != nil {
+		return false, errors.Errorf("Unsupported Scylla version: %s", ni.ScyllaVersion)
+	}
+	if supports {
+		return true, nil
+	}
+	// Check ENT
+	supports, err = scyllaversion.CheckConstraint(ni.ScyllaVersion, ">= 2024.3")
+	if err != nil {
+		return false, errors.Errorf("Unsupported Scylla version: %s", ni.ScyllaVersion)
+	}
+	return supports, nil
+}
+
 // FreeOSMemory calls debug.FreeOSMemory on the agent to return memory to OS.
 func (c *Client) FreeOSMemory(ctx context.Context, host string) error {
 	p := operations.FreeOSMemoryParams{
diff --git a/pkg/service/backup/service.go b/pkg/service/backup/service.go
index 048fb0927..f8ca0bf3e 100644
--- a/pkg/service/backup/service.go
+++ b/pkg/service/backup/service.go
@@ -677,6 +677,11 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
 		return errors.Wrap(err, "invalid cluster")
 	}
 
+	nodeConfig, err := s.configCache.ReadAll(clusterID)
+	if err != nil {
+		return errors.Wrap(err, "read all nodes config")
+	}
+
 	// Create a worker
 	w := &worker{
 		workerTools: workerTools{
@@ -687,6 +692,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
 			SnapshotTag: run.SnapshotTag,
 			Config:      s.config,
 			Client:      client,
+			NodeConfig:  nodeConfig,
 		},
 		PrevStage: run.Stage,
 		Metrics:   s.metrics,
diff --git a/pkg/service/backup/worker.go b/pkg/service/backup/worker.go
index 3555f8041..a5acbd025 100644
--- a/pkg/service/backup/worker.go
+++ b/pkg/service/backup/worker.go
@@ -12,6 +12,7 @@ import (
 	"github.com/scylladb/scylla-manager/v3/pkg/metrics"
 	"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
 	. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
+	"github.com/scylladb/scylla-manager/v3/pkg/service/configcache"
 	"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
 	"github.com/scylladb/scylla-manager/v3/pkg/util/uuid"
 )
@@ -42,6 +43,9 @@ type snapshotDir struct {
 
 	SkippedBytesOffset int64
 	NewFilesSize       int64
+	// willCreateVersioned is set to true when uploading the snapshot directory after
+	// the deduplication results in creating versioned SSTables.
+	willCreateVersioned bool
 }
 
 func (sd snapshotDir) String() string {
@@ -58,6 +62,7 @@ type workerTools struct {
 	SnapshotTag string
 	Config      Config
 	Client      *scyllaclient.Client
+	NodeConfig  map[string]configcache.NodeConfig
 	Logger      log.Logger
 }
 
diff --git a/pkg/service/backup/worker_deduplicate.go b/pkg/service/backup/worker_deduplicate.go
index e3b5fdc2a..23d390bc2 100644
--- a/pkg/service/backup/worker_deduplicate.go
+++ b/pkg/service/backup/worker_deduplicate.go
@@ -82,10 +82,11 @@ func (w *worker) deduplicateHost(ctx context.Context, h hostInfo) error {
 		}
 
 		deduplicatedUUIDSSTables := w.deduplicateUUIDSStables(remoteSSTableBundles, localSSTableBundles)
-		deduplicatedIntSSTables, err := w.deduplicateIntSSTables(ctx, h.IP, dataDst, d.Path, remoteSSTableBundles, localSSTableBundles)
+		deduplicatedIntSSTables, willCreateVersioned, err := w.deduplicateIntSSTables(ctx, h.IP, dataDst, d.Path, remoteSSTableBundles, localSSTableBundles)
 		if err != nil {
 			return errors.Wrap(err, "deduplication based on .crc32 content")
 		}
+		d.willCreateVersioned = willCreateVersioned
 
 		deduplicated := make([]string, 0, len(deduplicatedUUIDSSTables)+len(deduplicatedIntSSTables))
 		var totalSkipped int64
@@ -134,9 +135,10 @@ func (w *worker) deduplicateUUIDSStables(remoteSSTables, localSSTables *sstableB
 	return deduplicated
 }
 
+// willCreateVersioned corresponds to snapshotDir.willCreateVersioned.
 func (w *worker) deduplicateIntSSTables(ctx context.Context, host string, remoteDir, localDir string,
 	remoteSSTables, localSSTables *sstableBundlesByID,
-) (deduplicated []fileInfo, err error) {
+) (deduplicated []fileInfo, willCreateVersioned bool, err error) {
 	// Reference to SSTables 3.0 Data File Format
 	// https://opensource.docs.scylladb.com/stable/architecture/sstable/sstable3/sstables-3-data-file-format.html
 
@@ -144,38 +146,44 @@ func (w *worker) deduplicateIntSSTables(ctx context.Context, host string, remote
 	// to the remote -Digest.crc32 content.
 	// The same content implies that SSTable can be deduplicated and removed from local directory.
 	for id, localBundle := range localSSTables.intID {
+		remoteBundle, ok := remoteSSTables.intID[id]
+		if !ok {
+			continue
+		}
+		// At this point analyzed SSTable ID is present in both local and remote dirs.
+		// Not being able to deduplicate it results in setting 'willCreateVersioned' to true.
 		crc32Idx := slices.IndexFunc(localBundle, func(fi fileInfo) bool {
 			return strings.HasSuffix(fi.Name, "Digest.crc32")
 		})
 		if crc32Idx == -1 {
+			willCreateVersioned = true
 			continue
 		}
 		crc32FileName := localBundle[crc32Idx].Name
-		remoteBundle, ok := remoteSSTables.intID[id]
-		if !ok {
-			continue
-		}
 		if !isSSTableBundleSizeEqual(localBundle, remoteBundle) {
+			willCreateVersioned = true
 			continue
 		}
 
 		remoteCRC32Path := path.Join(remoteDir, crc32FileName)
 		remoteCRC32, err := w.Client.RcloneCat(ctx, host, remoteCRC32Path)
 		if err != nil {
-			return deduplicated, errors.Wrapf(err, "get content of remote CRC32 %s", remoteCRC32Path)
+			return nil, true, errors.Wrapf(err, "get content of remote CRC32 %s", remoteCRC32Path)
 		}
 
 		localCRC32Path := path.Join(localDir, crc32FileName)
 		localCRC32, err := w.Client.RcloneCat(ctx, host, localCRC32Path)
 		if err != nil {
-			return deduplicated, errors.Wrapf(err, "get content of local CRC32 %s", localCRC32Path)
+			return nil, true, errors.Wrapf(err, "get content of local CRC32 %s", localCRC32Path)
 		}
 
 		if bytes.Equal(localCRC32, remoteCRC32) {
 			deduplicated = append(deduplicated, localBundle...)
+		} else {
+			willCreateVersioned = true
 		}
 	}
-	return deduplicated, nil
+	return deduplicated, willCreateVersioned, nil
 }
 
 type sstableBundlesByID struct {
diff --git a/pkg/service/backup/worker_scylla_upload.go b/pkg/service/backup/worker_scylla_upload.go
new file mode 100644
index 000000000..426353a85
--- /dev/null
+++ b/pkg/service/backup/worker_scylla_upload.go
@@ -0,0 +1,32 @@
+// Copyright (C) 2024 ScyllaDB
+
+package backup
+
+import (
+	"slices"
+
+	"github.com/pkg/errors"
+	. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
+)
+
+// useScyllaBackupAPI decides whether Scylla backup API should be used for uploading
+// the files of snapshot dir d from host hi.
+// It returns an error when there is no node config for hi.IP or when the node's support check fails.
+func (w *worker) useScyllaBackupAPI(d snapshotDir, hi hostInfo) (bool, error) {
+	// Scylla backup API does not handle creation of versioned files.
+	if d.willCreateVersioned {
+		return false, nil
+	}
+	// List of object storage providers supported by Scylla backup API.
+	scyllaSupportedProviders := []Provider{
+		S3,
+	}
+	if !slices.Contains(scyllaSupportedProviders, hi.Location.Provider) {
+		return false, nil
+	}
+	nc, ok := w.NodeConfig[hi.IP]
+	if !ok {
+		return false, errors.Errorf("no config for node %s", hi.IP)
+	}
+	return nc.SupportsScyllaBackupRestoreAPI()
+}