diff --git a/Makefile b/Makefile index dc2ac74eb..6d816f4dd 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ endif # Dependencies to fetch through `go` CONTROLLER_GEN_PKG?=sigs.k8s.io/controller-tools/cmd/controller-gen@v0.6.1 CONTROLLER_GEN=$(GOBIN)/controller-gen -KUSTOMIZE_PKG?=sigs.k8s.io/kustomize/kustomize/v3@v3.9.4 +KUSTOMIZE_PKG?=sigs.k8s.io/kustomize/kustomize/v4@v4.5.2 KUSTOMIZE=$(GOBIN)/kustomize GOLANGCI_LINT_PKG=github.com/golangci/golangci-lint/cmd/golangci-lint@v1.42.1 GOLANGCI_LINT=$(GOBIN)/golangci-lint @@ -190,7 +190,7 @@ rebuild-operator: container-build container-push-if-remote deploy bounce bounce: kubectl $(KUBECTL_ARGS) delete pod -l app=fdb-kubernetes-operator-controller-manager -samples: ${SAMPLES} +samples: kustomize ${SAMPLES} config/samples/deployment.yaml: config/deployment/*.yaml kustomize build ./config/deployment > $@ diff --git a/api/v1beta2/foundationdb_errors.go b/api/v1beta2/foundationdb_errors.go new file mode 100644 index 000000000..e125cc686 --- /dev/null +++ b/api/v1beta2/foundationdb_errors.go @@ -0,0 +1,34 @@ +/* + * foundationdb_errors.go + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1beta2 + +import "fmt" + +// TimeoutError represents a timeout for either the fdb client library or fdbcli +// +k8s:deepcopy-gen=false +type TimeoutError struct { + Err error +} + +// Error returns the error message of the internal timeout error. +func (timeoutErr TimeoutError) Error() string { + return fmt.Sprintf("fdb timeout: %s", timeoutErr.Err.Error()) +} diff --git a/api/v1beta2/foundationdb_status.go b/api/v1beta2/foundationdb_status.go index 07d3d1c64..a71da56be 100644 --- a/api/v1beta2/foundationdb_status.go +++ b/api/v1beta2/foundationdb_status.go @@ -89,6 +89,10 @@ type FoundationDBStatusClusterInfo struct { // FaultTolerance provides information about the fault tolerance status // of the cluster. FaultTolerance FaultTolerance `json:"fault_tolerance,omitempty"` + + // IncompatibleConnections provides information about processes that try to connect to the cluster with an + // incompatible version. 
+ IncompatibleConnections []string `json:"incompatible_connections,omitempty"` } // FaultTolerance provides information about the fault tolerance status diff --git a/api/v1beta2/foundationdb_status_test.go b/api/v1beta2/foundationdb_status_test.go index 9b4598edf..cfb97b792 100644 --- a/api/v1beta2/foundationdb_status_test.go +++ b/api/v1beta2/foundationdb_status_test.go @@ -70,6 +70,7 @@ var _ = Describe("FoundationDBStatus", func() { DatabaseStatus: FoundationDBStatusClientDBStatus{Available: true, Healthy: true}, }, Cluster: FoundationDBStatusClusterInfo{ + IncompatibleConnections: []string{}, FaultTolerance: FaultTolerance{ MaxZoneFailuresWithoutLosingAvailability: 1, MaxZoneFailuresWithoutLosingData: 1, @@ -459,6 +460,7 @@ var _ = Describe("FoundationDBStatus", func() { When("parsing the status json with a 7.1.0-rc1 cluster", func() { status := FoundationDBStatusClusterInfo{ + IncompatibleConnections: []string{}, DatabaseConfiguration: DatabaseConfiguration{ RedundancyMode: "double", StorageEngine: StorageEngineSSD2, diff --git a/api/v1beta2/foundationdbcluster_types.go b/api/v1beta2/foundationdbcluster_types.go index 7ba92c3a5..e246d4b37 100644 --- a/api/v1beta2/foundationdbcluster_types.go +++ b/api/v1beta2/foundationdbcluster_types.go @@ -1159,7 +1159,12 @@ func (cluster *FoundationDBCluster) CheckReconciliation(log logr.Logger) (bool, for _, processGroup := range cluster.Status.ProcessGroups { if len(processGroup.ProcessGroupConditions) > 0 && !processGroup.IsMarkedForRemoval() { - logger.Info("Has unhealthy process group", "processGroupID", processGroup.ProcessGroupID, "state", "HasUnhealthyProcess") + conditions := make([]ProcessGroupConditionType, 0, len(processGroup.ProcessGroupConditions)) + for _, condition := range processGroup.ProcessGroupConditions { + conditions = append(conditions, condition.ProcessGroupConditionType) + } + + logger.Info("Has unhealthy process group", "processGroupID", processGroup.ProcessGroupID, "state", "HasUnhealthyProcess", "conditions", conditions) cluster.Status.Generations.HasUnhealthyProcess = cluster.ObjectMeta.Generation reconciled = false } diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go index 0ea4d5cc3..ebccc41dc 100644 --- a/api/v1beta2/zz_generated.deepcopy.go +++ b/api/v1beta2/zz_generated.deepcopy.go @@ -958,6 +958,11 @@ func (in *FoundationDBStatusClusterInfo) DeepCopyInto(out *FoundationDBStatusClu in.Clients.DeepCopyInto(&out.Clients) in.Layers.DeepCopyInto(&out.Layers) out.FaultTolerance = in.FaultTolerance + if in.IncompatibleConnections != nil { + in, out := &in.IncompatibleConnections, &out.IncompatibleConnections + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FoundationDBStatusClusterInfo. diff --git a/charts/fdb-operator/Chart.yaml b/charts/fdb-operator/Chart.yaml index ebcdccf94..03bcd8a9d 100644 --- a/charts/fdb-operator/Chart.yaml +++ b/charts/fdb-operator/Chart.yaml @@ -21,7 +21,7 @@ version: 0.2.0 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. 
-appVersion: v1.8.0 +appVersion: v1.8.1 maintainers: - name: "foundationdb-ci" diff --git a/charts/fdb-operator/values.yaml b/charts/fdb-operator/values.yaml index 4bf3380e6..2f85c79dd 100644 --- a/charts/fdb-operator/values.yaml +++ b/charts/fdb-operator/values.yaml @@ -1,7 +1,7 @@ --- image: repository: foundationdb/fdb-kubernetes-operator - tag: v1.8.0 + tag: v1.8.1 pullPolicy: IfNotPresent initContainers: diff --git a/config/deployment/manager.yaml b/config/deployment/manager.yaml index 4c108457f..91ceb66da 100644 --- a/config/deployment/manager.yaml +++ b/config/deployment/manager.yaml @@ -83,7 +83,7 @@ spec: containers: - command: - /manager - image: foundationdb/fdb-kubernetes-operator:v1.8.0 + image: foundationdb/fdb-kubernetes-operator:v1.8.1 name: manager env: - name: WATCH_NAMESPACE diff --git a/config/samples/deployment.yaml b/config/samples/deployment.yaml index 9d52dc694..107928e2f 100644 --- a/config/samples/deployment.yaml +++ b/config/samples/deployment.yaml @@ -150,7 +150,7 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace - image: foundationdb/fdb-kubernetes-operator:v1.8.0 + image: foundationdb/fdb-kubernetes-operator:v1.8.1 name: manager ports: - containerPort: 8080 diff --git a/controllers/admin_client_mock.go b/controllers/admin_client_mock.go index ffa5d0525..1803c8e9d 100644 --- a/controllers/admin_client_mock.go +++ b/controllers/admin_client_mock.go @@ -463,6 +463,16 @@ func (client *mockAdminClient) KillProcesses(addresses []fdbv1beta2.ProcessAddre } adminClientMutex.Unlock() + if client.Cluster.Status.RunningVersion != client.Cluster.Spec.Version { + // We have to do this in the mock client; in the real world the tryConnectionOptions in update_status + // will update the version. + client.Cluster.Status.RunningVersion = client.Cluster.Spec.Version + err := client.KubeClient.Status().Update(context.TODO(), client.Cluster) + if err != nil { + return err + } + } + client.UnfreezeStatus() return nil } diff --git a/controllers/bounce_processes.go b/controllers/bounce_processes.go index a81976b03..0a5dd5253 100644 --- a/controllers/bounce_processes.go +++ b/controllers/bounce_processes.go @@ -58,7 +58,7 @@ func (bounceProcesses) reconcile(ctx context.Context, r *FoundationDBClusterReco minimumUptime := math.Inf(1) addressMap := make(map[string][]fdbv1beta2.ProcessAddress, len(status.Cluster.Processes)) for _, process := range status.Cluster.Processes { - addressMap[process.Locality["instance_id"]] = append(addressMap[process.Locality["instance_id"]], process.Address) + addressMap[process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey]] = append(addressMap[process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey]], process.Address) if process.UptimeSeconds < minimumUptime { minimumUptime = process.UptimeSeconds @@ -68,6 +68,7 @@ func (bounceProcesses) reconcile(ctx context.Context, r *FoundationDBClusterReco processesToBounce := fdbv1beta2.FilterByConditions(cluster.Status.ProcessGroups, map[fdbv1beta2.ProcessGroupConditionType]bool{ fdbv1beta2.IncorrectCommandLine: true, fdbv1beta2.IncorrectPodSpec: false, + fdbv1beta2.SidecarUnreachable: false, // ignore all process groups that are not reachable and therefore will not get any config map updates.
}, true) addresses := make([]fdbv1beta2.ProcessAddress, 0, len(processesToBounce)) @@ -172,7 +173,7 @@ func (bounceProcesses) reconcile(ctx context.Context, r *FoundationDBClusterReco if useLocks && upgrading { var req *requeue - addresses, req = getAddressesForUpgrade(r, adminClient, lockClient, cluster, version) + addresses, req = getAddressesForUpgrade(r, status, lockClient, cluster, version) if req != nil { return req } @@ -187,13 +188,13 @@ func (bounceProcesses) reconcile(ctx context.Context, r *FoundationDBClusterReco if err != nil { return &requeue{curError: err} } - } - if upgrading { - cluster.Status.RunningVersion = cluster.Spec.Version - err = r.Status().Update(ctx, cluster) - if err != nil { - return &requeue{curError: err} + // If the cluster was upgraded we will requeue and let the update_status command set the correct version. + // Updating the version in this method has the drawback that we would upgrade the version independently of the + // success of the kill command. The kill command is not reliable, which means that some kill requests might not + // be delivered while the return value still contains no error. + if upgrading { + return &requeue{message: "fetch latest status after upgrade"} } } @@ -202,20 +203,14 @@ func (bounceProcesses) reconcile(ctx context.Context, r *FoundationDBClusterReco // getAddressesForUpgrade checks that all processes in a cluster are ready to be // upgraded and returns the full list of addresses. -func getAddressesForUpgrade(r *FoundationDBClusterReconciler, adminClient fdbadminclient.AdminClient, lockClient fdbadminclient.LockClient, cluster *fdbv1beta2.FoundationDBCluster, version fdbv1beta2.Version) ([]fdbv1beta2.ProcessAddress, *requeue) { +func getAddressesForUpgrade(r *FoundationDBClusterReconciler, databaseStatus *fdbv1beta2.FoundationDBStatus, lockClient fdbadminclient.LockClient, cluster *fdbv1beta2.FoundationDBCluster, version fdbv1beta2.Version) ([]fdbv1beta2.ProcessAddress, *requeue) { logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "reconciler", "bounceProcesses") pendingUpgrades, err := lockClient.GetPendingUpgrades(version) if err != nil { return nil, &requeue{curError: err} } - databaseStatus, err := adminClient.GetStatus() - if err != nil { - return nil, &requeue{curError: err} - } - if !databaseStatus.Client.DatabaseStatus.Available { - logger.Info("Deferring upgrade until database is available") r.Recorder.Event(cluster, corev1.EventTypeNormal, "UpgradeRequeued", "Database is unavailable") return nil, &requeue{message: "Deferring upgrade until database is available"} } @@ -223,7 +218,7 @@ func getAddressesForUpgrade(r *FoundationDBClusterReconciler, adminClient fdbadm notReadyProcesses := make([]string, 0) addresses := make([]fdbv1beta2.ProcessAddress, 0, len(databaseStatus.Cluster.Processes)) for _, process := range databaseStatus.Cluster.Processes { - processID := process.Locality["instance_id"] + processID := process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey] if process.Version == version.String() { continue } diff --git a/controllers/bounce_processes_test.go b/controllers/bounce_processes_test.go index a00afaf89..68669b6b0 100644 --- a/controllers/bounce_processes_test.go +++ b/controllers/bounce_processes_test.go @@ -166,8 +166,8 @@ var _ = Describe("bounceProcesses", func() { } }) - It("should not requeue", func() { - Expect(requeue).To(BeNil()) + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) }) It("should kill all the processes", func() { @@ -233,8 +233,8 @@ var _
= Describe("bounceProcesses", func() { Expect(err).NotTo(HaveOccurred()) }) - It("should not requeue", func() { - Expect(requeue).To(BeNil()) + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) }) It("should kill all the processes", func() { @@ -272,8 +272,8 @@ var _ = Describe("bounceProcesses", func() { cluster.Spec.LockOptions.DisableLocks = &disabled }) - It("should not requeue", func() { - Expect(requeue).To(BeNil()) + It("should requeue", func() { + Expect(requeue).NotTo(BeNil()) }) It("should kill all the processes", func() { diff --git a/controllers/cluster_controller.go b/controllers/cluster_controller.go index 6fe79f6fb..a8bb5412d 100644 --- a/controllers/cluster_controller.go +++ b/controllers/cluster_controller.go @@ -30,10 +30,9 @@ import ( "sort" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/FoundationDB/fdb-kubernetes-operator/pkg/fdbadminclient" "github.com/FoundationDB/fdb-kubernetes-operator/pkg/podmanager" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -53,15 +52,16 @@ import ( // FoundationDBClusterReconciler reconciles a FoundationDBCluster object type FoundationDBClusterReconciler struct { client.Client - Recorder record.EventRecorder - Log logr.Logger - InSimulation bool - PodLifecycleManager podmanager.PodLifecycleManager - PodClientProvider func(*fdbv1beta2.FoundationDBCluster, *corev1.Pod) (podclient.FdbPodClient, error) - DatabaseClientProvider DatabaseClientProvider - DeprecationOptions internal.DeprecationOptions - GetTimeout time.Duration - PostTimeout time.Duration + Recorder record.EventRecorder + Log logr.Logger + InSimulation bool + EnableRestartIncompatibleProcesses bool + PodLifecycleManager podmanager.PodLifecycleManager + PodClientProvider func(*fdbv1beta2.FoundationDBCluster, *corev1.Pod) (podclient.FdbPodClient, error) + DatabaseClientProvider DatabaseClientProvider + DeprecationOptions internal.DeprecationOptions + GetTimeout time.Duration + PostTimeout time.Duration } // NewFoundationDBClusterReconciler creates a new FoundationDBClusterReconciler with defaults. @@ -139,6 +139,7 @@ func (r *FoundationDBClusterReconciler) Reconcile(ctx context.Context, request c addPVCs{}, addPods{}, generateInitialClusterFile{}, + removeIncompatibleProcesses{}, updateSidecarVersions{}, updatePodConfig{}, updateLabels{}, diff --git a/controllers/exclude_processes.go b/controllers/exclude_processes.go index 7769f2a8c..7925094b7 100644 --- a/controllers/exclude_processes.go +++ b/controllers/exclude_processes.go @@ -42,6 +42,7 @@ type excludeProcesses struct{} // reconcile runs the reconciler's work. 
func (e excludeProcesses) reconcile(_ context.Context, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster) *requeue { + logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "reconciler", "excludeProcesses") adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r) if err != nil { return &requeue{curError: err} } @@ -62,7 +63,7 @@ func (e excludeProcesses) reconcile(_ context.Context, r *FoundationDBClusterRec if err != nil { return &requeue{curError: err} } - log.Info("current exclusions", "ex", exclusions) + logger.Info("current exclusions", "ex", exclusions) fdbProcessesToExclude, processClassesToExclude = getProcessesToExclude(exclusions, cluster, removalCount) } diff --git a/controllers/remove_incompatible_processes.go b/controllers/remove_incompatible_processes.go new file mode 100644 index 000000000..49556d121 --- /dev/null +++ b/controllers/remove_incompatible_processes.go @@ -0,0 +1,128 @@ +/* + * remove_incompatible_processes.go + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package controllers + +import ( + "context" + "time" + + corev1 "k8s.io/api/core/v1" + + "github.com/go-logr/logr" + + fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" + "github.com/FoundationDB/fdb-kubernetes-operator/internal" +) + +// removeIncompatibleProcesses is a reconciler that will restart incompatible fdbserver processes. This can happen +// during an upgrade when the kill command doesn't reach all processes; see: https://github.com/FoundationDB/fdb-kubernetes-operator/issues/1281 +type removeIncompatibleProcesses struct{} + +// reconcile runs the reconciler's work. +func (removeIncompatibleProcesses) reconcile(ctx context.Context, r *FoundationDBClusterReconciler, cluster *fdbv1beta2.FoundationDBCluster) *requeue { + logger := log.WithValues("namespace", cluster.Namespace, "cluster", cluster.Name, "reconciler", "removeIncompatibleProcesses") + err := processIncompatibleProcesses(ctx, r, logger, cluster) + + if err != nil { + return &requeue{curError: err, delay: 15 * time.Second} + } + + return nil +} + +func processIncompatibleProcesses(ctx context.Context, r *FoundationDBClusterReconciler, logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster) error { + if !r.EnableRestartIncompatibleProcesses { + logger.Info("skipping disabled subreconciler") + return nil + } + + if !cluster.Status.Configured { + logger.Info("waiting for cluster to be configured") + return nil + } + + pods, err := r.PodLifecycleManager.GetPods(ctx, r, cluster, internal.GetPodListOptions(cluster, "", "")...)
+ if err != nil { + return err + } + + podMap := internal.CreatePodMap(cluster, pods) + + adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r.Client) + if err != nil { + return err + } + defer adminClient.Close() + + status, err := adminClient.GetStatus() + if err != nil { + // If we hit a timeout issue we don't want to block any further steps. + if internal.IsTimeoutError(err) { + return nil + } + return err + } + + if len(status.Cluster.IncompatibleConnections) == 0 { + return nil + } + + logger.Info("incompatible connections", "incompatibleConnections", status.Cluster.IncompatibleConnections) + incompatibleConnections := map[string]fdbv1beta2.None{} + for _, incompatibleAddress := range status.Cluster.IncompatibleConnections { + address, err := fdbv1beta2.ParseProcessAddress(incompatibleAddress) + if err != nil { + logger.Error(err, "could not parse address in incompatible connections", "address", incompatibleAddress) + continue + } + + incompatibleConnections[address.IPAddress.String()] = fdbv1beta2.None{} + } + + incompatiblePods := make([]*corev1.Pod, 0, len(incompatibleConnections)) + for _, processGroup := range cluster.Status.ProcessGroups { + pod, ok := podMap[processGroup.ProcessGroupID] + if !ok || pod == nil { + logger.V(1).Info("Could not find Pod for process group ID", + "processGroupID", processGroup.ProcessGroupID) + continue + } + + if isIncompatible(incompatibleConnections, processGroup) { + logger.Info("recreate Pod for process group with incompatible version", "processGroupID", processGroup.ProcessGroupID) + incompatiblePods = append(incompatiblePods, pod) + } + } + + // Do an unsafe update of the Pods since they are not reachable anyway + return r.PodLifecycleManager.UpdatePods(ctx, r, cluster, incompatiblePods, true) +} + +// isIncompatible checks if the process group is in the list of incompatible connections. +func isIncompatible(incompatibleConnections map[string]fdbv1beta2.None, processGroup *fdbv1beta2.ProcessGroupStatus) bool { + for _, address := range processGroup.Addresses { + if _, ok := incompatibleConnections[address]; ok { + return true + } + } + + return false +} diff --git a/controllers/remove_incompatible_processes_test.go b/controllers/remove_incompatible_processes_test.go new file mode 100644 index 000000000..8443c9b9a --- /dev/null +++ b/controllers/remove_incompatible_processes_test.go @@ -0,0 +1,183 @@ +/* + * remove_incompatible_processes_test.go + * + * This source file is part of the FoundationDB open source project + * + * Copyright 2022 Apple Inc. and the FoundationDB project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package controllers + +import ( + "context" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + + fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" + "github.com/FoundationDB/fdb-kubernetes-operator/internal" + . "github.com/onsi/ginkgo/v2" + .
"github.com/onsi/gomega" +) + +var _ = Describe("restart_incompatible_pods", func() { + DescribeTable("", func(incompatibleConnections map[string]fdbv1beta2.None, processGroup *fdbv1beta2.ProcessGroupStatus, expected bool) { + Expect(isIncompatible(incompatibleConnections, processGroup)).To(Equal(expected)) + }, + Entry("empty incompatible map", + map[string]fdbv1beta2.None{}, + &fdbv1beta2.ProcessGroupStatus{ + Addresses: []string{"1.1.1.1"}, + }, + false), + Entry("nil incompatible map", + nil, + &fdbv1beta2.ProcessGroupStatus{ + Addresses: []string{"1.1.1.1"}, + }, + false), + Entry("incompatible map contains another address", + map[string]fdbv1beta2.None{ + "1.1.1.2": {}, + }, + &fdbv1beta2.ProcessGroupStatus{ + Addresses: []string{"1.1.1.1"}, + }, + false), + Entry("incompatible map contains matching address", + map[string]fdbv1beta2.None{ + "1.1.1.1": {}, + }, + &fdbv1beta2.ProcessGroupStatus{ + Addresses: []string{"1.1.1.1"}, + }, + true), + ) + + When("running a reconcile for the restart incompatible process reconciler", func() { + var cluster *fdbv1beta2.FoundationDBCluster + var initialCount int + + BeforeEach(func() { + cluster = internal.CreateDefaultCluster() + err := k8sClient.Create(context.TODO(), cluster) + Expect(err).NotTo(HaveOccurred()) + + result, err := reconcileCluster(cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + generation, err := reloadCluster(cluster) + Expect(err).NotTo(HaveOccurred()) + Expect(generation).To(Equal(int64(1))) + + pods := &corev1.PodList{} + err = k8sClient.List(context.TODO(), pods, getListOptions(cluster)...) + Expect(err).NotTo(HaveOccurred()) + initialCount = len(pods.Items) + }) + + JustBeforeEach(func() { + err := processIncompatibleProcesses(context.TODO(), clusterReconciler, logr.Discard(), cluster) + Expect(err).NotTo(HaveOccurred()) + }) + + When("no incompatible processes are reported", func() { + BeforeEach(func() { + clusterReconciler.EnableRestartIncompatibleProcesses = true + adminClient, err := newMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.frozenStatus = &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + IncompatibleConnections: []string{}, + }, + } + }) + + It("should have no deletions", func() { + pods := &corev1.PodList{} + err := k8sClient.List(context.TODO(), pods, getListOptions(cluster)...) + Expect(err).NotTo(HaveOccurred()) + Expect(len(pods.Items)).To(BeNumerically("==", initialCount)) + }) + }) + + When("no matching incompatible processes are reported", func() { + BeforeEach(func() { + clusterReconciler.EnableRestartIncompatibleProcesses = true + adminClient, err := newMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.frozenStatus = &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + IncompatibleConnections: []string{ + "192.192.192.192", + }, + }, + } + }) + + It("should have no deletions", func() { + pods := &corev1.PodList{} + err := k8sClient.List(context.TODO(), pods, getListOptions(cluster)...) 
+ Expect(err).NotTo(HaveOccurred()) + Expect(len(pods.Items)).To(BeNumerically("==", initialCount)) + }) + }) + + When("matching incompatible processes are reported and the subreconciler is enabled", func() { + BeforeEach(func() { + clusterReconciler.EnableRestartIncompatibleProcesses = true + adminClient, err := newMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.frozenStatus = &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + IncompatibleConnections: []string{ + cluster.Status.ProcessGroups[0].Addresses[0], + }, + }, + } + }) + + It("should have one deletion", func() { + pods := &corev1.PodList{} + err := k8sClient.List(context.TODO(), pods, getListOptions(cluster)...) + Expect(err).NotTo(HaveOccurred()) + Expect(len(pods.Items)).To(BeNumerically("==", initialCount-1)) + }) + }) + + When("matching incompatible processes are reported and the subreconciler is disabled", func() { + BeforeEach(func() { + clusterReconciler.EnableRestartIncompatibleProcesses = false + adminClient, err := newMockAdminClientUncast(cluster, k8sClient) + Expect(err).NotTo(HaveOccurred()) + adminClient.frozenStatus = &fdbv1beta2.FoundationDBStatus{ + Cluster: fdbv1beta2.FoundationDBStatusClusterInfo{ + IncompatibleConnections: []string{ + cluster.Status.ProcessGroups[0].Addresses[0], + }, + }, + } + }) + + It("should have no deletions", func() { + pods := &corev1.PodList{} + err := k8sClient.List(context.TODO(), pods, getListOptions(cluster)...) + Expect(err).NotTo(HaveOccurred()) + Expect(len(pods.Items)).To(BeNumerically("==", initialCount)) + }) + }) + }) +}) diff --git a/controllers/update_pod_config.go b/controllers/update_pod_config.go index efe6473e9..dcd74fd60 100644 --- a/controllers/update_pod_config.go +++ b/controllers/update_pod_config.go @@ -52,6 +52,7 @@ func (updatePodConfig) reconcile(ctx context.Context, r *FoundationDBClusterReco podMap := internal.CreatePodMap(cluster, pods) allSynced := true + delayedRequeue := true hasUpdate := false var errs []error // We try to update all process groups and if we observe an error we add it to the error list. @@ -109,11 +110,15 @@ func (updatePodConfig) reconcile(ctx context.Context, r *FoundationDBClusterReco } if internal.IsNetworkError(err) && processGroup.GetConditionTime(fdbv1beta2.SidecarUnreachable) == nil { + curLogger.Info("process group sidecar is not reachable") processGroup.UpdateCondition(fdbv1beta2.SidecarUnreachable, true, cluster.Status.ProcessGroups, processGroup.ProcessGroupID) hasUpdate = true } else if processGroup.GetConditionTime(fdbv1beta2.IncorrectConfigMap) == nil { processGroup.UpdateCondition(fdbv1beta2.IncorrectConfigMap, true, cluster.Status.ProcessGroups, processGroup.ProcessGroupID) hasUpdate = true + // If we are still waiting for a ConfigMap update we should not delay the requeue, to ensure all processes are bounced + // at the same time. If the process is unreachable, e.g. it has the SidecarUnreachable condition, we can delay the requeue.
+ delayedRequeue = false } pod.ObjectMeta.Annotations[fdbv1beta2.OutdatedConfigMapKey] = fmt.Sprintf("%d", time.Now().Unix()) @@ -155,7 +160,7 @@ func (updatePodConfig) reconcile(ctx context.Context, r *FoundationDBClusterReco // If we return an error we don't requeue // So we just return that we can't continue but don't have an error if !allSynced { - return &requeue{message: "Waiting for Pod to receive ConfigMap update", delay: podSchedulingDelayDuration} + return &requeue{message: "Waiting for Pod to receive ConfigMap update", delay: podSchedulingDelayDuration, delayedRequeue: delayedRequeue} } return nil diff --git a/controllers/update_pods.go b/controllers/update_pods.go index f4d0f6f6e..7413167f4 100644 --- a/controllers/update_pods.go +++ b/controllers/update_pods.go @@ -146,16 +146,16 @@ func (updatePods) reconcile(ctx context.Context, r *FoundationDBClusterReconcile } } + if len(updates) == 0 { + return nil + } + adminClient, err := r.getDatabaseClientProvider().GetAdminClient(cluster, r.Client) if err != nil { return &requeue{curError: err} } defer adminClient.Close() - if len(updates) == 0 { - return nil - } - return deletePodsForUpdates(ctx, r, cluster, adminClient, updates, logger) } @@ -219,7 +219,7 @@ func deletePodsForUpdates(ctx context.Context, r *FoundationDBClusterReconciler, } // Only lock the cluster if we are not running in the delete "All" mode. - // Otherwise we want to delete all Pods and don't require a lock to sync with other clusters. + // Otherwise, we want to delete all Pods and don't require a lock to sync with other clusters. if deletionMode != fdbv1beta2.PodUpdateModeAll { hasLock, err := r.takeLock(cluster, "updating pods") if !hasLock { diff --git a/controllers/update_status.go b/controllers/update_status.go index 865cea236..c220671c3 100644 --- a/controllers/update_status.go +++ b/controllers/update_status.go @@ -94,6 +94,7 @@ func (updateStatus) reconcile(ctx context.Context, r *FoundationDBClusterReconci } } + versionMap := map[string]int{} for _, process := range databaseStatus.Cluster.Processes { processID, ok := process.Locality["process_id"] // if the processID is not set we fall back to the instanceID @@ -101,8 +102,16 @@ func (updateStatus) reconcile(ctx context.Context, r *FoundationDBClusterReconci processID = process.Locality["instance_id"] } processMap[processID] = append(processMap[processID], process) + versionMap[process.Version]++ } + // Update the running version based on the reported version of the FDB processes + version, err := getRunningVersion(versionMap, cluster.Status.RunningVersion) + if err != nil { + return &requeue{curError: err} + } + cluster.Status.RunningVersion = version + status.HasListenIPsForAllPods = cluster.NeedsExplicitListenAddress() status.DatabaseConfiguration = databaseStatus.Cluster.DatabaseConfiguration.NormalizeConfigurationWithSeparatedProxies(cluster.Spec.Version, cluster.Spec.DatabaseConfiguration.AreSeparatedProxiesConfigured()) // Removing excluded servers as we don't want them during comparison. 
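As an editorial aside, not part of the diff: the tie-breaking rule of the `getRunningVersion` helper introduced in the next hunk can be illustrated with a minimal sketch; the inputs mirror the test entries added further below.

```go
// Sketch: the version reported by the most processes wins; on a tie the newer
// version is preferred, so the result cannot flap between two versions.
version, err := getRunningVersion(map[string]int{"7.1.11": 50, "7.1.15": 50}, "7.1.11")
// version == "7.1.15", err == nil
```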
@@ -336,6 +345,7 @@ func tryConnectionOptions(cluster *fdbv1beta2.FoundationDBCluster, r *Foundation "version", version, "connectionString", connectionString) } } + return originalVersion, originalConnectionString, nil } @@ -685,3 +695,35 @@ func refreshProcessGroupStatus(ctx context.Context, r *FoundationDBClusterReconc return nil } + +func getRunningVersion(versionMap map[string]int, fallback string) (string, error) { + if len(versionMap) == 0 { + return fallback, nil + } + + var currentCandidate fdbv1beta2.Version + var currentMaxCount int + + for version, count := range versionMap { + if count < currentMaxCount { + continue + } + + parsedVersion, err := fdbv1beta2.ParseFdbVersion(version) + if err != nil { + return fallback, err + } + // In this case we want to ensure we always pick the newer version to have a stable return value. Otherwise, + // it could happen that the version flaps between two versions. + if count == currentMaxCount { + if currentCandidate.IsAtLeast(parsedVersion) { + continue + } + } + + currentCandidate = parsedVersion + currentMaxCount = count + } + + return currentCandidate.String(), nil +} diff --git a/controllers/update_status_test.go b/controllers/update_status_test.go index 3e3e28f0d..01cad27f9 100644 --- a/controllers/update_status_test.go +++ b/controllers/update_status_test.go @@ -465,4 +465,20 @@ var _ = Describe("update_status", func() { }) }) }) + + DescribeTable("when getting the running version from the running processes", func(versionMap map[string]int, fallback string, expected string) { + Expect(getRunningVersion(versionMap, fallback)).To(Equal(expected)) + }, + Entry("when nearly all processes are running on the new version", map[string]int{ + "7.1.11": 1, + "7.1.15": 99, + }, "0", "7.1.15"), + Entry("when all processes are running on the same version", map[string]int{ + "7.1.15": 100, + }, "0", "7.1.15"), + Entry("when half of the processes are running the old/new version", map[string]int{ + "7.1.11": 50, + "7.1.15": 50, + }, "0", "7.1.15"), + Entry("when the versionMap is empty", map[string]int{}, "7.1.15", "7.1.15")) }) diff --git a/docs/changelog/v1.8.1.md b/docs/changelog/v1.8.1.md new file mode 100644 index 000000000..c7b9d8b60 --- /dev/null +++ b/docs/changelog/v1.8.1.md @@ -0,0 +1,10 @@ +# v1.8.1 + +## Changes + +### Operator + +- Make upgrades more reliable [#1283](https://github.com/FoundationDB/fdb-kubernetes-operator/pull/1283) +- Unblock incompatible processes reconciler [#1306](https://github.com/FoundationDB/fdb-kubernetes-operator/pull/1306) +- Ignore process groups if they have the sidecar unreachable condition in the bounce sub reconciler [#1313](https://github.com/FoundationDB/fdb-kubernetes-operator/pull/1313) +- Correct error check for timeout [#1311](https://github.com/FoundationDB/fdb-kubernetes-operator/pull/1311) diff --git a/docs/manual/technical_design.md b/docs/manual/technical_design.md index 5830b3523..ea46cc945 100644 --- a/docs/manual/technical_design.md +++ b/docs/manual/technical_design.md @@ -88,30 +88,31 @@ See the [LockOptions](../cluster_spec.md#LockOptions) documentation for more opt The cluster reconciler runs the following subreconcilers: -1. UpdateStatus -1. UpdateLockConfiguration -1. UpdateConfigMap -1. CheckClientCompatibility -1. ReplaceMisconfiguredProcessGroups -1. ReplaceFailedProcessGroups -1. DeletePodsForBuggification -1. AddProcessGroups -1. AddServices -1. AddPVCs -1. AddPods -1. GenerateInitialClusterFile -1. UpdateSidecarVersions -1. UpdatePodConfig -1.
UpdateLabels -1. UpdateDatabaseConfiguration -1. ChooseRemovals -1. ExcludeProcesses -1. ChangeCoordinators -1. BounceProcesses -1. UpdatePods -1. RemoveServices -1. RemoveProcessGroups -1. UpdateStatus (again) +1. [UpdateStatus](#updatestatus) +1. [UpdateLockConfiguration](#updatelockconfiguration) +1. [UpdateConfigMap](#updateconfigmap) +1. [CheckClientCompatibility](#checkclientcompatibility) +1. [DeletePodsForBuggification](#deletepodsforbuggification) +1. [ReplaceMisconfiguredProcessGroups](#replacemisconfiguredprocessgroups) +1. [ReplaceFailedProcessGroups](#replacefailedprocessgroups) +1. [AddProcessGroups](#addprocessgroups) +1. [AddServices](#addservices) +1. [AddPVCs](#addpvcs) +1. [AddPods](#addpods) +1. [GenerateInitialClusterFile](#generateinitialclusterfile) +1. [RemoveIncompatibleProcesses](#removeincompatibleprocesses) +1. [UpdateSidecarVersions](#updatesidecarversions) +1. [UpdatePodConfig](#updatepodconfig) +1. [UpdateLabels](#updatelabels) +1. [UpdateDatabaseConfiguration](#updatedatabaseconfiguration) +1. [ChooseRemovals](#chooseremovals) +1. [ExcludeProcesses](#excludeprocesses) +1. [ChangeCoordinators](#changecoordinators) +1. [BounceProcesses](#bounceprocesses) +1. [UpdatePods](#updatepods) +1. [RemoveProcessGroups](#removeprocessgroups) +1. [RemoveServices](#removeservices) +1. [UpdateStatus (again)](#updatestatus) ### Tracking Reconciliation Stages @@ -139,6 +140,12 @@ The `CheckClientCompatibility` subreconciler is used during upgrades to ensure t You can skip this check by setting the `ignoreUpgradabilityChecks` flag in the cluster spec. +### DeletePodsForBuggification + +The `DeletePodsForBuggification` subreconciler deletes pods that need to be recreated in order to set buggification options. These options are set through the `buggify` section in the cluster spec. + +When pods are deleted for buggification, we apply fewer safety checks, and buggification will often put the cluster in an unhealthy state. + ### ReplaceMisconfiguredProcessGroups The `ReplaceMisconfiguredProcessGroups` subreconciler checks for process groups that need to be replaced in order to safely bring them up on a new configuration. The core action this subreconciler takes is setting the `removalTimestamp` field on the `ProcessGroup` in the cluster status. Later subreconcilers will do the work for handling the replacement, whether processes are marked for replacement through this subreconciler or another mechanism. @@ -151,12 +158,6 @@ The `ReplaceFailedProcessGroups` subreconciler checks for process groups that ne See the [Replacements and Deletions](replacements_and_deletions.md) document for more details on when we do these replacements. -### DeletePodsForBuggification - -The `DeletePodsForBuggification` subreconciler deletes pods that need to be recreated in order to set buggification options. These options are set through the `buggify` section in the cluster spec. - -When pods are deleted for buggification, we apply fewer safety checks, and buggification will often put the cluster in an unhealthy state. - ### AddProcessGroups The `AddProcessGroups` subreconciler compares the desired process counts, calculated from the cluster spec, with the number of process groups in the cluster status. If the spec requires any additional process groups, this step will add them to the status. It will not create resources, and will mark the new process groups with conditions that indicate they are missing resources.
@@ -177,6 +178,12 @@ The `AddPods` subreconciler creates any pods that are required for the cluster. The `GenerateInitialClusterFile` creates the cluster file for the cluster. If the cluster already has a cluster file, this will take no action. The cluster file is the service discovery mechanism for the cluster. It includes addresses for coordinator processes, which are chosen statically. The coordinators are used to elect the cluster controller and inform servers and clients about which process is serving as cluster controller. The cluster file is stored in the `connectionString` field in the cluster status. You can manually specify the cluster file in the `seedConnectionString` field in the cluster spec. If both of these are blank, the operator will choose coordinators that satisfy the cluster's fault tolerance requirements. Coordinators cannot be chosen until the pods have been created and the processes have been assigned IP addresses, which by default comes from the pod's IP. Once the initial cluster file has been generated, we store it in the cluster status and requeue reconciliation so we can update the config map with the new cluster file. +### RemoveIncompatibleProcesses + +The `RemoveIncompatibleProcesses` subreconciler will check the FoundationDB cluster status for incompatible connections. +If the cluster reports incompatible connections, the subreconciler will match those IP addresses against the process groups. +For matching process groups the subreconciler will delete the associated Pod and let it be recreated with the new image. + ### UpdateSidecarVersions The `UpdateSidecarVersions` subreconciler updates the image for the `foundationdb-kubernetes-sidecar` container in each pod to match the `version` in the cluster spec. diff --git a/fdbclient/admin_client.go b/fdbclient/admin_client.go index d33da6665..db5f2b9ed 100644 --- a/fdbclient/admin_client.go +++ b/fdbclient/admin_client.go @@ -89,7 +89,6 @@ func NewCliAdminClient(cluster *fdbv1beta2.FoundationDBCluster, _ client.Client, if err != nil { return nil, err } - clusterFilePath := clusterFile.Name() defer clusterFile.Close() _, err = clusterFile.WriteString(cluster.Status.ConnectionString) @@ -101,7 +100,7 @@ func NewCliAdminClient(cluster *fdbv1beta2.FoundationDBCluster, _ client.Client, return nil, err } - return &cliAdminClient{Cluster: cluster, clusterFilePath: clusterFilePath, useClientLibrary: true, log: log}, nil + return &cliAdminClient{Cluster: cluster, clusterFilePath: clusterFile.Name(), useClientLibrary: true, log: log}, nil } // cliCommand describes a command that we are running against FDB. @@ -117,6 +116,9 @@ type cliCommand struct { // args provides alternative arguments in place of the exec command. args []string + + // timeout provides a way to overwrite the default cli timeout. + timeout time.Duration } // hasTimeoutArg determines whether a command accepts a timeout argument. @@ -140,6 +142,15 @@ func (command cliCommand) getClusterFileFlag() string { return "-C" } +// getTimeout returns the timeout for the command +func (command cliCommand) getTimeout() int { + if command.timeout != 0 { + return int(command.timeout.Seconds()) + } + + return DefaultCLITimeout +} +
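A brief aside on the new `timeout` field above (a sketch, not part of the diff; per the backoff comment below, `DefaultCLITimeout` is 10 seconds in this file):

```go
// Sketch: a zero timeout falls back to DefaultCLITimeout, while callers such
// as runCommandWithBackoff (below) overwrite it per command.
defaultCmd := cliCommand{command: "status json"}
_ = defaultCmd.getTimeout() // 10 (DefaultCLITimeout)

longCmd := cliCommand{command: "status json", timeout: 40 * time.Second}
_ = longCmd.getTimeout() // 40
```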
// getBinaryPath generates the path to an FDB binary. func getBinaryPath(binaryName string, version string) string { parsed, _ := fdbv1beta2.ParseFdbVersion(version) @@ -162,7 +173,7 @@ func (client *cliAdminClient) runCommand(command cliCommand) (string, error) { } binary := getBinaryPath(binaryName, version) - hardTimeout := DefaultCLITimeout + hardTimeout := command.getTimeout() args := make([]string, 0, 9) args = append(args, command.args...) if len(args) == 0 { @@ -195,8 +206,8 @@ func (client *cliAdminClient) runCommand(command cliCommand) (string, error) { } if command.hasTimeoutArg() { - args = append(args, "--timeout", strconv.Itoa(DefaultCLITimeout)) - hardTimeout += DefaultCLITimeout + args = append(args, "--timeout", strconv.Itoa(command.getTimeout())) + hardTimeout += command.getTimeout() } timeoutContext, cancelFunction := context.WithTimeout(context.Background(), time.Second*time.Duration(hardTimeout)) defer cancelFunction() @@ -210,6 +221,14 @@ func (client *cliAdminClient) runCommand(command cliCommand) (string, error) { if canCast { client.log.Error(exitError, "Error from FDB command", "namespace", client.Cluster.Namespace, "cluster", client.Cluster.Name, "code", exitError.ProcessState.ExitCode(), "stdout", string(output), "stderr", string(exitError.Stderr)) } + + // If we hit a timeout, report it as a timeout error + if strings.Contains(string(output), "Specified timeout reached") { + // See: https://apple.github.io/foundationdb/api-error-codes.html + // 1031: Operation aborted because the transaction timed out + return "", fdbv1beta2.TimeoutError{Err: err} + } + return "", err } @@ -226,37 +245,88 @@ func (client *cliAdminClient) runCommand(command cliCommand) (string, error) { return outputString, nil } -// GetStatus gets the database's status -func (client *cliAdminClient) GetStatus() (*fdbv1beta2.FoundationDBStatus, error) { - adminClientMutex.Lock() - defer adminClientMutex.Unlock() +// runCommandWithBackoff is a wrapper around runCommand which allows retrying commands if they hit a timeout. +func (client *cliAdminClient) runCommandWithBackoff(command string) (string, error) { + maxTimeoutInSeconds := 40 + currentTimeoutInSeconds := DefaultCLITimeout + + var rawResult string + var err error + + // This method retries the command if a timeout is seen. The timeout is doubled on every retry; with the default + // timeout of 10s this results in up to 3 attempts with the following timeouts: 10s - 20s - 40s. We have seen + // that during upgrades between version-incompatible versions, when not all coordinators are properly restarted, + // the response time can increase. + for currentTimeoutInSeconds <= maxTimeoutInSeconds { + rawResult, err = client.runCommand(cliCommand{command: command, timeout: time.Duration(currentTimeoutInSeconds) * time.Second}) + if err == nil { + break + } + + if _, ok := err.(fdbv1beta2.TimeoutError); ok { + client.log.Info("timeout issue will retry with higher timeout") + currentTimeoutInSeconds *= 2 + continue + } + + // If any error other than a timeout happens, return this error and don't retry. + return "", err + } + + return rawResult, err +}
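To make the retry schedule of `runCommandWithBackoff` concrete, here is a sketch of the timeouts it walks through when every attempt times out (assuming a `DefaultCLITimeout` of 10 seconds):

```go
// Each attempt passes an explicit timeout; if all attempts time out,
// the last TimeoutError is returned to the caller.
for timeout := DefaultCLITimeout; timeout <= 40; timeout *= 2 {
	fmt.Println(timeout) // prints 10, 20, 40
}
```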
+ +func (client *cliAdminClient) getStatus(useClientLibrary bool) (*fdbv1beta2.FoundationDBStatus, error) { var contents []byte var err error + + if useClientLibrary { // This will call directly the database and fetch the status information // from the system key space. contents, err = getStatusFromDB(client.Cluster, client.log) - if err != nil { - return nil, err - } } else { - var rawResult, filteredJSON string - rawResult, err = client.runCommand(cliCommand{command: "status json"}) - if err != nil { - return nil, err - } - filteredJSON, err = internal.RemoveWarningsInJSON(rawResult) + var rawResult string + rawResult, err = client.runCommandWithBackoff("status json") if err != nil { return nil, err } - contents = []byte(filteredJSON) + + contents, err = internal.RemoveWarningsInJSON(rawResult) + } + + if err != nil { + return nil, err + } + status := &fdbv1beta2.FoundationDBStatus{} err = json.Unmarshal(contents, status) if err != nil { return nil, err } client.log.V(1).Info("Fetched status JSON", "status", status) + + return status, nil +} + +// GetStatus gets the database's status +func (client *cliAdminClient) GetStatus() (*fdbv1beta2.FoundationDBStatus, error) { + adminClientMutex.Lock() + defer adminClientMutex.Unlock() + + status, err := client.getStatus(client.useClientLibrary) + if err != nil { + return nil, err + } + + // There is a limitation in the multi version client: if the cluster is only partially upgraded, e.g. because not + // all fdbserver processes are restarted, the multi version client sometimes picks the wrong version + // to connect to the cluster. This will result in an empty status that only reports the unreachable coordinators. + // In this case we want to fall back to fdbcli, which is version specific and will work. + if len(status.Cluster.Processes) == 0 && client.useClientLibrary && client.Cluster.Status.Configured { + client.log.Info("retry fetching status with fdbcli instead of using the client library") + return client.getStatus(false) + } + return status, nil } @@ -505,8 +575,9 @@ func (client *cliAdminClient) KillProcesses(addresses []fdbv1beta2.ProcessAddres if len(addresses) == 0 { return nil } + _, err := client.runCommand(cliCommand{command: fmt.Sprintf( - "kill; kill %s; status", + "kill; kill %[1]s; sleep 1; kill %[1]s; sleep 1; kill %[1]s", fdbv1beta2.ProcessAddressesStringWithoutFlags(addresses, " "), )}) return err @@ -534,33 +605,37 @@ func (client *cliAdminClient) ChangeCoordinators(addresses []fdbv1beta2.ProcessA return connectionString.String(), nil } +// cleanConnectionStringOutput is a helper method that removes output of the get command that is unrelated to the +// connection string. +func cleanConnectionStringOutput(input string) string { + startIdx := strings.LastIndex(input, "`") + endIdx := strings.LastIndex(input, "'") + if startIdx == -1 && endIdx == -1 { + return input + } + + return input[startIdx+1 : endIdx] +} + // GetConnectionString fetches the latest connection string. func (client *cliAdminClient) GetConnectionString() (string, error) { - var connectionStringBytes []byte + var output string var err error if client.Cluster.UseManagementAPI() { // This will call directly the database and fetch the connection string // from the system key space.
- connectionStringBytes, err = getConnectionStringFromDB(client.Cluster) + var outputBytes []byte + outputBytes, err = getConnectionStringFromDB(client.Cluster, client.log) + output = string(outputBytes) } else { - var output string - output, err = client.runCommand(cliCommand{command: "status minimal"}) - if err != nil { - return "", err - } - - if !strings.Contains(output, "The database is available") { - return "", fmt.Errorf("unable to fetch connection string: %s", output) - } - - connectionStringBytes, err = os.ReadFile(client.clusterFilePath) + output, err = client.runCommandWithBackoff("option on ACCESS_SYSTEM_KEYS; get \xff/coordinators") } if err != nil { return "", err } var connectionString fdbv1beta2.ConnectionString - connectionString, err = fdbv1beta2.ParseConnectionString(string(connectionStringBytes)) + connectionString, err = fdbv1beta2.ParseConnectionString(cleanConnectionStringOutput(output)) if err != nil { return "", err } @@ -685,12 +760,12 @@ func (client *cliAdminClient) GetBackupStatus() (*fdbv1beta2.FoundationDBLiveBac } status := &fdbv1beta2.FoundationDBLiveBackupStatus{} - statusString, err = internal.RemoveWarningsInJSON(statusString) + statusBytes, err := internal.RemoveWarningsInJSON(statusString) if err != nil { return nil, err } - err = json.Unmarshal([]byte(statusString), &status) + err = json.Unmarshal(statusBytes, &status) if err != nil { return nil, err } diff --git a/fdbclient/admin_client_test.go b/fdbclient/admin_client_test.go index 0ffd1788f..387078204 100644 --- a/fdbclient/admin_client_test.go +++ b/fdbclient/admin_client_test.go @@ -178,4 +178,23 @@ var _ = Describe("admin_client_test", func() { ), ) }) + + When("parsing the connection string", func() { + DescribeTable("it should return the correct connection string", + func(input string, expected string) { + connectingString, err := fdbv1beta2.ParseConnectionString(cleanConnectionStringOutput(input)) + Expect(err).NotTo(HaveOccurred()) + Expect(connectingString.String()).To(Equal(expected)) + }, + Entry("with a correct response from FDB", + ">>> option on ACCESS_SYSTEM_KEYS\\nOption enabled for all transactions\\n>>> get \\\\xff/coordinators\\n`\\\\xff/coordinators' is `fdb_cluster_52v1bpr8:rhUbBjrtyweZBQO1U3Td81zyP9d46yEh@100.82.81.253:4500:tls,100.82.71.5:4500:tls,100.82.119.151:4500:tls,100.82.122.125:4500:tls,100.82.76.240:4500:tls'\\n", + "fdb_cluster_52v1bpr8:rhUbBjrtyweZBQO1U3Td81zyP9d46yEh@100.82.81.253:4500:tls,100.82.71.5:4500:tls,100.82.119.151:4500:tls,100.82.122.125:4500:tls,100.82.76.240:4500:tls", + ), + + Entry("without the byte string response", + "fdb_cluster_52v1bpr8:rhUbBjrtyweZBQO1U3Td81zyP9d46yEh@100.82.81.253:4500:tls,100.82.71.5:4500:tls,100.82.119.151:4500:tls,100.82.122.125:4500:tls,100.82.76.240:4500:tls", + "fdb_cluster_52v1bpr8:rhUbBjrtyweZBQO1U3Td81zyP9d46yEh@100.82.81.253:4500:tls,100.82.71.5:4500:tls,100.82.119.151:4500:tls,100.82.122.125:4500:tls,100.82.76.240:4500:tls", + ), + ) + }) }) diff --git a/fdbclient/common.go b/fdbclient/common.go index 597dae4cc..371939091 100644 --- a/fdbclient/common.go +++ b/fdbclient/common.go @@ -72,11 +72,16 @@ func getFDBDatabase(cluster *fdbv1beta2.FoundationDBCluster) (fdb.Database, erro return database, nil } -func getValueFromDBUsingKey(cluster *fdbv1beta2.FoundationDBCluster, fdbKey string, extraTimeout int64) ([]byte, error) { +func getValueFromDBUsingKey(cluster *fdbv1beta2.FoundationDBCluster, log logr.Logger, fdbKey string, extraTimeout int64) ([]byte, error) { + log.Info("Fetch values from FDB", "namespace", 
cluster.Namespace, "cluster", cluster.Name, "key", fdbKey) + defer func() { + log.Info("Done fetching values from FDB", "namespace", cluster.Namespace, "cluster", cluster.Name, "key", fdbKey) + }() database, err := getFDBDatabase(cluster) if err != nil { return nil, err } + result, err := database.Transact(func(transaction fdb.Transaction) (interface{}, error) { err := transaction.Options().SetAccessSystemKeys() if err != nil { @@ -96,6 +101,14 @@ func getValueFromDBUsingKey(cluster *fdbv1beta2.FoundationDBCluster, fdbKey stri }) if err != nil { + if fdbError, ok := err.(fdb.Error); ok { + // See: https://apple.github.io/foundationdb/api-error-codes.html + // 1031: Operation aborted because the transaction timed out + if fdbError.Code == 1031 { + return nil, fdbv1beta2.TimeoutError{Err: err} + } + } + return nil, err } @@ -107,14 +120,13 @@ func getValueFromDBUsingKey(cluster *fdbv1beta2.FoundationDBCluster, fdbKey stri } // getConnectionStringFromDB gets the database's connection string directly from the system key -func getConnectionStringFromDB(cluster *fdbv1beta2.FoundationDBCluster) ([]byte, error) { - return getValueFromDBUsingKey(cluster, "\xff\xff/connection_string", 1) +func getConnectionStringFromDB(cluster *fdbv1beta2.FoundationDBCluster, log logr.Logger) ([]byte, error) { + return getValueFromDBUsingKey(cluster, log, "\xff/coordinators", 1) } // getStatusFromDB gets the database's status directly from the system key func getStatusFromDB(cluster *fdbv1beta2.FoundationDBCluster, log logr.Logger) ([]byte, error) { - log.Info("Fetch status from FDB", "namespace", cluster.Namespace, "cluster", cluster.Name) - return getValueFromDBUsingKey(cluster, "\xff\xff/status/json", 1000) + return getValueFromDBUsingKey(cluster, log, "\xff\xff/status/json", 1000) } type realDatabaseClientProvider struct { diff --git a/internal/error_helper.go b/internal/error_helper.go index 0961581e7..5901e5b2b 100644 --- a/internal/error_helper.go +++ b/internal/error_helper.go @@ -3,7 +3,7 @@ * * This source file is part of the FoundationDB open source project * - * Copyright 2021 Apple Inc. and the FoundationDB project authors + * Copyright 2021-2022 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,8 @@ import ( "net" "strings" + fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" + k8serrors "k8s.io/apimachinery/pkg/api/errors" ) @@ -44,7 +46,7 @@ func IsNetworkError(err error) bool { // IsTimeoutError returns true if the observed error was a timeout error func IsTimeoutError(err error) bool { for err != nil { - if strings.Contains(err.Error(), "Specified timeout reached") { + if _, ok := err.(fdbv1beta2.TimeoutError); ok { return true } diff --git a/internal/error_helper_test.go b/internal/error_helper_test.go index e873fc470..6e7b74444 100644 --- a/internal/error_helper_test.go +++ b/internal/error_helper_test.go @@ -3,7 +3,7 @@ * * This source file is part of the FoundationDB open source project * - * Copyright 2021 Apple Inc. and the FoundationDB project authors + * Copyright 2021-2022 Apple Inc. and the FoundationDB project authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -24,6 +24,8 @@ import ( "fmt" "net" + fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/api/v1beta2" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" @@ -85,11 +87,39 @@ var _ = Describe("Internal error helper", func() { err: apierrors.NewForbidden(schema.GroupResource{}, "test", fmt.Errorf("not allowed")), expected: false, }), - Entry("simple errorr", + Entry("simple error", testCase{ err: fmt.Errorf("error"), expected: false, }), ) }) + + When("checking if an error is a timeout error", func() { + type testCase struct { + err error + expected bool + } + + DescribeTable("it should detect the timeout error", + func(tc testCase) { + Expect(IsTimeoutError(tc.err)).To(Equal(tc.expected)) + }, + Entry("simple error", + testCase{ + err: fmt.Errorf("test"), + expected: false, + }), + Entry("simple timeout error", + testCase{ + err: fdbv1beta2.TimeoutError{Err: fmt.Errorf("not reachable")}, + expected: true, + }), + Entry("wrapped timeout error", + testCase{ + err: fmt.Errorf("test : %w", fdbv1beta2.TimeoutError{Err: fmt.Errorf("not reachable")}), + expected: true, + }), + ) + }) }) diff --git a/internal/fdb_status_helper.go b/internal/fdb_status_helper.go index 9cacc8c6f..02f722318 100644 --- a/internal/fdb_status_helper.go +++ b/internal/fdb_status_helper.go @@ -27,11 +27,11 @@ import ( // RemoveWarningsInJSON removes any warning messages that might appear in the status output from the fdbcli and returns // the JSON output without the warning message. -func RemoveWarningsInJSON(jsonString string) (string, error) { +func RemoveWarningsInJSON(jsonString string) ([]byte, error) { idx := strings.Index(jsonString, "{") if idx == -1 { - return "", fmt.Errorf("the JSON string doesn't contain a starting '{'") + return nil, fmt.Errorf("the JSON string doesn't contain a starting '{'") } - return strings.TrimSpace(jsonString[idx:]), nil + return []byte(strings.TrimSpace(jsonString[idx:])), nil } diff --git a/internal/fdb_status_helper_test.go b/internal/fdb_status_helper_test.go index 8b195b30b..889fef016 100644 --- a/internal/fdb_status_helper_test.go +++ b/internal/fdb_status_helper_test.go @@ -31,7 +31,7 @@ var _ = Describe("fdb_status_helper_test", func() { When("Removing warnings in JSON", func() { type testCase struct { input string - expected string + expected []byte expectedErr error } @@ -53,7 +53,7 @@ var _ = Describe("fdb_status_helper_test", func() { Entry("Valid JSON without warning", testCase{ input: "{}", - expected: "{}", + expected: []byte("{}"), expectedErr: nil, }, ), @@ -63,14 +63,14 @@ var _ = Describe("fdb_status_helper_test", func() { # Warning Slow response {}`, - expected: "{}", + expected: []byte("{}"), expectedErr: nil, }, ), Entry("Invalid JSON", testCase{ input: "}", - expected: "", + expected: nil, expectedErr: fmt.Errorf("the JSON string doesn't contain a starting '{'"), }, ), diff --git a/kubectl-fdb/cmd/exclusion_status.go b/kubectl-fdb/cmd/exclusion_status.go index 07e1907bd..aeacf4660 100644 --- a/kubectl-fdb/cmd/exclusion_status.go +++ b/kubectl-fdb/cmd/exclusion_status.go @@ -155,7 +155,7 @@ func getExclusionStatus(cmd *cobra.Command, restConfig *rest.Config, kubeClient } status := &fdbv1beta2.FoundationDBStatus{} - err = json.Unmarshal([]byte(res), status) + err = json.Unmarshal(res, status) if err != nil { // If an error occurs retry cmd.PrintErrln(err) diff --git a/mock-kubernetes-client/client/client.go b/mock-kubernetes-client/client/client.go index c9a521e90..a7276771c 100644 --- 
a/mock-kubernetes-client/client/client.go +++ b/mock-kubernetes-client/client/client.go @@ -65,7 +65,7 @@ func (client *MockClient) Clear() { // Scheme returns the runtime Scheme func (client *MockClient) Scheme() *runtime.Scheme { - return nil + return runtime.NewScheme() } // RESTMapper returns the RESTMapper diff --git a/setup/setup.go b/setup/setup.go index 45d255598..7998cddc5 100644 --- a/setup/setup.go +++ b/setup/setup.go @@ -50,24 +50,25 @@ var operatorVersion = "latest" // Options provides all configuration Options for the operator type Options struct { - MetricsAddr string - EnableLeaderElection bool - LeaderElectionID string - LogFile string - CliTimeout int - DeprecationOptions internal.DeprecationOptions - MaxConcurrentReconciles int - CleanUpOldLogFile bool - LogFileMinAge time.Duration - LogFileMaxSize int - LogFileMaxAge int - MaxNumberOfOldLogFiles int - CompressOldFiles bool - PrintVersion bool - LabelSelector string - WatchNamespace string - GetTimeout time.Duration - PostTimeout time.Duration + EnableLeaderElection bool + CleanUpOldLogFile bool + CompressOldFiles bool + PrintVersion bool + EnableRestartIncompatibleProcesses bool + MetricsAddr string + LeaderElectionID string + LogFile string + LabelSelector string + WatchNamespace string + CliTimeout int + MaxConcurrentReconciles int + LogFileMaxSize int + LogFileMaxAge int + MaxNumberOfOldLogFiles int + LogFileMinAge time.Duration + GetTimeout time.Duration + PostTimeout time.Duration + DeprecationOptions internal.DeprecationOptions } // BindFlags will parse the given flagset for the operator option flags @@ -94,6 +95,7 @@ func (o *Options) BindFlags(fs *flag.FlagSet) { fs.StringVar(&o.WatchNamespace, "watch-namespace", os.Getenv("WATCH_NAMESPACE"), "Defines which namespace the operator should watch") fs.DurationVar(&o.GetTimeout, "get-timeout", 5*time.Second, "http timeout for get requests to the FDB sidecar") fs.DurationVar(&o.PostTimeout, "post-timeout", 10*time.Second, "http timeout for post requests to the FDB sidecar") + fs.BoolVar(&o.EnableRestartIncompatibleProcesses, "enable-restart-incompatible-processes", true, "This flag enables or disables the restart of incompatible fdbserver processes by the operator.") } // StartManager will start the FoundationDB operator manager. @@ -180,6 +182,7 @@ func StartManager( clusterReconciler.GetTimeout = operatorOpts.GetTimeout clusterReconciler.PostTimeout = operatorOpts.PostTimeout clusterReconciler.Log = logr.WithName("controllers").WithName("FoundationDBCluster") + clusterReconciler.EnableRestartIncompatibleProcesses = operatorOpts.EnableRestartIncompatibleProcesses if err := clusterReconciler.SetupWithManager(mgr, operatorOpts.MaxConcurrentReconciles, *labelSelector, watchedObjects...); err != nil { setupLog.Error(err, "unable to create controller", "controller", "FoundationDBCluster")
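For completeness, a hedged usage sketch of the new operator flag (the flag set name and argument list here are hypothetical, only the flag itself comes from the patch):

```go
// Disabling the new subreconciler via the operator's flag set.
opts := &setup.Options{}
fs := flag.NewFlagSet("fdb-kubernetes-operator", flag.ExitOnError)
opts.BindFlags(fs)
_ = fs.Parse([]string{"--enable-restart-incompatible-processes=false"})
// opts.EnableRestartIncompatibleProcesses is now false; the default is true.
```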