diff --git a/pkg/etcd/s3.go b/pkg/etcd/s3.go
index fe15f8f1f129..0bd09a2c2478 100644
--- a/pkg/etcd/s3.go
+++ b/pkg/etcd/s3.go
@@ -20,6 +20,7 @@ import (
 	"github.com/minio/minio-go/v7/pkg/credentials"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
@@ -92,7 +93,7 @@ func NewS3(ctx context.Context, config *config.Control) (*S3, error) {
 
 // upload uploads the given snapshot to the configured S3
 // compatible backend.
-func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now time.Time) (*snapshotFile, error) {
+func (s *S3) upload(ctx context.Context, snapshot string, extraMetadata *v1.ConfigMap, now time.Time) (*snapshotFile, error) {
 	logrus.Infof("Uploading snapshot %s to S3", snapshot)
 	basename := filepath.Base(snapshot)
 	var snapshotFileName string
@@ -115,7 +116,6 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
 	if err != nil {
 		sf = snapshotFile{
 			Name:     filepath.Base(uploadInfo.Key),
-			Metadata: extraMetadata,
 			NodeName: "s3",
 			CreatedAt: &metav1.Time{
 				Time: now,
@@ -132,6 +132,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
 				Folder:    s.config.EtcdS3Folder,
 				Insecure:  s.config.EtcdS3Insecure,
 			},
+			metadataSource: extraMetadata,
 		}
 		logrus.Errorf("Error received during snapshot upload to S3: %s", err)
 	} else {
@@ -142,7 +143,6 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
 
 	sf = snapshotFile{
 		Name:     filepath.Base(uploadInfo.Key),
-		Metadata: extraMetadata,
 		NodeName: "s3",
 		CreatedAt: &metav1.Time{
 			Time: ca,
@@ -158,6 +158,7 @@ func (s *S3) upload(ctx context.Context, snapshot, extraMetadata string, now tim
 			Folder:    s.config.EtcdS3Folder,
 			Insecure:  s.config.EtcdS3Insecure,
 		},
+		metadataSource: extraMetadata,
 	}
 	return &sf, nil
diff --git a/pkg/etcd/snapshot.go b/pkg/etcd/snapshot.go
index e1a12e9e44ea..9c8d210884b0 100644
--- a/pkg/etcd/snapshot.go
+++ b/pkg/etcd/snapshot.go
@@ -12,6 +12,7 @@ import (
 	"path/filepath"
 	"runtime"
 	"sort"
+	"strconv"
 	"strings"
 	"time"
 
@@ -35,6 +36,7 @@ import (
 const (
 	maxConcurrentSnapshots = 1
+	pruneStepSize          = 5
 	compressedExtension    = ".zip"
 )
 
@@ -187,7 +189,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
 	defer e.snapshotSem.Release(maxConcurrentSnapshots)
 
 	// make sure the core.Factory is initialized before attempting to add snapshot metadata
-	var extraMetadata string
+	var extraMetadata *v1.ConfigMap
 	if e.config.Runtime.Core == nil {
 		logrus.Debugf("Cannot retrieve extra metadata from %s ConfigMap: runtime core not ready", snapshotExtraMetadataConfigMapName)
 	} else {
@@ -195,13 +197,8 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
 		if snapshotExtraMetadataConfigMap, err := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotExtraMetadataConfigMapName, metav1.GetOptions{}); err != nil {
 			logrus.Debugf("Error encountered attempting to retrieve extra metadata from %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
 		} else {
-			if m, err := json.Marshal(snapshotExtraMetadataConfigMap.Data); err != nil {
-				logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
-			} else {
-				logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName)
-				logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m))
-				extraMetadata = base64.StdEncoding.EncodeToString(m)
-			}
+			logrus.Debugf("Setting extra metadata from %s ConfigMap", snapshotExtraMetadataConfigMapName)
+			extraMetadata = snapshotExtraMetadataConfigMap
 		}
 	}
 
@@ -259,15 +256,15 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
 		sf = &snapshotFile{
 			Name:     snapshotName,
 			Location: "",
-			Metadata: extraMetadata,
 			NodeName: nodeName,
 			CreatedAt: &metav1.Time{
 				Time: now,
 			},
-			Status:     failedSnapshotStatus,
-			Message:    base64.StdEncoding.EncodeToString([]byte(err.Error())),
-			Size:       0,
-			Compressed: e.config.EtcdSnapshotCompress,
+			Status:         failedSnapshotStatus,
+			Message:        base64.StdEncoding.EncodeToString([]byte(err.Error())),
+			Size:           0,
+			Compressed:     e.config.EtcdSnapshotCompress,
+			metadataSource: extraMetadata,
 		}
 		logrus.Errorf("Failed to take etcd snapshot: %v", err)
 		if err := e.addSnapshotData(*sf); err != nil {
@@ -295,15 +292,15 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
 	}
 
 	sf = &snapshotFile{
 		Name:     f.Name(),
-		Metadata: extraMetadata,
 		Location: "file://" + snapshotPath,
 		NodeName: nodeName,
 		CreatedAt: &metav1.Time{
 			Time: f.ModTime(),
 		},
-		Status:     successfulSnapshotStatus,
-		Size:       f.Size(),
-		Compressed: e.config.EtcdSnapshotCompress,
+		Status:         successfulSnapshotStatus,
+		Size:           f.Size(),
+		Compressed:     e.config.EtcdSnapshotCompress,
+		metadataSource: extraMetadata,
 	}
 
 	if err := e.addSnapshotData(*sf); err != nil {
@@ -321,7 +318,6 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
 			logrus.Warnf("Unable to initialize S3 client: %v", err)
 			sf = &snapshotFile{
 				Name:     filepath.Base(snapshotPath),
-				Metadata: extraMetadata,
 				NodeName: "s3",
 				CreatedAt: &metav1.Time{
 					Time: now,
@@ -338,6 +334,7 @@ func (e *ETCD) Snapshot(ctx context.Context) error {
 					Folder:    e.config.EtcdS3Folder,
 					Insecure:  e.config.EtcdS3Insecure,
 				},
+				metadataSource: extraMetadata,
 			}
 		}
 		// sf should be nil if we were able to successfully initialize the S3 client.
@@ -392,6 +389,8 @@ type snapshotFile struct {
 	Status     snapshotStatus `json:"status,omitempty"`
 	S3         *s3Config      `json:"s3Config,omitempty"`
 	Compressed bool           `json:"compressed"`
+
+	metadataSource *v1.ConfigMap `json:"-"`
 }
 
 // listLocalSnapshots provides a list of the currently stored
@@ -572,7 +571,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
 
 		for obj := range e.s3.client.ListObjects(ctx, e.config.EtcdS3BucketName, opts) {
 			if obj.Err != nil {
-				logrus.Error(obj.Err)
+				logrus.Errorf("Failed to list snapshots from S3: %v", obj.Err)
 				return
 			}
 
@@ -630,24 +629,40 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) error {
 	return e.ReconcileSnapshotData(ctx)
 }
 
+func marshalSnapshotFile(sf snapshotFile) ([]byte, error) {
+	if sf.metadataSource != nil {
+		if m, err := json.Marshal(sf.metadataSource.Data); err != nil {
+			logrus.Debugf("Error attempting to marshal extra metadata contained in %s ConfigMap, error: %v", snapshotExtraMetadataConfigMapName, err)
+		} else {
+			logrus.Tracef("Marshalled extra metadata in %s ConfigMap was: %s", snapshotExtraMetadataConfigMapName, string(m))
+			sf.Metadata = base64.StdEncoding.EncodeToString(m)
+		}
+	}
+	return json.Marshal(sf)
+}
+
 // AddSnapshotData adds the given snapshot file information to the snapshot configmap, using the existing extra metadata
 // available at the time.
 func (e *ETCD) addSnapshotData(sf snapshotFile) error {
+	// make sure the core.Factory is initialized. There can
+	// be a race between this core code startup.
+	for e.config.Runtime.Core == nil {
+		runtime.Gosched()
+	}
+
+	sfKey := generateSnapshotConfigMapKey(sf)
+	marshalledSnapshotFile, err := marshalSnapshotFile(sf)
+	if err != nil {
+		return err
+	}
+
+	pruneCount := pruneStepSize
+	var lastErr error
 	return retry.OnError(snapshotDataBackoff, func(err error) bool {
-		return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
+		return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err)
 	}, func() error {
-		// make sure the core.Factory is initialized. There can
-		// be a race between this core code startup.
-		for e.config.Runtime.Core == nil {
-			runtime.Gosched()
-		}
 		snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
-		sfKey := generateSnapshotConfigMapKey(sf)
-		marshalledSnapshotFile, err := json.Marshal(sf)
-		if err != nil {
-			return err
-		}
 		if apierrors.IsNotFound(getErr) {
 			cm := v1.ConfigMap{
 				ObjectMeta: metav1.ObjectMeta{
@@ -664,10 +679,21 @@ func (e *ETCD) addSnapshotData(sf snapshotFile) error {
 			snapshotConfigMap.Data = make(map[string]string)
 		}
 
+		// If the configmap update was rejected due to size, drop the oldest entries from the map.
+		// We will continue to remove an increasing number of old snapshots from the map until the request succeeds,
+		// or the number we would attempt to remove exceeds the number stored.
+		if isTooLargeError(lastErr) {
+			logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount)
+			if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil {
+				return err
+			}
+			pruneCount += pruneStepSize
+		}
+
 		snapshotConfigMap.Data[sfKey] = string(marshalledSnapshotFile)
 
-		_, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
-		return err
+		_, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
+		return lastErr
 	})
 }
 
@@ -679,34 +705,68 @@ func generateSnapshotConfigMapKey(sf snapshotFile) string {
 	return "local-" + name
 }
 
+// pruneConfigMap drops the oldest entries from the configMap.
+// Note that the actual snapshot files are not removed, just the entries that track them in the configmap.
+func pruneConfigMap(snapshotConfigMap *v1.ConfigMap, pruneCount int) error {
+	if pruneCount > len(snapshotConfigMap.Data) {
+		return errors.New("unable to reduce snapshot ConfigMap size by eliding old snapshots")
+	}
+
+	var snapshotFiles []snapshotFile
+	retention := len(snapshotConfigMap.Data) - pruneCount
+	for name := range snapshotConfigMap.Data {
+		basename, compressed := strings.CutSuffix(name, compressedExtension)
+		ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
+		snapshotFiles = append(snapshotFiles, snapshotFile{Name: name, CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
+	}
+
+	// sort newest-first so we can prune entries past the retention count
+	sort.Slice(snapshotFiles, func(i, j int) bool {
+		return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt)
+	})
+
+	for _, snapshotFile := range snapshotFiles[retention:] {
+		delete(snapshotConfigMap.Data, snapshotFile.Name)
+	}
+	return nil
+}
+
 // ReconcileSnapshotData reconciles snapshot data in the snapshot ConfigMap.
 // It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots
-// and reconcile snapshots from S3. Notably,
+// and reconcile snapshots from S3.
 func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
+	// make sure the core.Factory is initialized. There can
+	// be a race between this core code startup.
+	for e.config.Runtime.Core == nil {
+		runtime.Gosched()
+	}
+
 	logrus.Infof("Reconciling etcd snapshot data in %s ConfigMap", snapshotConfigMapName)
 	defer logrus.Infof("Reconciliation of snapshot data in %s ConfigMap complete", snapshotConfigMapName)
+
+	pruneCount := pruneStepSize
+	var lastErr error
 	return retry.OnError(retry.DefaultBackoff, func(err error) bool {
-		return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err)
+		return apierrors.IsConflict(err) || apierrors.IsAlreadyExists(err) || isTooLargeError(err)
	}, func() error {
-		// make sure the core.Factory is initialize. There can
-		// be a race between this core code startup.
-		for e.config.Runtime.Core == nil {
-			runtime.Gosched()
-		}
-
-		logrus.Debug("core.Factory is initialized")
-
 		snapshotConfigMap, getErr := e.config.Runtime.Core.Core().V1().ConfigMap().Get(metav1.NamespaceSystem, snapshotConfigMapName, metav1.GetOptions{})
 		if apierrors.IsNotFound(getErr) {
-			// Can't reconcile what doesn't exist.
-			return errors.New("No snapshot configmap found")
+			cm := &v1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:      snapshotConfigMapName,
+					Namespace: metav1.NamespaceSystem,
+				},
+			}
+			cm, err := e.config.Runtime.Core.Core().V1().ConfigMap().Create(cm)
+			if err != nil {
+				return err
+			}
+			snapshotConfigMap = cm
 		}
 
 		logrus.Debugf("Attempting to reconcile etcd snapshot data for configmap generation %d", snapshotConfigMap.Generation)
-
-		// if the snapshot config map data is nil, no need to reconcile.
 		if snapshotConfigMap.Data == nil {
-			return nil
+			snapshotConfigMap.Data = map[string]string{}
 		}
 
 		snapshotFiles, err := e.listLocalSnapshots()
@@ -716,11 +776,11 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
 
 		// s3ListSuccessful is set to true if we are successful at listing snapshots from S3 to eliminate accidental
 		// clobbering of S3 snapshots in the configmap due to misconfigured S3 credentials/details
-		s3ListSuccessful := false
+		var s3ListSuccessful bool
 		if e.config.EtcdS3 {
 			if s3Snapshots, err := e.listS3Snapshots(ctx); err != nil {
-				logrus.Errorf("error retrieving S3 snapshots for reconciliation: %v", err)
+				logrus.Errorf("Error retrieving S3 snapshots for reconciliation: %v", err)
 			} else {
 				for k, v := range s3Snapshots {
 					snapshotFiles[k] = v
@@ -764,21 +824,16 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
 
 		// Apply the failed snapshot retention policy to locally failed snapshots
 		if len(failedSnapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 {
+			// sort newest-first so we can record only the retention count
 			sort.Slice(failedSnapshots, func(i, j int) bool {
-				return failedSnapshots[i].Name > failedSnapshots[j].Name
+				return failedSnapshots[j].CreatedAt.Before(failedSnapshots[i].CreatedAt)
 			})
 
-			var keepCount int
-			if e.config.EtcdSnapshotRetention >= len(failedSnapshots) {
-				keepCount = len(failedSnapshots)
-			} else {
-				keepCount = e.config.EtcdSnapshotRetention
-			}
-			for _, dfs := range failedSnapshots[:keepCount] {
+			for _, dfs := range failedSnapshots[:e.config.EtcdSnapshotRetention] {
 				sfKey := generateSnapshotConfigMapKey(dfs)
-				marshalledSnapshot, err := json.Marshal(dfs)
+				marshalledSnapshot, err := marshalSnapshotFile(dfs)
 				if err != nil {
-					logrus.Errorf("unable to marshal snapshot to store in configmap %v", err)
+					logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err)
 				} else {
 					snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
 				}
@@ -787,21 +842,16 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
 
 		// Apply the failed snapshot retention policy to the S3 snapshots
 		if len(failedS3Snapshots) > 0 && e.config.EtcdSnapshotRetention >= 1 {
+			// sort newest-first so we can record only the retention count
 			sort.Slice(failedS3Snapshots, func(i, j int) bool {
-				return failedS3Snapshots[i].Name > failedS3Snapshots[j].Name
+				return failedS3Snapshots[j].CreatedAt.Before(failedS3Snapshots[i].CreatedAt)
 			})
 
-			var keepCount int
-			if e.config.EtcdSnapshotRetention >= len(failedS3Snapshots) {
-				keepCount = len(failedS3Snapshots)
-			} else {
-				keepCount = e.config.EtcdSnapshotRetention
-			}
-			for _, dfs := range failedS3Snapshots[:keepCount] {
+			for _, dfs := range failedS3Snapshots[:e.config.EtcdSnapshotRetention] {
 				sfKey := generateSnapshotConfigMapKey(dfs)
-				marshalledSnapshot, err := json.Marshal(dfs)
+				marshalledSnapshot, err := marshalSnapshotFile(dfs)
 				if err != nil {
-					logrus.Errorf("unable to marshal snapshot to store in configmap %v", err)
+					logrus.Errorf("Failed to marshal snapshot to store in configmap %v", err)
 				} else {
 					snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
 				}
@@ -815,7 +865,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
 			if v, ok := deletedSnapshots[sfKey]; ok {
 				// use the snapshot file we have from the existing configmap, and unmarshal it so we can manipulate it
 				if err := json.Unmarshal([]byte(v), &sf); err != nil {
-					logrus.Errorf("error unmarshaling snapshot file: %v", err)
+					logrus.Errorf("Error unmarshaling snapshot file: %v", err)
 					// use the snapshot with info we sourced from disk/S3 (will be missing metadata, but something is better than nothing)
 					sf = snapshot
 				}
@@ -824,18 +874,28 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
 			}
 			sf.Status = successfulSnapshotStatus // if the snapshot is on disk or in S3, it was successful.
-
-			marshalledSnapshot, err := json.Marshal(sf)
+			marshalledSnapshot, err := marshalSnapshotFile(sf)
 			if err != nil {
-				logrus.Warnf("unable to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err)
+				logrus.Warnf("Failed to marshal snapshot metadata %s to store in configmap, received error: %v", sf.Name, err)
 			} else {
 				snapshotConfigMap.Data[sfKey] = string(marshalledSnapshot)
 			}
 		}
 
+		// If the configmap update was rejected due to size, drop the oldest entries from the map.
+		// We will continue to remove an increasing number of old snapshots from the map until the request succeeds,
+		// or the number we would attempt to remove exceeds the number stored.
+		if isTooLargeError(lastErr) {
+			logrus.Warnf("Snapshot configmap is too large, attempting to elide %d oldest snapshots from list", pruneCount)
+			if err := pruneConfigMap(snapshotConfigMap, pruneCount); err != nil {
+				return err
+			}
+			pruneCount += pruneStepSize
+		}
+
 		logrus.Debugf("Updating snapshot ConfigMap (%s) with %d entries", snapshotConfigMapName, len(snapshotConfigMap.Data))
-		_, err = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
-		return err
+		_, lastErr = e.config.Runtime.Core.Core().V1().ConfigMap().Update(snapshotConfigMap)
+		return lastErr
 	})
 }
 
@@ -848,7 +908,7 @@ func (e *ETCD) setSnapshotFunction(ctx context.Context) {
 		// when updating the snapshot list configmap.
 		time.Sleep(time.Duration(rand.Float64() * float64(snapshotJitterMax)))
 		if err := e.Snapshot(ctx); err != nil {
-			logrus.Error(err)
+			logrus.Errorf("Failed to take scheduled snapshot: %v", err)
 		}
 	})))
 }
@@ -862,13 +922,15 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
 
 	logrus.Infof("Applying local snapshot retention policy: retention: %d, snapshotPrefix: %s, directory: %s", retention, snapshotPrefix, snapshotDir)
 
-	var snapshotFiles []os.FileInfo
+	var snapshotFiles []snapshotFile
 	if err := filepath.Walk(snapshotDir, func(path string, info os.FileInfo, err error) error {
 		if err != nil {
 			return err
 		}
 		if strings.HasPrefix(info.Name(), snapshotPrefix) {
-			snapshotFiles = append(snapshotFiles, info)
+			basename, compressed := strings.CutSuffix(info.Name(), compressedExtension)
+			ts, _ := strconv.ParseInt(basename[strings.LastIndexByte(basename, '-')+1:], 10, 64)
+			snapshotFiles = append(snapshotFiles, snapshotFile{Name: info.Name(), CreatedAt: &metav1.Time{Time: time.Unix(ts, 0)}, Compressed: compressed})
 		}
 		return nil
 	}); err != nil {
@@ -877,16 +939,14 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
 
 	if len(snapshotFiles) <= retention {
 		return nil
 	}
-	sort.Slice(snapshotFiles, func(firstSnapshot, secondSnapshot int) bool {
-		// it takes the name from the snapshot file ex: etcd-snapshot-example-{date}, makes the split using "-" to find the date, takes the date and sort by date
-		firstSnapshotName, secondSnapshotName := strings.Split(snapshotFiles[firstSnapshot].Name(), "-"), strings.Split(snapshotFiles[secondSnapshot].Name(), "-")
-		firstSnapshotDate, secondSnapshotDate := firstSnapshotName[len(firstSnapshotName)-1], secondSnapshotName[len(secondSnapshotName)-1]
-		return firstSnapshotDate < secondSnapshotDate
+
+	// sort newest-first so we can prune entries past the retention count
+	sort.Slice(snapshotFiles, func(i, j int) bool {
+		return snapshotFiles[j].CreatedAt.Before(snapshotFiles[i].CreatedAt)
 	})
-	delCount := len(snapshotFiles) - retention
-	for _, df := range snapshotFiles[:delCount] {
-		snapshotPath := filepath.Join(snapshotDir, df.Name())
+	for _, df := range snapshotFiles[retention:] {
+		snapshotPath := filepath.Join(snapshotDir, df.Name)
 		logrus.Infof("Removing local snapshot %s", snapshotPath)
 		if err := os.Remove(snapshotPath); err != nil {
 			return err
@@ -895,3 +955,8 @@ func snapshotRetention(retention int, snapshotPrefix string, snapshotDir string)
 
 	return nil
 }
+
+func isTooLargeError(err error) bool {
+	// There are no helpers for unpacking field validation errors, so we just check for "Too long" in the error string.
+	return apierrors.IsRequestEntityTooLargeError(err) || (apierrors.IsInvalid(err) && strings.Contains(err.Error(), "Too long"))
+}
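The substance of this patch is a prune-and-retry pattern: when the apiserver rejects the snapshot ConfigMap update as too large, an increasing number of the oldest entries is elided (pruneStepSize more per attempt) and the update is retried, with isTooLargeError added to the retry predicate. The standalone Go sketch below illustrates that escalation outside of k3s; the 1 MiB cap, fakeUpdate, and pruneOldest are stand-ins invented for this example, and the real patch re-fetches the ConfigMap on each retry and orders entries by the Unix-timestamp suffix of the snapshot name rather than by key.

package main

import (
	"errors"
	"fmt"
	"sort"
)

const (
	pruneStep = 5       // entries to drop per retry attempt, mirroring pruneStepSize in the patch
	maxBytes  = 1 << 20 // assumed 1 MiB payload cap, a stand-in for the apiserver's ConfigMap size limit
)

var errTooLarge = errors.New("request entity too large")

// fakeUpdate stands in for the ConfigMap Update call: it rejects payloads over maxBytes.
func fakeUpdate(data map[string]string) error {
	size := 0
	for k, v := range data {
		size += len(k) + len(v)
	}
	if size > maxBytes {
		return errTooLarge
	}
	return nil
}

// pruneOldest deletes the n oldest entries. Keys here sort chronologically because the
// example names end in a zero-padded counter; the patch instead parses the timestamp
// suffix of each snapshot name and sorts on CreatedAt.
func pruneOldest(data map[string]string, n int) error {
	if n > len(data) {
		return errors.New("cannot prune more entries than are stored")
	}
	keys := make([]string, 0, len(data))
	for k := range data {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys[:n] {
		delete(data, k)
	}
	return nil
}

func main() {
	// Build a map that is comfortably over the assumed size limit.
	data := map[string]string{}
	for i := 0; i < 2000; i++ {
		data[fmt.Sprintf("local-etcd-snapshot-%010d", i)] = string(make([]byte, 1024))
	}

	// Retry the update, eliding an increasing number of old entries each time it is
	// rejected for size - the same escalation the patch applies in addSnapshotData
	// and ReconcileSnapshotData.
	pruneCount := pruneStep
	for {
		lastErr := fakeUpdate(data)
		if lastErr == nil {
			fmt.Printf("update accepted with %d entries\n", len(data))
			return
		}
		if !errors.Is(lastErr, errTooLarge) {
			fmt.Println("unrecoverable error:", lastErr)
			return
		}
		fmt.Printf("update too large, eliding %d oldest entries\n", pruneCount)
		if err := pruneOldest(data, pruneCount); err != nil {
			fmt.Println("giving up:", err)
			return
		}
		pruneCount += pruneStep
	}
}

In the patch itself the same loop shape is expressed through retry.OnError, with lastErr carried across attempts so that the next attempt knows whether to prune before retrying the Update.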