fix rebase errors
njtran committed Nov 8, 2023
1 parent e5bb196 commit f374a5c
Showing 4 changed files with 15 additions and 58 deletions.
5 changes: 0 additions & 5 deletions pkg/cloudprovider/fake/instancetype.go
@@ -39,11 +39,6 @@ const (
 )
 
 func init() {
-	v1alpha5.WellKnownLabels.Insert(
-		LabelInstanceSize,
-		ExoticInstanceLabelKey,
-		IntegerInstanceLabelKey,
-	)
 	v1beta1.WellKnownLabels.Insert(
 		LabelInstanceSize,
 		ExoticInstanceLabelKey,
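For context, this hunk drops the duplicate v1alpha5 registration so the fake provider's test labels are registered against the v1beta1 well-known set only. A minimal runnable sketch of the registration pattern follows; the generic apimachinery set and the label values are assumptions for illustration, not Karpenter's real keys.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

// Stand-in for v1beta1.WellKnownLabels (assumed to be a string set).
var wellKnownLabels = sets.New[string]()

const (
	labelInstanceSize       = "fake/size"    // placeholder value
	exoticInstanceLabelKey  = "fake/exotic"  // placeholder value
	integerInstanceLabelKey = "fake/integer" // placeholder value
)

func init() {
	// After this commit the labels are inserted once, for v1beta1 only;
	// the parallel v1alpha5 insert above is gone.
	wellKnownLabels.Insert(labelInstanceSize, exoticInstanceLabelKey, integerInstanceLabelKey)
}

func main() {
	fmt.Println(wellKnownLabels.Has("fake/size")) // true
}
```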
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/controller.go
@@ -167,7 +167,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command)
 		return c.StateNode
 	})
 	reason := fmt.Sprintf("%s/%s", m.Type(), cmd.Action())
-	if err := c.Queue.Add(ctx, stateNodes, cmd.replacements, reason, 5 * time.Second); err != nil {
+	if err := c.Queue.Add(ctx, stateNodes, cmd.replacements, reason, 5*time.Second); err != nil {
		return fmt.Errorf("adding command to queue, %w", err)
	}
	disruptionActionsPerformedCounter.With(map[string]string{
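The only change here is spacing on the 5-second delay argument. As a sketch of what that call feeds into, here is the delayed-enqueue pattern on a client-go rate-limited workqueue, the same construction used by the orchestration queue below; the base/max delays are placeholders for the package's unexported queueBaseDelay/queueMaxDelay constants.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Placeholder delays standing in for queueBaseDelay/queueMaxDelay.
	rl := workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second)
	q := workqueue.NewRateLimitingQueue(rl)

	// Like Queue.Add, make the item visible to workers only after a delay.
	q.AddAfter("disruption-command", 5*time.Second)

	item, _ := q.Get() // blocks until the delay elapses
	fmt.Println(item)
	q.Done(item)
}
```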
12 changes: 6 additions & 6 deletions pkg/controllers/disruption/orchestration/queue.go
@@ -89,7 +89,7 @@ type Queue struct {
 	workqueue.RateLimitingInterface
 	// providerID -> command, maps a candidate to its command. Each command has a list of candidates that can be used
 	// to map to itself.
-	CandidateProviderIDToCommand map[string]*Command
+	ProviderIDToCommand map[string]*Command
 
 	kubeClient client.Client
 	recorder   events.Recorder
@@ -103,7 +103,7 @@ func NewQueue(kubeClient client.Client, recorder events.Recorder, cluster *state
 	provisioner *provisioning.Provisioner) *Queue {
 	queue := &Queue{
 		RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(queueBaseDelay, queueMaxDelay)),
-		CandidateProviderIDToCommand: map[string]*Command{},
+		ProviderIDToCommand:   map[string]*Command{},
 		kubeClient:            kubeClient,
 		recorder:              recorder,
 		cluster:               cluster,
@@ -222,7 +222,7 @@ func (q *Queue) Add(ctx context.Context, candidates []*state.StateNode, replacem
 	cmd := NewCommand(nodeClaimKeys, candidates, reason, q.clock.Now().Add(delay))
 	q.RateLimitingInterface.AddAfter(cmd, delay)
 	for _, candidate := range candidates {
-		q.CandidateProviderIDToCommand[candidate.ProviderID()] = cmd
+		q.ProviderIDToCommand[candidate.ProviderID()] = cmd
 	}
 	return nil
 }
@@ -298,7 +298,7 @@ func (q *Queue) WaitOrTerminate(ctx context.Context, cmd *Command) error {
 func (q *Queue) CanAdd(ids ...string) error {
 	var err error
 	for _, id := range ids {
-		if _, ok := q.CandidateProviderIDToCommand[id]; ok {
+		if _, ok := q.ProviderIDToCommand[id]; ok {
 			err = multierr.Append(err, fmt.Errorf("candidate is being deprovisioned"))
 		}
 	}
@@ -309,7 +309,7 @@ func (q *Queue) CanAdd(ids ...string) error {
 func (q *Queue) Remove(cmd *Command) {
 	// Remove all candidates linked to the command
 	for _, candidate := range cmd.Candidates {
-		delete(q.CandidateProviderIDToCommand, candidate.ProviderID())
+		delete(q.ProviderIDToCommand, candidate.ProviderID())
 	}
 	q.RateLimitingInterface.Forget(cmd)
 	q.RateLimitingInterface.Done(cmd)
@@ -367,7 +367,7 @@ func (q *Queue) RequireNoScheduleTaint(ctx context.Context, addTaint bool, nodes
 // Reset is used for testing and clears all internal data structures
 func (q *Queue) Reset() {
 	q.RateLimitingInterface = workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(queueBaseDelay, queueMaxDelay))
-	q.CandidateProviderIDToCommand = map[string]*Command{}
+	q.ProviderIDToCommand = map[string]*Command{}
 }
 
 // launchReplacementNodeClaims will create replacement node claims
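Every queue.go hunk is the same mechanical rename, CandidateProviderIDToCommand to ProviderIDToCommand. A simplified sketch of how that map acts as a double-disruption guard follows, with the types pared down from the real ones (the real Command also carries replacement keys and timing).

```go
package main

import (
	"fmt"

	"go.uber.org/multierr"
)

// Command keeps only what the lookup needs for this sketch.
type Command struct {
	CandidateProviderIDs []string
}

type Queue struct {
	ProviderIDToCommand map[string]*Command
}

// CanAdd mirrors queue.go: a candidate already mapped to a command is
// still being deprovisioned, so it cannot be enqueued again.
func (q *Queue) CanAdd(ids ...string) error {
	var err error
	for _, id := range ids {
		if _, ok := q.ProviderIDToCommand[id]; ok {
			err = multierr.Append(err, fmt.Errorf("candidate is being deprovisioned"))
		}
	}
	return err
}

// Remove unlinks every candidate once its command completes or fails.
func (q *Queue) Remove(cmd *Command) {
	for _, id := range cmd.CandidateProviderIDs {
		delete(q.ProviderIDToCommand, id)
	}
}

func main() {
	q := &Queue{ProviderIDToCommand: map[string]*Command{}}
	cmd := &Command{CandidateProviderIDs: []string{"aws:///i-123"}}
	q.ProviderIDToCommand["aws:///i-123"] = cmd

	fmt.Println(q.CanAdd("aws:///i-123")) // candidate is being deprovisioned
	q.Remove(cmd)
	fmt.Println(q.CanAdd("aws:///i-123")) // <nil>
}
```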
54 changes: 8 additions & 46 deletions pkg/controllers/disruption/orchestration/suite_test.go
@@ -23,6 +23,7 @@ import (
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	"github.com/samber/lo"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/util/sets"
 	. "knative.dev/pkg/logging/testing"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -97,7 +98,7 @@ var _ = BeforeEach(func() {
 	fakeClock.SetTime(time.Now())
 	cluster.Reset()
 	cloudProvider.Reset()
-	instanceTypes = cloudProvider.InstanceTypes
+	instanceTypes = fake.InstanceTypes(5)
 	cluster.MarkUnconsolidated()
 	queue.Reset()
 })
@@ -118,10 +119,11 @@ var _ = Describe("Queue", func() {
 
 	Context("Add", func() {
 		It("should remove the karpenter.sh/disruption taint for nodes that fail to disrupt", func() {
+			nodePool.Spec.Limits = v1beta1.Limits{v1.ResourceCPU: resource.MustParse("0")}
+			nodePool.Status.Resources = v1.ResourceList{v1.ResourceCPU: resource.MustParse("100")}
 			ExpectApplied(ctx, env.Client, nodeClaim1, node1, nodePool)
 			ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node1}, []*v1beta1.NodeClaim{nodeClaim1})
 
-			cloudProvider.AllowedCreateCalls = 0
 			stateNode := ExpectStateNodeExists(cluster, node1)
 			Expect(queue.Add(ctx, []*state.StateNode{stateNode}, []*scheduling.NodeClaim{schedulingReplacementNodeClaim}, "test", 0)).ToNot(BeNil())
 
@@ -162,6 +164,7 @@ var _ = Describe("Queue", func() {
 
 			Expect(queue.Add(ctx, []*state.StateNode{stateNode}, []*scheduling.NodeClaim{schedulingReplacementNodeClaim}, "test", 0)).To(BeNil())
 			Expect(queue.Len()).To(Equal(1))
+			stateNode = ExpectStateNodeExistsForNodeClaim(cluster, nodeClaim1)
 			Expect(stateNode.MarkedForDeletion()).To(BeTrue())
 
 			fakeClock.Step(1 * time.Hour)
@@ -176,7 +179,7 @@
 			ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})
 
 			// Get the command
-			cmd := queue.CandidateProviderIDToCommand[nodeClaim1.Status.ProviderID]
+			cmd := queue.ProviderIDToCommand[nodeClaim1.Status.ProviderID]
 			Expect(cmd).ToNot(BeNil())
 			Expect(cmd.ReplacementKeys[0].Initialized).To(BeFalse())
 
@@ -214,7 +217,7 @@
 			Expect(queue.Len()).To(Equal(1))
 
 			// Get the command
-			cmd := queue.CandidateProviderIDToCommand[nodeClaim1.Status.ProviderID]
+			cmd := queue.ProviderIDToCommand[nodeClaim1.Status.ProviderID]
 			Expect(cmd).ToNot(BeNil())
 
 			// Process the command
@@ -274,7 +277,7 @@
 			Expect(queue.Len()).To(Equal(1))
 
 			// Get the command and process it
-			cmd := queue.CandidateProviderIDToCommand[nodeClaim1.Status.ProviderID]
+			cmd := queue.ProviderIDToCommand[nodeClaim1.Status.ProviderID]
 			Expect(cmd).ToNot(BeNil())
 			ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})
 
@@ -288,44 +291,3 @@
 		})
 	})
 })

// Context("Queue Events", func() {
// It("should emit readiness events", func() {
// ExpectApplied(ctx, env.Client, nodeClaim1, node1, nodePool)
// ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node1}, []*v1beta1.NodeClaim{nodeClaim1})
// stateNode := ExpectStateNodeExistsForNodeClaim(cluster, nodeClaim1)
// Expect(queue.Add(ctx, []*state.StateNode{stateNode}, []*scheduling.NodeClaim{schedulingReplacementNodeClaim}, "test", 0)).ToNot(BeNil())
// fakeClock.Step(10 * time.Second)
// Expect(queue.Len()).To(Equal(1))

// // Get the command
// cmd := queue.CandidateProviderIDToCommand[nodeClaim1.Status.ProviderID]
// Expect(cmd).ToNot(BeNil())

// ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})
// Expect(cmd.ReplacementKeys[0].Initialized).To(BeFalse())
// Expect(recorder.DetectedEvent(disruptionevents.Launching(stateNode.NodeClaim, "test").Message)).To(BeTrue())
// Expect(recorder.DetectedEvent(disruptionevents.WaitingOnReadiness(stateNode.NodeClaim).Message)).To(BeTrue())
// })
// It("should emit termination events", func() {
// ExpectApplied(ctx, env.Client, nodeClaim1, node1, nodePool)
// ExpectMakeNodesAndNodeClaimsInitializedAndStateUpdated(ctx, env.Client, nodeStateController, nodeClaimStateController, []*v1.Node{node1}, []*v1beta1.NodeClaim{nodeClaim1})
// stateNode := ExpectStateNodeExistsForNodeClaim(cluster, nodeClaim1)
// Expect(queue.Add(ctx, []*state.StateNode{stateNode}, []*scheduling.NodeClaim{schedulingReplacementNodeClaim}, "test", 0)).ToNot(BeNil())
// fakeClock.Step(10 * time.Second)
// Expect(queue.Len()).To(Equal(1))

// // Get the command
// cmd := queue.CandidateProviderIDToCommand[nodeClaim1.Status.ProviderID]
// Expect(cmd).ToNot(BeNil())

// ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})
// terminatingEvents := disruptionevents.Terminating(stateNode.Node, stateNode.NodeClaim, "test")
// Expect(recorder.DetectedEvent(terminatingEvents[0].Message)).To(BeTrue())
// Expect(recorder.DetectedEvent(terminatingEvents[1].Message)).To(BeTrue())

// ExpectNodeClaimsCascadeDeletion(ctx, env.Client, nodeClaim1)
// // And expect the nodeClaim and node to be deleted
// ExpectNotFound(ctx, env.Client, nodeClaim1, node1)
// })
// })
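The rewritten taint test now forces launch failure through NodePool limits instead of the removed cloudProvider.AllowedCreateCalls knob. A small sketch of the quantity comparison that makes a zero CPU limit trip that check, using only apimachinery types:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Mirrors the new test setup: a zero CPU limit with 100 CPUs already in use.
	limits := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0")}
	inUse := v1.ResourceList{v1.ResourceCPU: resource.MustParse("100")}

	limit := limits[v1.ResourceCPU]
	used := inUse[v1.ResourceCPU]

	// Any replacement launch would exceed the limit, so the disruption
	// command fails and the karpenter.sh/disruption taint must come off again.
	fmt.Println(used.Cmp(limit) > 0) // true
}
```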
