Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow the remove sub command to remove by process class #1935

Merged
merged 2 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kubectl-fdb/cmd/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func analyzeCluster(cmd *cobra.Command, kubeClient client.Client, cluster *fdbv1
confirmed := false

if len(failedProcessGroups) > 0 {
err := replaceProcessGroups(kubeClient, cluster.Name, failedProcessGroups, cluster.Namespace, "", true, wait, false, true)
err := replaceProcessGroups(kubeClient, cluster.Name, failedProcessGroups, cluster.Namespace, "", "", true, wait, false, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion kubectl-fdb/cmd/cordon.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func cordonNode(cmd *cobra.Command, kubeClient client.Client, inputClusterName s
processGroups = append(processGroups, processGroup)
}
}
err = replaceProcessGroups(kubeClient, cluster.Name, processGroups, namespace, "", withExclusion, wait, false, true)
err = replaceProcessGroups(kubeClient, cluster.Name, processGroups, namespace, "", "", withExclusion, wait, false, true)
if err != nil {
internalErr := fmt.Sprintf("unable to cordon all Pods for cluster %s\n", cluster.Name)
errors = append(errors, internalErr)
Expand Down
14 changes: 14 additions & 0 deletions kubectl-fdb/cmd/cordon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ func createPods(clusterName string, namespace string) error {
NodeName: "node-2",
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-instance-3", clusterName),
Namespace: namespace,
Labels: map[string]string{
fdbv1beta2.FDBProcessClassLabel: string(fdbv1beta2.ProcessClassStateless),
fdbv1beta2.FDBClusterLabel: clusterName,
fdbv1beta2.FDBProcessGroupIDLabel: fmt.Sprintf("%s-instance-3", clusterName),
},
},
Spec: corev1.PodSpec{
NodeName: "node-3",
},
},
}

for _, pod := range pods {
Expand Down
12 changes: 12 additions & 0 deletions kubectl-fdb/cmd/k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,18 @@ func getProcessGroupIDsFromPodName(cluster *fdbv1beta2.FoundationDBCluster, podN
return processGroupIDs, nil
}

// getProcessGroupIdsWithClass returns a list of ProcessGroupIDs in the given cluster which are of the given processClass
func getProcessGroupIdsWithClass(cluster *fdbv1beta2.FoundationDBCluster, processClass string) []fdbv1beta2.ProcessGroupID {
matchingProcessGroupIDs := []fdbv1beta2.ProcessGroupID{}
for _, processGroup := range cluster.Status.ProcessGroups {
if processGroup.ProcessClass != fdbv1beta2.ProcessClass(processClass) {
continue
}
matchingProcessGroupIDs = append(matchingProcessGroupIDs, processGroup.ProcessGroupID)
}
return matchingProcessGroupIDs
}

// fetchProcessGroupsCrossCluster fetches the list of process groups matching the given podNames and returns the
// processGroupIDs mapped by clusterName matching the given clusterLabel.
func fetchProcessGroupsCrossCluster(kubeClient client.Client, namespace string, clusterLabel string, podNames ...string) (map[*fdbv1beta2.FoundationDBCluster][]fdbv1beta2.ProcessGroupID, error) {
Expand Down
40 changes: 29 additions & 11 deletions kubectl-fdb/cmd/remove_process_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func newRemoveProcessGroupCmd(streams genericclioptions.IOStreams) *cobra.Comman

cmd := &cobra.Command{
Use: "process-groups",
Short: "Adds a process group (or multiple) to the remove list of the given cluster",
Long: "Adds a process group (or multiple) to the remove list field of the given cluster",
Short: "Adds a process group (or multiple) to the remove list of the given cluster, or matching given pod names",
Long: "Adds a process group (or multiple) to the remove list field of the given cluster, or matching given pod names",
RunE: func(cmd *cobra.Command, args []string) error {
wait, err := cmd.Root().Flags().GetBool("wait")
if err != nil {
Expand All @@ -50,6 +50,10 @@ func newRemoveProcessGroupCmd(streams genericclioptions.IOStreams) *cobra.Comman
if err != nil {
return err
}
processClass, err := cmd.Flags().GetString("process-class")
if err != nil {
return err
}
withExclusion, err := cmd.Flags().GetBool("exclusion")
if err != nil {
return err
Expand All @@ -73,7 +77,7 @@ func newRemoveProcessGroupCmd(streams genericclioptions.IOStreams) *cobra.Comman
return err
}

return replaceProcessGroups(kubeClient, cluster, args, namespace, clusterLabel, withExclusion, wait, removeAllFailed, useProcessGroupID)
return replaceProcessGroups(kubeClient, cluster, args, namespace, clusterLabel, processClass, withExclusion, wait, removeAllFailed, useProcessGroupID)
},
Example: `
# Remove process groups for a cluster in the current namespace
Expand All @@ -91,14 +95,18 @@ kubectl fdb -n default remove process-group --use-process-group-id -c cluster st

# Remove all failed process groups for a cluster (all process groups that have a missing process)
kubectl fdb -n default remove process-group -c cluster --remove-all-failed

# Remove all processes in the cluster that have the given process-class (incompatible with passing pod names or process group IDs)
kubectl fdb -n default remove process-group -c cluster --process-class="stateless"
`,
}

cmd.Flags().StringP("fdb-cluster", "c", "", "remove process groups from the provided cluster.")
cmd.Flags().String("process-class", "", "remove process groups matching the provided value in the provided cluster. Using this option ignores provided ids.")
cmd.Flags().StringP("cluster-label", "l", fdbv1beta2.FDBClusterLabel, "cluster label used to identify the cluster for a requested pod")
cmd.Flags().BoolP("exclusion", "e", true, "define if the process groups should be removed with exclusion.")
cmd.Flags().Bool("remove-all-failed", false, "define if all failed processes should be replaced.")
cmd.Flags().Bool("use-process-group-id", false, "define if the process-group should be used instead of the Pod name.")
cmd.Flags().StringP("cluster-label", "l", fdbv1beta2.FDBClusterLabel, "cluster label used to identify the cluster for a requested pod")

cmd.SetOut(o.Out)
cmd.SetErr(o.ErrOut)
Expand All @@ -109,10 +117,12 @@ kubectl fdb -n default remove process-group -c cluster --remove-all-failed
return cmd
}

// replaceProcessGroups adds process groups to the removal list of their respective clusters
// if a clusterName is specified, it will ONLY do so for the specified cluster
func replaceProcessGroups(kubeClient client.Client, clusterName string, ids []string, namespace string, clusterLabel string, withExclusion bool, wait bool, removeAllFailed bool, useProcessGroupID bool) error {
if len(ids) == 0 && !removeAllFailed {
// replaceProcessGroups adds process groups to the removal list of their respective clusters.
// If clusterName is specified, it will ONLY do so for the specified cluster.
// If processClass is specified, it will ignore the given ids and remove all processes in the given cluster whose pods
// have a processClassLabel matching the processClass.
func replaceProcessGroups(kubeClient client.Client, clusterName string, ids []string, namespace string, clusterLabel string, processClass string, withExclusion bool, wait bool, removeAllFailed bool, useProcessGroupID bool) error {
johscheuer marked this conversation as resolved.
Show resolved Hide resolved
if len(ids) == 0 && !removeAllFailed && processClass == "" {
return nil
}

Expand All @@ -139,14 +149,22 @@ func replaceProcessGroups(kubeClient client.Client, clusterName string, ids []st
}
return err
}
// In this case the user has Pod name specified

var processGroupIDs []fdbv1beta2.ProcessGroupID
if !useProcessGroupID {
if processClass != "" { // match against a whole process class, ignore provided ids
if len(ids) != 0 {
return fmt.Errorf("process identifiers were provided along with a processClass and would be ignored, please only provide one or the other")
nicmorales9 marked this conversation as resolved.
Show resolved Hide resolved
}
processGroupIDs = getProcessGroupIdsWithClass(cluster, processClass)
if len(processGroupIDs) == 0 {
return fmt.Errorf("found no processGroups of processClass '%s' in cluster %s", processClass, clusterName)
}
} else if !useProcessGroupID { // match by pod name
processGroupIDs, err = getProcessGroupIDsFromPodName(cluster, ids)
if err != nil {
return err
}
} else {
} else { // match by process group ID
for _, id := range ids {
processGroupIDs = append(processGroupIDs, fdbv1beta2.ProcessGroupID(id))
}
Expand Down
81 changes: 76 additions & 5 deletions kubectl-fdb/cmd/remove_process_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var _ = Describe("[plugin] remove process groups command", func() {

DescribeTable("should cordon all targeted processes",
func(tc testCase) {
err := replaceProcessGroups(k8sClient, clusterName, tc.Instances, namespace, "", tc.WithExclusion, false, tc.RemoveAllFailed, false)
err := replaceProcessGroups(k8sClient, clusterName, tc.Instances, namespace, "", "", tc.WithExclusion, false, tc.RemoveAllFailed, false)
Expect(err).NotTo(HaveOccurred())

var resCluster fdbv1beta2.FoundationDBCluster
Expand Down Expand Up @@ -119,7 +119,7 @@ var _ = Describe("[plugin] remove process groups command", func() {
When("adding the same process group to the removal list without exclusion", func() {
It("should add the process group to the removal without exclusion list", func() {
removals := []string{"test-storage-1"}
err := replaceProcessGroups(k8sClient, clusterName, removals, namespace, "", false, false, false, false)
err := replaceProcessGroups(k8sClient, clusterName, removals, namespace, "", "", false, false, false, false)
Expect(err).NotTo(HaveOccurred())

var resCluster fdbv1beta2.FoundationDBCluster
Expand All @@ -139,7 +139,7 @@ var _ = Describe("[plugin] remove process groups command", func() {
When("adding the same process group to the removal list", func() {
It("should add the process group to the removal without exclusion list", func() {
removals := []string{"test-storage-1"}
err := replaceProcessGroups(k8sClient, clusterName, removals, namespace, "", true, false, false, false)
err := replaceProcessGroups(k8sClient, clusterName, removals, namespace, "", "", true, false, false, false)
Expect(err).NotTo(HaveOccurred())

var resCluster fdbv1beta2.FoundationDBCluster
Expand Down Expand Up @@ -178,15 +178,15 @@ var _ = Describe("[plugin] remove process groups command", func() {
podNames []string
clusterNameFilter string // if used, then cross-cluster will not work
clusterLabel string
RemoveAllFailed bool
johscheuer marked this conversation as resolved.
Show resolved Hide resolved
clusterDataMap map[string]clusterData
wantErrorContains string
}

DescribeTable("should remove specified processes via clusterLabel and podName(s)",
func(tc testCase) {
err := replaceProcessGroups(k8sClient, tc.clusterNameFilter, tc.podNames, namespace, tc.clusterLabel, true, false, tc.RemoveAllFailed, false)
err := replaceProcessGroups(k8sClient, tc.clusterNameFilter, tc.podNames, namespace, tc.clusterLabel, "", true, false, false, false)
if tc.wantErrorContains != "" {
Expect(err).To(Not(BeNil()))
Expect(err.Error()).To(ContainSubstring(tc.wantErrorContains))
} else {
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -335,6 +335,77 @@ var _ = Describe("[plugin] remove process groups command", func() {
),
)
})
When("processes are specified via processClass", func() {
BeforeEach(func() {
cluster = generateClusterStruct(clusterName, namespace) // the status is overwritten by prior tests
Expect(createPods(clusterName, namespace)).NotTo(HaveOccurred())

})
type testCase struct {
ids []string // should be ignored when processClass is specified
clusterName string
processClass string
wantErrorContains string
ExpectedInstancesToRemove []fdbv1beta2.ProcessGroupID
}
DescribeTable("should remove specified processes via clusterLabel and podName(s)",
func(tc testCase) {
err := replaceProcessGroups(k8sClient, tc.clusterName, tc.ids, namespace, "", tc.processClass, true, false, false, false)
if tc.wantErrorContains != "" {
Expect(err).To(Not(BeNil()))
Expect(err.Error()).To(ContainSubstring(tc.wantErrorContains))
} else {
Expect(err).NotTo(HaveOccurred())
}

var resCluster fdbv1beta2.FoundationDBCluster
err = k8sClient.Get(context.Background(), client.ObjectKey{
Namespace: namespace,
Name: clusterName,
}, &resCluster)
Expect(err).NotTo(HaveOccurred())
Expect(tc.ExpectedInstancesToRemove).To(ContainElements(resCluster.Spec.ProcessGroupsToRemove))
Expect(len(tc.ExpectedInstancesToRemove)).To(BeNumerically("==", len(resCluster.Spec.ProcessGroupsToRemove)))
Expect(len(resCluster.Spec.ProcessGroupsToRemoveWithoutExclusion)).To(BeNumerically("==", 0))
},
Entry("errors when no process groups are found of the given class",
testCase{
processClass: "non-existent",
wantErrorContains: fmt.Sprintf("found no processGroups of processClass 'non-existent' in cluster %s", clusterName),
clusterName: clusterName,
ExpectedInstancesToRemove: []fdbv1beta2.ProcessGroupID{},
},
),
Entry("errors when ids are provided along with processClass",
testCase{
ids: []string{"ignored", "also-ignored"},
wantErrorContains: "provided along with a processClass and would be ignored",
processClass: string(fdbv1beta2.ProcessClassStateless),
clusterName: clusterName,
ExpectedInstancesToRemove: []fdbv1beta2.ProcessGroupID{},
},
),
Entry("removes singular matching process",
testCase{
processClass: string(fdbv1beta2.ProcessClassStateless),
clusterName: clusterName,
ExpectedInstancesToRemove: []fdbv1beta2.ProcessGroupID{
fdbv1beta2.ProcessGroupID(fmt.Sprintf("%s-instance-3", clusterName)),
},
},
),
Entry("removes multiple processes that match",
testCase{
processClass: string(fdbv1beta2.ProcessClassStorage),
clusterName: clusterName,
ExpectedInstancesToRemove: []fdbv1beta2.ProcessGroupID{
fdbv1beta2.ProcessGroupID(fmt.Sprintf("%s-instance-1", clusterName)),
fdbv1beta2.ProcessGroupID(fmt.Sprintf("%s-instance-2", clusterName)),
},
},
),
)
})
})
})
})
6 changes: 6 additions & 0 deletions kubectl-fdb/cmd/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,15 @@ func generateClusterStruct(name string, namespace string) *fdbv1beta2.Foundation
ProcessGroups: []*fdbv1beta2.ProcessGroupStatus{
{
ProcessGroupID: fdbv1beta2.ProcessGroupID(name + "-instance-1"),
ProcessClass: fdbv1beta2.ProcessClassStorage,
},
{
ProcessGroupID: fdbv1beta2.ProcessGroupID(name + "-instance-2"),
ProcessClass: fdbv1beta2.ProcessClassStorage,
},
{
ProcessGroupID: fdbv1beta2.ProcessGroupID(name + "-instance-3"),
ProcessClass: fdbv1beta2.ProcessClassStateless,
},
},
},
Expand Down
Loading