diff --git a/pkg/cli/cmds/etcd_snapshot.go b/pkg/cli/cmds/etcd_snapshot.go index 59585a515f6d..86dcbc0bd96a 100644 --- a/pkg/cli/cmds/etcd_snapshot.go +++ b/pkg/cli/cmds/etcd_snapshot.go @@ -25,6 +25,11 @@ var EtcdSnapshotFlags = []cli.Flag{ Usage: "(data) Folder to hold state default /var/lib/rancher/" + version.Program + " or ${HOME}/.rancher/" + version.Program + " if not root", Destination: &ServerConfig.DataDir, }, + &cli.StringFlag{ + Name: "dir,etcd-snapshot-dir", + Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)", + Destination: &ServerConfig.EtcdSnapshotDir, + }, &cli.StringFlag{ Name: "name", Usage: "(db) Set the base name of the etcd on-demand snapshot (appended with UNIX timestamp).", @@ -101,11 +106,7 @@ func NewEtcdSnapshotCommand(action func(*cli.Context) error, subcommands []cli.C SkipArgReorder: true, Action: action, Subcommands: subcommands, - Flags: append(EtcdSnapshotFlags, &cli.StringFlag{ - Name: "dir,etcd-snapshot-dir", - Usage: "(db) Directory to save etcd on-demand snapshot. (default: ${data-dir}/db/snapshots)", - Destination: &ServerConfig.EtcdSnapshotDir, - }), + Flags: EtcdSnapshotFlags, } } @@ -130,7 +131,7 @@ func NewEtcdSnapshotSubcommands(delete, list, prune, save func(ctx *cli.Context) }, { Name: "prune", - Usage: "Remove snapshots that exceed the configured retention count", + Usage: "Remove snapshots that match the name prefix that exceed the configured retention count", SkipFlagParsing: false, SkipArgReorder: true, Action: prune, @@ -147,11 +148,7 @@ func NewEtcdSnapshotSubcommands(delete, list, prune, save func(ctx *cli.Context) SkipFlagParsing: false, SkipArgReorder: true, Action: save, - Flags: append(EtcdSnapshotFlags, &cli.StringFlag{ - Name: "dir", - Usage: "(db) Directory to save etcd on-demand snapshot. 
(default: ${data-dir}/db/snapshots)", - Destination: &ServerConfig.EtcdSnapshotDir, - }), + Flags: EtcdSnapshotFlags, }, } } diff --git a/pkg/cli/etcdsnapshot/etcd_snapshot.go b/pkg/cli/etcdsnapshot/etcd_snapshot.go index 8586ab0a5b6b..ccdc72d67a26 100644 --- a/pkg/cli/etcdsnapshot/etcd_snapshot.go +++ b/pkg/cli/etcdsnapshot/etcd_snapshot.go @@ -173,11 +173,19 @@ func list(app *cli.Context, cfg *cmds.Server) error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0) defer w.Flush() - for _, s := range sf { - if cfg.EtcdS3 { - fmt.Fprintf(w, "%s\t%d\t%s\n", s.Name, s.Size, s.CreatedAt.Format(time.RFC3339)) - } else { - fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339)) + if cfg.EtcdS3 { + fmt.Fprint(w, "Name\tSize\tCreated\n") + for _, s := range sf { + if s.NodeName == "s3" { + fmt.Fprintf(w, "%s\t%d\t%s\n", s.Name, s.Size, s.CreatedAt.Format(time.RFC3339)) + } + } + } else { + fmt.Fprint(w, "Name\tLocation\tSize\tCreated\n") + for _, s := range sf { + if s.NodeName != "s3" { + fmt.Fprintf(w, "%s\t%s\t%d\t%s\n", s.Name, s.Location, s.Size, s.CreatedAt.Format(time.RFC3339)) + } } } @@ -201,10 +209,17 @@ func prune(app *cli.Context, cfg *cmds.Server) error { serverConfig.ControlConfig.DataDir = dataDir serverConfig.ControlConfig.EtcdSnapshotRetention = cfg.EtcdSnapshotRetention + serverConfig.ControlConfig.Runtime.KubeConfigAdmin = filepath.Join(dataDir, "cred", "admin.kubeconfig") ctx := signals.SetupSignalContext() e := etcd.NewETCD() e.SetControlConfig(&serverConfig.ControlConfig) + sc, err := server.NewContext(ctx, serverConfig.ControlConfig.Runtime.KubeConfigAdmin) + if err != nil { + return err + } + serverConfig.ControlConfig.Runtime.Core = sc.Core + return e.PruneSnapshots(ctx) } diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index ad1212146856..3fd9250e97a8 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -104,7 +104,7 @@ func (c *Cluster) Start(ctx context.Context) (<-chan 
struct{}, error) { } if !c.config.EtcdDisableSnapshots { - if err := c.managedDB.StoreSnapshotData(ctx); err != nil { + if err := c.managedDB.ReconcileSnapshotData(ctx); err != nil { logrus.Errorf("Failed to record snapshots for cluster: %v", err) } } diff --git a/pkg/cluster/managed/drivers.go b/pkg/cluster/managed/drivers.go index 28528b171c01..46df7bc5096c 100644 --- a/pkg/cluster/managed/drivers.go +++ b/pkg/cluster/managed/drivers.go @@ -22,7 +22,7 @@ type Driver interface { Restore(ctx context.Context) error EndpointName() string Snapshot(ctx context.Context, config *config.Control) error - StoreSnapshotData(ctx context.Context) error + ReconcileSnapshotData(ctx context.Context) error GetMembersClientURLs(ctx context.Context) ([]string, error) RemoveSelf(ctx context.Context) error } diff --git a/pkg/etcd/etcd.go b/pkg/etcd/etcd.go index 255adbb4af02..326b63e42654 100644 --- a/pkg/etcd/etcd.go +++ b/pkg/etcd/etcd.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/tls" + "encoding/base64" "encoding/json" "fmt" "io/ioutil" @@ -68,7 +69,8 @@ var ( // AddressKey will contain the value of api addresses list AddressKey = version.Program + "/apiaddresses" - snapshotConfigMapName = version.Program + "-etcd-snapshots" + snapshotExtraMetadataConfigMapName = version.Program + "-etcd-snapshot-extra-metadata" + snapshotConfigMapName = version.Program + "-etcd-snapshots" NodeNameAnnotation = "etcd." + version.Program + ".cattle.io/node-name" NodeAddressAnnotation = "etcd." + version.Program + ".cattle.io/node-address" @@ -921,7 +923,7 @@ func (e *ETCD) preSnapshotSetup(ctx context.Context, config *config.Control) err return nil } -// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old +// Snapshot attempts to save a new snapshot to the configured directory, and then clean up any old and failed // snapshots in excess of the retention limits. 
This method is used in the internal cron snapshot // system as well as used to do on-demand snapshots. func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { @@ -929,13 +931,29 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { return err } + logrus.Debugf("Attempting to retrieve extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName) + var extraMetadata string + // Runtime.Core may still be nil while startup is racing (addSnapshotData spins on the same condition); skip the lookup instead of dereferencing a nil factory. + if e.config.Runtime.Core == nil { + logrus.Debugf("Cannot retrieve extra metadata from %s ConfigMap: runtime core not ready", snapshotExtraMetadataConfigMapName) + } else if snapshotExtraMetadataConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotExtraMetadataConfigMapName, metav1.GetOptions{}); err != nil { + logrus.Debugf("Error encountered attempting to retrieve extra metadata from %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err) + } else if m, err := json.Marshal(snapshotExtraMetadataConfigMap.Data); err != nil { + logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err) + } else { + // extraMetadata stays empty on every failure path above; only a successful marshal sets it. + logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName) + logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m)) + extraMetadata = base64.StdEncoding.EncodeToString(m) + } + status, err := e.client.Status(ctx, endpoint) if err != nil { return errors.Wrap(err, "failed to check etcd status for snapshot") } if status.IsLearner { - logrus.Warnf("Skipping snapshot: not supported for learner") + logrus.Warnf("Unable to take snapshot: not supported for learner") return nil } @@ -950,40 +968,104 @@ func (e *ETCD) Snapshot(ctx context.Context, config *config.Control) error { } nodeName := os.Getenv("NODE_NAME") - snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, time.Now().Unix()) + now := time.Now() + snapshotName := fmt.Sprintf("%s-%s-%d", e.config.EtcdSnapshotName, nodeName, now.Unix()) snapshotPath := 
filepath.Join(snapshotDir, snapshotName) logrus.Infof("Saving etcd snapshot to %s", snapshotPath) + var sf *SnapshotFile + if err := snapshot.NewV3(nil).Save(ctx, *cfg, snapshotPath); err != nil { - return errors.Wrap(err, "failed to save snapshot") + sf = &SnapshotFile{ + Name: snapshotName, + Location: "", + Metadata: extraMetadata, + NodeName: nodeName, + CreatedAt: &metav1.Time{ + Time: now, + }, + Status: FailedSnapshotStatus, + Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), + Size: 0, + } + logrus.Errorf("Failed to take etcd snapshot: %v", err) + if err := e.addSnapshotData(*sf); err != nil { + return errors.Wrap(err, "failed to save local snapshot failure data to configmap") + } } - if e.config.EtcdS3 { - logrus.Infof("Saving etcd snapshot %s to S3", snapshotName) - if err := e.initS3IfNil(ctx); err != nil { - return err + // If the snapshot attempt was successful, sf will be nil as we did not set it. + if sf == nil { + f, err := os.Stat(snapshotPath) + if err != nil { + return errors.Wrap(err, "unable to retrieve snapshot information from local snapshot") } - if err := e.s3.upload(ctx, snapshotPath); err != nil { - return err + sf = &SnapshotFile{ + Name: f.Name(), + Metadata: extraMetadata, + Location: "file://" + snapshotPath, + NodeName: nodeName, + CreatedAt: &metav1.Time{ + Time: f.ModTime(), + }, + Status: SuccessfulSnapshotStatus, + Size: f.Size(), } - logrus.Infof("S3 upload complete for %s", snapshotName) - if e.config.EtcdSnapshotRetention >= 1 { - if err := e.s3.snapshotRetention(ctx); err != nil { - return errors.Wrap(err, "failed to apply s3 snapshot retention") - } + if err := e.addSnapshotData(*sf); err != nil { + return errors.Wrap(err, "failed to save local snapshot data to configmap") } - } - // check if we need to perform a retention check - if e.config.EtcdSnapshotRetention >= 1 { if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { - return errors.Wrap(err, 
"failed to apply snapshot retention") + return errors.Wrap(err, "failed to apply local snapshot retention policy") + } + + if e.config.EtcdS3 { + logrus.Infof("Saving etcd snapshot %s to S3", snapshotName) + // Set sf to nil so that we can attempt to now upload the snapshot to S3 if needed + sf = nil + if err := e.initS3IfNil(ctx); err != nil { + logrus.Warnf("Unable to initialize S3 client: %v", err) + sf = &SnapshotFile{ + Name: filepath.Base(snapshotPath), + Metadata: extraMetadata, + NodeName: "s3", + CreatedAt: &metav1.Time{ + Time: now, + }, + Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), + Size: 0, + Status: FailedSnapshotStatus, + S3: &s3Config{ + Endpoint: e.config.EtcdS3Endpoint, + EndpointCA: e.config.EtcdS3EndpointCA, + SkipSSLVerify: e.config.EtcdS3SkipSSLVerify, + Bucket: e.config.EtcdS3BucketName, + Region: e.config.EtcdS3Region, + Folder: e.config.EtcdS3Folder, + Insecure: e.config.EtcdS3Insecure, + }, + } + } + // sf should be nil if we were able to successfully initialize the S3 client. + if sf == nil { + sf, err = e.s3.upload(ctx, snapshotPath, extraMetadata, now) + if err != nil { + return err + } + logrus.Infof("S3 upload complete for %s", snapshotName) + if err := e.s3.snapshotRetention(ctx); err != nil { + return errors.Wrap(err, "failed to apply s3 snapshot retention policy") + } + } + if err := e.addSnapshotData(*sf); err != nil { + return errors.Wrap(err, "failed to save snapshot data to configmap") + } } } - return e.StoreSnapshotData(ctx) + return e.ReconcileSnapshotData(ctx) } type s3Config struct { @@ -996,24 +1078,67 @@ type s3Config struct { Insecure bool `json:"insecure,omitempty"` } +type SnapshotStatus string + +const SuccessfulSnapshotStatus SnapshotStatus = "successful" +const FailedSnapshotStatus SnapshotStatus = "failed" + // SnapshotFile represents a single snapshot and it's // metadata. type SnapshotFile struct { Name string `json:"name"` // Location contains the full path of the snapshot. 
For // local paths, the location will be prefixed with "file://". - Location string `json:"location,omitempty"` - NodeName string `json:"nodeName,omitempty"` - CreatedAt *metav1.Time `json:"createdAt,omitempty"` - Size int64 `json:"size,omitempty"` - S3 *s3Config `json:"s3Config,omitempty"` + Location string `json:"location,omitempty"` + Metadata string `json:"metadata,omitempty"` + Message string `json:"message,omitempty"` + NodeName string `json:"nodeName,omitempty"` + CreatedAt *metav1.Time `json:"createdAt,omitempty"` + Size int64 `json:"size,omitempty"` + Status SnapshotStatus `json:"status,omitempty"` + S3 *s3Config `json:"s3Config,omitempty"` +} + +// listLocalSnapshots provides a list of the currently stored +// snapshots on disk along with their relevant +// metadata. +func (e *ETCD) listLocalSnapshots() (map[string]SnapshotFile, error) { + snapshots := make(map[string]SnapshotFile) + snapshotDir, err := snapshotDir(e.config, true) + if err != nil { + return snapshots, errors.Wrap(err, "failed to get the snapshot dir") + } + + files, err := ioutil.ReadDir(snapshotDir) + if err != nil { + return nil, err + } + + nodeName := os.Getenv("NODE_NAME") + + for _, f := range files { + sf := SnapshotFile{ + Name: f.Name(), + Location: "file://" + filepath.Join(snapshotDir, f.Name()), + NodeName: nodeName, + CreatedAt: &metav1.Time{ + Time: f.ModTime(), + }, + Size: f.Size(), + Status: SuccessfulSnapshotStatus, + } + sfKey := generateSnapshotConfigMapKey(sf) + snapshots[sfKey] = sf + } + + return snapshots, nil } -// listSnapshots provides a list of the currently stored -// snapshots on disk or in S3 along with their relevant +// listS3Snapshots provides a list of currently stored +// snapshots in S3 along with their relevant // metadata. 
-func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]SnapshotFile, error) { - var snapshots []SnapshotFile +func (e *ETCD) listS3Snapshots(ctx context.Context) (map[string]SnapshotFile, error) { + snapshots := make(map[string]SnapshotFile) if e.config.EtcdS3 { ctx, cancel := context.WithCancel(ctx) @@ -1046,7 +1171,7 @@ func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]Snapsho return nil, err } - snapshots = append(snapshots, SnapshotFile{ + sf := SnapshotFile{ Name: filepath.Base(obj.Key), NodeName: "s3", CreatedAt: &metav1.Time{ @@ -1062,31 +1187,12 @@ func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]Snapsho Folder: e.config.EtcdS3Folder, Insecure: e.config.EtcdS3Insecure, }, - }) + Status: SuccessfulSnapshotStatus, + } + sfKey := generateSnapshotConfigMapKey(sf) + snapshots[sfKey] = sf } - - return snapshots, nil } - - files, err := ioutil.ReadDir(snapshotDir) - if err != nil { - return nil, err - } - - nodeName := os.Getenv("NODE_NAME") - - for _, f := range files { - snapshots = append(snapshots, SnapshotFile{ - Name: f.Name(), - Location: "file://" + filepath.Join(snapshotDir, f.Name()), - NodeName: nodeName, - CreatedAt: &metav1.Time{ - Time: f.ModTime(), - }, - Size: f.Size(), - }) - } - return snapshots, nil } @@ -1104,34 +1210,37 @@ func (e *ETCD) initS3IfNil(ctx context.Context) error { return nil } -// PruneSnapshots perfrorms a retention run with the given +// PruneSnapshots performs a retention run with the given // retention duration and removes expired snapshots. 
func (e *ETCD) PruneSnapshots(ctx context.Context) error { snapshotDir, err := snapshotDir(e.config, false) if err != nil { return errors.Wrap(err, "failed to get the snapshot dir") } + if err := snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir); err != nil { + logrus.Errorf("Error applying snapshot retention policy: %v", err) + } if e.config.EtcdS3 { - if e.initS3IfNil(ctx); err != nil { - return err + if err := e.initS3IfNil(ctx); err != nil { + logrus.Warnf("Unable to initialize S3 client during prune: %v", err) + } else { + if err := e.s3.snapshotRetention(ctx); err != nil { + logrus.Errorf("Error applying S3 snapshot retention policy: %v", err) + } } - - return e.s3.snapshotRetention(ctx) } - return snapshotRetention(e.config.EtcdSnapshotRetention, e.config.EtcdSnapshotName, snapshotDir) + return e.ReconcileSnapshotData(ctx) } // ListSnapshots is an exported wrapper method that wraps an // unexported method of the same name. -func (e *ETCD) ListSnapshots(ctx context.Context) ([]SnapshotFile, error) { - snapshotDir, err := snapshotDir(e.config, false) - if err != nil { - return nil, errors.Wrap(err, "failed to get the snapshot dir") +func (e *ETCD) ListSnapshots(ctx context.Context) (map[string]SnapshotFile, error) { + if e.config.EtcdS3 { + return e.listS3Snapshots(ctx) } - - return e.listSnapshots(ctx, snapshotDir) + return e.listLocalSnapshots() } // deleteSnapshots removes the given snapshots from @@ -1183,7 +1292,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { select { case <-ctx.Done(): logrus.Errorf("Unable to delete snapshot: %v", ctx.Err()) - return e.StoreSnapshotData(ctx) + return e.ReconcileSnapshotData(ctx) case <-time.After(time.Millisecond * 100): continue case err, ok := <-e.s3.client.RemoveObjects(ctx, e.config.EtcdS3BucketName, objectsCh, minio.RemoveObjectsOptions{}): @@ -1191,7 +1300,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error 
{ logrus.Errorf("Unable to delete snapshot: %v", err.Err) } if !ok { - return e.StoreSnapshotData(ctx) + return e.ReconcileSnapshotData(ctx) } } } @@ -1214,59 +1323,33 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error { logrus.Debug("Removed snapshot ", s) } - return e.StoreSnapshotData(ctx) -} - -// updateSnapshotData populates the given map with the contents of the given slice. -func updateSnapshotData(data map[string]string, snapshotFiles []SnapshotFile) error { - for _, v := range snapshotFiles { - b, err := json.Marshal(v) - if err != nil { - return err - } - data[v.Name] = string(b) - } - - return nil + return e.ReconcileSnapshotData(ctx) } -// StoreSnapshotData stores the given snapshot data in the "snapshots" ConfigMap. -func (e *ETCD) StoreSnapshotData(ctx context.Context) error { - logrus.Infof("Saving current etcd snapshot set to %s ConfigMap", snapshotConfigMapName) - - snapshotDir, err := snapshotDir(e.config, true) - if err != nil { - return errors.Wrap(err, "failed to get the snapshot dir") - } - +// addSnapshotData adds the given snapshot file information to the snapshot configmap under its generated key, +// creating the configmap if it does not exist yet; conflicting writes are retried. +func (e *ETCD) addSnapshotData(sf SnapshotFile) error { return retry.OnError(retry.DefaultBackoff, func(err error) bool { return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) }, func() error { - // make sure the core.Factory is initialize. There can + // make sure the core.Factory is initialized. There can // be a race between this core code startup. 
for e.config.Runtime.Core == nil { runtime.Gosched() } snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) - snapshotFiles, err := e.listSnapshots(ctx, snapshotDir) + marshalledSnapshotFile, err := json.Marshal(sf) if err != nil { return err } - - data := make(map[string]string, len(snapshotFiles)) - if err := updateSnapshotData(data, snapshotFiles); err != nil { - return err - } - if apierrors.IsNotFound(getErr) { cm := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: snapshotConfigMapName, Namespace: metav1.NamespaceSystem, }, - Data: data, + Data: map[string]string{generateSnapshotConfigMapKey(sf): string(marshalledSnapshotFile)}, // same "local-"/"s3-" key scheme as the update path below } _, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(&cm) return err @@ -1276,24 +1359,179 @@ func (e *ETCD) StoreSnapshotData(ctx context.Context) error { snapshotConfigMap.Data = make(map[string]string) } + sfKey := generateSnapshotConfigMapKey(sf) + snapshotConfigMap.Data[sfKey] = string(marshalledSnapshotFile) + + _, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap) + return err + }) +} + +func generateSnapshotConfigMapKey(sf SnapshotFile) string { + var sfKey string + if sf.NodeName == "s3" { + sfKey = "s3-" + sf.Name + } else { + sfKey = "local-" + sf.Name + } + return sfKey +} + +// ReconcileSnapshotData reconciles snapshot data in the snapshot ConfigMap. +// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots +// and reconcile snapshots from S3. 
Notably, S3 entries already in the ConfigMap are only replaced when the S3 listing succeeds, and failed-snapshot entries for this node are trimmed to the configured retention count. +func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error { + logrus.Infof("Reconciling etcd snapshot data in %s ConfigMap", snapshotConfigMapName) + defer logrus.Infof("Reconciliation of snapshot data in %s ConfigMap complete", snapshotConfigMapName) + return retry.OnError(retry.DefaultBackoff, func(err error) bool { + return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) + }, func() error { + // make sure the core.Factory is initialized. There can + // be a race between this core code startup. + for e.config.Runtime.Core == nil { + runtime.Gosched() + } + + logrus.Debug("core.Factory is initialized") + + snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{}) + if apierrors.IsNotFound(getErr) { + // Can't reconcile what doesn't exist. + return errors.New("No snapshot configmap found") + } + if getErr != nil { + // Any other lookup failure leaves nothing trustworthy to reconcile; surface it instead of reading from an unusable object. + return errors.Wrap(getErr, "failed to retrieve snapshot configmap") + } + + logrus.Debugf("Attempting to reconcile etcd snapshot data for configmap generation %d", snapshotConfigMap.Generation) + + // if the snapshot config map data is nil, no need to reconcile. 
+ if snapshotConfigMap.Data == nil { + return nil + } + + snapshotFiles, err := e.listLocalSnapshots() + if err != nil { + return err + } + + // s3ListSuccessful is set to true if we are successful at listing snapshots from S3 to eliminate accidental + // clobbering of S3 snapshots in the configmap due to misconfigured S3 credentials/details + s3ListSuccessful := false + + if e.config.EtcdS3 { + if s3Snapshots, err := e.listS3Snapshots(ctx); err != nil { + logrus.Errorf("error retrieving S3 snapshots for reconciliation: %v", err) + } else { + for k, v := range s3Snapshots { + snapshotFiles[k] = v + } + s3ListSuccessful = true + } + } + nodeName := os.Getenv("NODE_NAME") - // remove entries for this node only + // deletedSnapshots is a map[string]string where key is the configmap key and the value is the marshalled snapshot file + // it will be populated below with snapshots that are either from S3 or on the local node. Notably, deletedSnapshots will + // not contain snapshots that are in the "failed" status + deletedSnapshots := make(map[string]string) + // failedSnapshots is a slice of unmarshaled snapshot files sourced from the configmap + // These are stored unmarshaled so we can sort based on name. 
+ var failedSnapshots []SnapshotFile + var failedS3Snapshots []SnapshotFile + + // remove entries for this node and s3 (if S3 is enabled) only for k, v := range snapshotConfigMap.Data { var sf SnapshotFile if err := json.Unmarshal([]byte(v), &sf); err != nil { return err } - if sf.NodeName == nodeName || sf.NodeName == "s3" { + if (sf.NodeName == nodeName || (sf.NodeName == "s3" && s3ListSuccessful)) && sf.Status != FailedSnapshotStatus { + // Only delete the snapshot if the snapshot was not failed + // sf.Status != FailedSnapshotStatus is intentional, as it is possible we are reconciling snapshots stored from older versions that did not set status + deletedSnapshots[generateSnapshotConfigMapKey(sf)] = v // store a copy of the snapshot + delete(snapshotConfigMap.Data, k) + } else if sf.Status == FailedSnapshotStatus && sf.NodeName == nodeName && e.config.EtcdSnapshotRetention >= 1 { + // Handle locally failed snapshots. + failedSnapshots = append(failedSnapshots, sf) + delete(snapshotConfigMap.Data, k) + } else if sf.Status == FailedSnapshotStatus && e.config.EtcdS3 && sf.NodeName == "s3" && strings.HasPrefix(sf.Name, e.config.EtcdSnapshotName+"-"+nodeName) && e.config.EtcdSnapshotRetention >= 1 { + // If we're operating against S3, we can clean up failed S3 snapshots that failed on this node. 
+ failedS3Snapshots = append(failedS3Snapshots, sf) delete(snapshotConfigMap.Data, k) } } - // save this node's entries to the ConfigMap - for k, v := range data { - snapshotConfigMap.Data[k] = v + // Apply the failed snapshot retention policy to locally failed snapshots + if len(failedSnapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 { + sort.Slice(failedSnapshots, func(i, j int) bool { + return failedSnapshots[i].Name > failedSnapshots[j].Name + }) + + var keepCount int + if e.config.EtcdSnapshotRetention >= len(failedSnapshots) { + keepCount = len(failedSnapshots) + } else { + keepCount = e.config.EtcdSnapshotRetention + } + for _, dfs := range failedSnapshots[:keepCount] { + sfKey := generateSnapshotConfigMapKey(dfs) + marshalledSnapshot, err := json.Marshal(dfs) + if err != nil { + logrus.Errorf("unable to marshal snapshot to store in configmap %v", err) + } else { + snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) + } + } } + // Apply the failed snapshot retention policy to the S3 snapshots + if len(failedS3Snapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 { + sort.Slice(failedS3Snapshots, func(i, j int) bool { + return failedS3Snapshots[i].Name > failedS3Snapshots[j].Name + }) + + var keepCount int + if e.config.EtcdSnapshotRetention >= len(failedS3Snapshots) { + keepCount = len(failedS3Snapshots) + } else { + keepCount = e.config.EtcdSnapshotRetention + } + for _, dfs := range failedS3Snapshots[:keepCount] { + sfKey := generateSnapshotConfigMapKey(dfs) + marshalledSnapshot, err := json.Marshal(dfs) + if err != nil { + logrus.Errorf("unable to marshal snapshot to store in configmap %v", err) + } else { + snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) + } + } + } + + // save the local entries to the ConfigMap if they are still on disk or in S3. 
+ for _, snapshot := range snapshotFiles { + var sf SnapshotFile + sfKey := generateSnapshotConfigMapKey(snapshot) + if v, ok := deletedSnapshots[sfKey]; ok { + // use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it + if err := json.Unmarshal([]byte(v), &sf); err != nil { + logrus.Errorf("error unmarshaling snapshot file: %v", err) + // use the snapshot with info we sourced from disk/S3 (will be missing metadata, but something is better than nothing) + sf = snapshot + } + } else { + sf = snapshot + } + + sf.Status = SuccessfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful. + + marshalledSnapshot, err := json.Marshal(sf) + if err != nil { + logrus.Warnf("unable to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err) + } else { + snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot) + } + } + + logrus.Debugf("Updating snapshot ConfigMap (%s) with %d entries", snapshotConfigMapName, len(snapshotConfigMap.Data)) _, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap) return err }) @@ -1339,7 +1577,12 @@ func (e *ETCD) Restore(ctx context.Context) error { // snapshotRetention iterates through the snapshots and removes the oldest // leaving the desired number of snapshots. 
func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) error { + if retention < 1 { + return nil + } + nodeName := os.Getenv("NODE_NAME") + logrus.Infof("Applying local snapshot retention policy: retention: %d, snapshotPrefix: %s, directory: %s", retention, snapshotPrefix+"-"+nodeName, snapshotDir) var snapshotFiles []os.FileInfo if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error { @@ -1362,7 +1605,9 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string) delCount := len(snapshotFiles) - retention for _, df := range snapshotFiles[:delCount] { - if err := os.Remove(filepath.Join(snapshotDir, df.Name())); err != nil { + snapshotPath := filepath.Join(snapshotDir, df.Name()) + logrus.Infof("Removing local snapshot %s", snapshotPath) + if err := os.Remove(snapshotPath); err != nil { return err } } diff --git a/pkg/etcd/etcd_int_test.go b/pkg/etcd/etcd_int_test.go index 3d7f17a03432..7ee255143484 100644 --- a/pkg/etcd/etcd_int_test.go +++ b/pkg/etcd/etcd_int_test.go @@ -36,7 +36,7 @@ var _ = Describe("etcd snapshots", func() { }) It("saves an etcd snapshot", func() { Expect(testutil.K3sCmd("etcd-snapshot", "save")). - To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots")) + To(ContainSubstring("saved")) }) It("list snapshots", func() { Expect(testutil.K3sCmd("etcd-snapshot", "ls")). @@ -70,13 +70,13 @@ var _ = Describe("etcd snapshots", func() { When("using etcd snapshot prune", func() { It("saves 3 different snapshots", func() { Expect(testutil.K3sCmd("etcd-snapshot", "save", "-name", "PRUNE_TEST")). - To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots")) + To(ContainSubstring("saved")) time.Sleep(1 * time.Second) Expect(testutil.K3sCmd("etcd-snapshot", "save", "-name", "PRUNE_TEST")). 
- To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots")) + To(ContainSubstring("saved")) time.Sleep(1 * time.Second) Expect(testutil.K3sCmd("etcd-snapshot", "save", "-name", "PRUNE_TEST")). - To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots")) + To(ContainSubstring("saved")) time.Sleep(1 * time.Second) }) It("lists all 3 snapshots", func() { @@ -89,7 +89,7 @@ var _ = Describe("etcd snapshots", func() { }) It("prunes snapshots down to 2", func() { Expect(testutil.K3sCmd("etcd-snapshot", "prune", "--snapshot-retention", "2", "--name", "PRUNE_TEST")). - To(BeEmpty()) + To(ContainSubstring("Removing local snapshot")) lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls") Expect(err).ToNot(HaveOccurred()) reg, err := regexp.Compile(`:///var/lib/rancher/k3s/server/db/snapshots/PRUNE_TEST`) diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go index 78b8663025eb..6eee3092c2aa 100644 --- a/pkg/etcd/s3.go +++ b/pkg/etcd/s3.go @@ -14,12 +14,14 @@ import ( "path/filepath" "sort" "strings" + "time" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "github.com/pkg/errors" "github.com/rancher/k3s/pkg/daemons/config" "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // S3 maintains state for S3 functionality. @@ -32,6 +34,9 @@ type S3 struct { // copy of the config.Control pointer and initializes // a new Minio client. func NewS3(ctx context.Context, config *config.Control) (*S3, error) { + if config.EtcdS3BucketName == "" { + return nil, errors.New("s3 bucket name was not set") + } tr := http.DefaultTransport switch { @@ -88,9 +93,11 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) { // upload uploads the given snapshot to the configured S3 // compatible backend. 
-func (s *S3) upload(ctx context.Context, snapshot string) error { +func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*SnapshotFile, error) { + logrus.Infof("Uploading snapshot %s to S3", snapshot) basename := filepath.Base(snapshot) var snapshotFileName string + var snapshotFile SnapshotFile if s.config.EtcdS3Folder != "" { snapshotFileName = filepath.Join(s.config.EtcdS3Folder, basename) } else { @@ -103,11 +110,56 @@ func (s *S3) upload(ctx context.Context, snapshot string) error { ContentType: "application/zip", NumThreads: 2, } - if _, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts); err != nil { - logrus.Errorf("Error received in attempt to upload snapshot to S3: %s", err) - } + uploadInfo, err := s.client.FPutObject(toCtx, s.config.EtcdS3BucketName, snapshotFileName, snapshot, opts) + if err != nil { + snapshotFile = SnapshotFile{ + Name: basename, // uploadInfo is the zero value when FPutObject fails, so its Key cannot name the snapshot + Metadata: extraMetadata, + NodeName: "s3", + CreatedAt: &metav1.Time{ + Time: now, + }, + Message: base64.StdEncoding.EncodeToString([]byte(err.Error())), + Size: 0, + Status: FailedSnapshotStatus, + S3: &s3Config{ + Endpoint: s.config.EtcdS3Endpoint, + EndpointCA: s.config.EtcdS3EndpointCA, + SkipSSLVerify: s.config.EtcdS3SkipSSLVerify, + Bucket: s.config.EtcdS3BucketName, + Region: s.config.EtcdS3Region, + Folder: s.config.EtcdS3Folder, + Insecure: s.config.EtcdS3Insecure, + }, + } + logrus.Errorf("Error received during snapshot upload to S3: %s", err) + } else { + ca, err := time.Parse(time.RFC3339, uploadInfo.LastModified.Format(time.RFC3339)) + if err != nil { + return nil, err + } - return nil + snapshotFile = SnapshotFile{ + Name: filepath.Base(uploadInfo.Key), + Metadata: extraMetadata, + NodeName: "s3", + CreatedAt: &metav1.Time{ + Time: ca, + }, + Size: uploadInfo.Size, + Status: SuccessfulSnapshotStatus, + S3: &s3Config{ + Endpoint: s.config.EtcdS3Endpoint, + EndpointCA: 
s.config.EtcdS3EndpointCA, + SkipSSLVerify: s.config.EtcdS3SkipSSLVerify, + Bucket: s.config.EtcdS3BucketName, + Region: s.config.EtcdS3Region, + Folder: s.config.EtcdS3Folder, + Insecure: s.config.EtcdS3Insecure, + }, + } + } + return &snapshotFile, nil } // download downloads the given snapshot from the configured S3 @@ -170,9 +222,13 @@ func (s *S3) snapshotPrefix() string { return prefix } -// snapshotRetention deletes the given snapshot from the configured S3 -// compatible backend. +// snapshotRetention prunes snapshots in the configured S3 compatible backend for this specific node. func (s *S3) snapshotRetention(ctx context.Context) error { + if s.config.EtcdSnapshotRetention < 1 { + return nil + } + logrus.Infof("Applying snapshot retention policy to snapshots stored in S3: retention: %d, snapshotPrefix: %s", s.config.EtcdSnapshotRetention, s.snapshotPrefix()) + var snapshotFiles []minio.ObjectInfo toCtx, cancel := context.WithTimeout(ctx, s.config.EtcdS3Timeout) @@ -199,7 +255,7 @@ func (s *S3) snapshotRetention(ctx context.Context) error { delCount := len(snapshotFiles) - s.config.EtcdSnapshotRetention for _, df := range snapshotFiles[:delCount] { - logrus.Debugf("Removing snapshot: %s", df.Key) + logrus.Infof("Removing S3 snapshot: %s", df.Key) if err := s.client.RemoveObject(ctx, s.config.EtcdS3BucketName, df.Key, minio.RemoveObjectOptions{}); err != nil { return err }