Fix duplicated Traceflow tag allocation due to Traceflow CRD updates
For a CRD update, the controller now checks whether the CRD has already
been processed and has a tag allocated; if so, the update is ignored.
The controller also releases the allocated tag when it fails to update
the CRD status.
jianjuns committed Aug 20, 2020
1 parent 880794f commit 0f07d5e
Showing 2 changed files with 68 additions and 62 deletions.
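The crux of the fix, visible in controller.go below, is making data plane tag allocation idempotent per Traceflow name: a second sync triggered by a CRD update (including the controller's own status writes) finds the existing allocation and becomes a no-op. The following is a minimal, self-contained sketch of that pattern. The standalone tagAllocator type, its constructor, and the tag-range values are hypothetical; the map layout (tag -> Traceflow name), the check-before-allocate loop, and the owner check on release mirror the diff.

package tagalloc

import (
	"fmt"
	"sync"
)

// Illustrative bounds; the real minTagNum/maxTagNum constants are defined
// elsewhere in controller.go.
const (
	minTagNum uint8 = 1
	maxTagNum uint8 = 14
)

// tagAllocator is a hypothetical standalone version of the state kept on the
// Traceflow Controller (the runningTraceflows map plus its mutex).
type tagAllocator struct {
	mu      sync.Mutex
	running map[uint8]string // tag -> Traceflow name
}

func newTagAllocator() *tagAllocator {
	return &tagAllocator{running: make(map[uint8]string)}
}

// allocate returns 0 with no error when name already holds a tag, so a
// duplicate sync caused by a CRD update allocates nothing.
func (a *tagAllocator) allocate(name string) (uint8, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	for _, n := range a.running {
		if n == name {
			// This request was processed already; ignore the update.
			return 0, nil
		}
	}
	for i := minTagNum; i <= maxTagNum; i++ {
		if _, ok := a.running[i]; !ok {
			a.running[i] = name
			return i, nil
		}
	}
	return 0, fmt.Errorf("number of on-going Traceflow operations already reached the upper limit: %d", maxTagNum)
}

// release frees tag only if it is still owned by name, making rollback after
// a failed status update safe even if the tag has since been reused.
func (a *tagAllocator) release(name string, tag uint8) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if owner, ok := a.running[tag]; ok && owner == name {
		delete(a.running, tag)
	}
}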
5 changes: 3 additions & 2 deletions pkg/agent/controller/traceflow/packetin.go
@@ -49,8 +49,9 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
 		klog.Warningf("Get traceflow failed: %+v", err)
 		return err
 	}
-	tf.Status.Results = append(tf.Status.Results, *nodeResult)
-	tf, err = c.traceflowClient.OpsV1alpha1().Traceflows().UpdateStatus(context.TODO(), tf, v1.UpdateOptions{})
+	update := tf.DeepCopy()
+	update.Status.Results = append(update.Status.Results, *nodeResult)
+	_, err = c.traceflowClient.OpsV1alpha1().Traceflows().UpdateStatus(context.TODO(), update, v1.UpdateOptions{})
 	if err != nil {
 		klog.Warningf("Update traceflow failed: %+v", err)
 		return err
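The packetin.go change above is also a correctness fix independent of tag allocation: a Traceflow object handed back by client-go may be served from a shared informer cache depending on how it was fetched, and cached objects must be treated as read-only, so the result is now appended to a DeepCopy before calling UpdateStatus. Not part of this commit, but a common way to harden the same write against ResourceVersion conflicts is client-go's retry helper. In this sketch, appendNodeResult is a hypothetical helper; the ops v1alpha1 import path and the NodeResult element type of Status.Results are assumptions based on the diff.

package traceflow

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"

	opsv1alpha1 "github.com/vmware-tanzu/antrea/pkg/apis/ops/v1alpha1"
)

// appendNodeResult is a hypothetical variant of the update in HandlePacketIn
// that re-reads the object and retries when the API server reports a
// ResourceVersion conflict.
func (c *Controller) appendNodeResult(name string, nodeResult *opsv1alpha1.NodeResult) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		tf, err := c.traceflowClient.OpsV1alpha1().Traceflows().Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		// DeepCopy before mutating: Get returns a fresh object here, but the
		// copy keeps the pattern safe if a lister-cached object is used instead.
		update := tf.DeepCopy()
		update.Status.Results = append(update.Status.Results, *nodeResult)
		_, err = c.traceflowClient.OpsV1alpha1().Traceflows().UpdateStatus(context.TODO(), update, metav1.UpdateOptions{})
		return err
	})
}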
125 changes: 65 additions & 60 deletions pkg/controller/traceflow/controller.go
@@ -16,16 +16,15 @@ package traceflow

 import (
 	"context"
-	"encoding/json"
 	"errors"
 	"fmt"
 	"sync"
 	"time"

 	corev1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/tools/cache"
@@ -161,7 +160,7 @@ func (c *Controller) updateTraceflow(_, curObj interface{}) {
 func (c *Controller) deleteTraceflow(old interface{}) {
 	tf := old.(*opsv1alpha1.Traceflow)
 	klog.Infof("Processing Traceflow %s DELETE event", tf.Name)
-	c.deallocateTag(tf)
+	c.deallocateTagForTF(tf)
 }

 // worker is a long-running function that will continually call the processTraceflowItem function
@@ -173,10 +172,10 @@ func (c *Controller) worker() {

 // processTraceflowItem processes an item in the "traceflow" work queue, by calling syncTraceflow
 // after casting the item to a string (Traceflow name). If syncTraceflow returns an error, this
-// function logs error. If syncTraceflow returns retry flag is false, the Traceflow will be added
-// to queue with rate limit. If syncTraceflow returns retry flag is false, the Traceflow is removed
-// from the queue until we get notified of a new change. This function returns false if and only if
-// the work queue was shutdown (no more items will be processed).
+// function logs the error and adds the Traceflow request back to the queue with a rate limit. If
+// no error occurs, the Traceflow request is removed from the queue until we get notified of a new
+// change. This function returns false if and only if the work queue was shutdown (no more items
+// will be processed).
 func (c *Controller) processTraceflowItem() bool {
 	obj, quit := c.queue.Get()
 	if quit {
@@ -197,12 +196,9 @@ func (c *Controller) processTraceflowItem() bool {
 		klog.Errorf("Expected string in work queue but got %#v", obj)
 		return true
 	} else {
-		retry, err := c.syncTraceflow(key)
+		err := c.syncTraceflow(key)
 		if err != nil {
 			klog.Errorf("Error syncing Traceflow %s, Aborting. Error: %v", key, err)
-		}
-		// Add key to queue if retry flag is true, forget key if retry flag is false.
-		if retry {
 			c.queue.AddRateLimited(key)
 		} else {
 			c.queue.Forget(key)
@@ -211,39 +207,49 @@ func (c *Controller) processTraceflowItem() bool {
 	return true
 }

-func (c *Controller) syncTraceflow(traceflowName string) (retry bool, err error) {
+func (c *Controller) syncTraceflow(traceflowName string) error {
 	startTime := time.Now()
 	defer func() {
 		klog.V(4).Infof("Finished syncing Traceflow for %s. (%v)", traceflowName, time.Since(startTime))
 	}()

-	retry = false
 	tf, err := c.traceflowLister.Get(traceflowName)
 	if err != nil {
-		return
+		if apierrors.IsNotFound(err) {
+			// Traceflow CRD has been deleted.
+			return nil
+		}
+		return err
 	}
 	switch tf.Status.Phase {
 	case "", opsv1alpha1.Pending:
-		_, err = c.startTraceflow(tf)
+		err = c.startTraceflow(tf)
 	case opsv1alpha1.Running:
-		retry, err = c.checkTraceflowStatus(tf)
+		err = c.checkTraceflowStatus(tf)
 	default:
-		c.deallocateTag(tf)
+		c.deallocateTagForTF(tf)
 	}
-	return
+	return err
 }

-func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) (*opsv1alpha1.Traceflow, error) {
+func (c *Controller) startTraceflow(tf *opsv1alpha1.Traceflow) error {
 	// Allocate data plane tag.
-	tag, err := c.allocateTag(tf)
+	tag, err := c.allocateTag(tf.Name)
 	if err != nil {
-		return nil, err
+		return err
+	}
+	if tag == 0 {
+		return nil
 	}
-	return c.runningTraceflowCRD(tf, tag)
+
+	err = c.updateTraceflowStatus(tf, opsv1alpha1.Running, "", tag)
+	if err != nil {
+		c.deallocateTag(tf.Name, tag)
+	}
+	return err
 }

-func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) (retry bool, err error) {
-	retry = false
+func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) error {
 	sender := false
 	receiver := false
 	for i, nodeResult := range tf.Status.Results {
@@ -271,39 +277,25 @@ func (c *Controller) checkTraceflowStatus(tf *opsv1alpha1.Traceflow) (retry bool
 		}
 	}
 	if sender && receiver {
-		tf.Status.Phase = opsv1alpha1.Succeeded
-		_, err = c.client.OpsV1alpha1().Traceflows().UpdateStatus(context.TODO(), tf, metav1.UpdateOptions{})
-		return
+		return c.updateTraceflowStatus(tf, opsv1alpha1.Succeeded, "", 0)
 	}
 	if time.Now().UTC().Sub(tf.CreationTimestamp.UTC()).Seconds() > timeout {
-		_, err = c.errorTraceflowCRD(tf, "traceflow timeout")
-		return
+		return c.updateTraceflowStatus(tf, opsv1alpha1.Failed, "traceflow timeout", 0)
 	}
-	retry = true
-	return
+	return nil
 }

-func (c *Controller) runningTraceflowCRD(tf *opsv1alpha1.Traceflow, dataPlaneTag uint8) (*opsv1alpha1.Traceflow, error) {
-	tf.Status.DataplaneTag = dataPlaneTag
-	tf.Status.Phase = opsv1alpha1.Running
-
-	type Traceflow struct {
-		Status opsv1alpha1.TraceflowStatus `json:"status,omitempty"`
-	}
-	patchData := Traceflow{Status: opsv1alpha1.TraceflowStatus{Phase: tf.Status.Phase, DataplaneTag: dataPlaneTag}}
-	payloads, _ := json.Marshal(patchData)
-	return c.client.OpsV1alpha1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
-}
-
-func (c *Controller) errorTraceflowCRD(tf *opsv1alpha1.Traceflow, reason string) (*opsv1alpha1.Traceflow, error) {
-	tf.Status.Phase = opsv1alpha1.Failed
-
-	type Traceflow struct {
-		Status opsv1alpha1.TraceflowStatus `json:"status,omitempty"`
-	}
-	patchData := Traceflow{Status: opsv1alpha1.TraceflowStatus{Phase: tf.Status.Phase, Reason: reason}}
-	payloads, _ := json.Marshal(patchData)
-	return c.client.OpsV1alpha1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
+func (c *Controller) updateTraceflowStatus(tf *opsv1alpha1.Traceflow, phase opsv1alpha1.TraceflowPhase, reason string, dataPlaneTag uint8) error {
+	update := tf.DeepCopy()
+	update.Status.Phase = phase
+	if phase == opsv1alpha1.Running {
+		update.Status.DataplaneTag = dataPlaneTag
+	}
+	if reason != "" {
+		update.Status.Reason = reason
+	}
+	_, err := c.client.OpsV1alpha1().Traceflows().UpdateStatus(context.TODO(), update, metav1.UpdateOptions{})
+	return err
 }

 func (c *Controller) occupyTag(tf *opsv1alpha1.Traceflow) error {
Expand All @@ -326,28 +318,41 @@ func (c *Controller) occupyTag(tf *opsv1alpha1.Traceflow) error {
return nil
}

func (c *Controller) allocateTag(tf *opsv1alpha1.Traceflow) (uint8, error) {
// Allocates a tag. If the Traceflow request has been allocated with a tag
// already, 0 is returned. If number of existing Traceflow requests reaches
// the upper limit, an error is returned.
func (c *Controller) allocateTag(name string) (uint8, error) {
c.runningTraceflowsMutex.Lock()
defer c.runningTraceflowsMutex.Unlock()

for _, n := range c.runningTraceflows {
if n == name {
// The Traceflow request has been processed already.
return 0, nil
}
}
for i := minTagNum; i <= maxTagNum; i++ {
if _, ok := c.runningTraceflows[i]; !ok {
c.runningTraceflows[i] = tf.Name
c.runningTraceflows[i] = name
return i, nil
}
}
return 0, errors.New("Too much traceflow currently")
return 0, fmt.Errorf("number of on-going Traceflow operations already reached the upper limit: %d", maxTagNum)
}

// Deallocate tag from cache. Ignore DataplaneTag == 0 which is invalid case.
func (c *Controller) deallocateTag(tf *opsv1alpha1.Traceflow) {
if tf.Status.DataplaneTag == 0 {
return
// Deallocates tag from cache. Ignore DataplaneTag == 0 which is an invalid case.
func (c *Controller) deallocateTagForTF(tf *opsv1alpha1.Traceflow) {
if tf.Status.DataplaneTag != 0 {
c.deallocateTag(tf.Name, tf.Status.DataplaneTag)
}
}

func (c *Controller) deallocateTag(name string, tag uint8) {
c.runningTraceflowsMutex.Lock()
defer c.runningTraceflowsMutex.Unlock()
if existingTraceflowName, ok := c.runningTraceflows[tf.Status.DataplaneTag]; ok {
if tf.Name == existingTraceflowName {
delete(c.runningTraceflows, tf.Status.DataplaneTag)
if existingTraceflowName, ok := c.runningTraceflows[tag]; ok {
if name == existingTraceflowName {
delete(c.runningTraceflows, tag)
}
}
}
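Read together, the tag lifecycle after this commit is: allocate (a no-op if the name already holds a tag), publish the Running status, and roll the allocation back if that status write fails. The following is a condensed, comment-annotated restatement of that control flow; startTraceflowSketch is an illustrative name, not code from the commit.

// Illustration only: the allocate -> publish -> roll-back-on-failure sequence
// that startTraceflow implements after this commit.
func (c *Controller) startTraceflowSketch(tf *opsv1alpha1.Traceflow) error {
	tag, err := c.allocateTag(tf.Name)
	if err != nil {
		// Tag space exhausted; the worker requeues the request with rate limiting.
		return err
	}
	if tag == 0 {
		// This Traceflow was processed before (duplicate update event): ignore.
		return nil
	}
	if err := c.updateTraceflowStatus(tf, opsv1alpha1.Running, "", tag); err != nil {
		// The status write failed, so the tag would otherwise leak; release it
		// and let a later retry allocate again.
		c.deallocateTag(tf.Name, tag)
		return err
	}
	return nil
}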
