Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix key in connection pool #704

Merged
merged 2 commits into from
Nov 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 19 additions & 25 deletions internal/controller/csiaddons/csiaddonsnode_controller.go
Original file line number Diff line number Diff line change
@@ -42,8 +42,6 @@ import (

var (
csiAddonsNodeFinalizer = csiaddonsv1alpha1.GroupVersion.Group + "/csiaddonsnode"

errLegacyEndpoint = errors.New("legacy formatted endpoint")
)

// CSIAddonsNodeReconciler reconciles a CSIAddonsNode object
@@ -95,9 +93,20 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
nodeID := csiAddonsNode.Spec.Driver.NodeID
driverName := csiAddonsNode.Spec.Driver.Name

key := csiAddonsNode.Namespace + "/" + util.NormalizeLeaseName(csiAddonsNode.Name)
logger = logger.WithValues("NodeID", nodeID, "DriverName", driverName)

podName, endPoint, err := r.resolveEndpoint(ctx, csiAddonsNode.Spec.Driver.EndPoint)
if err != nil {
logger.Error(err, "Failed to resolve endpoint")
return ctrl.Result{}, fmt.Errorf("failed to resolve endpoint %q: %w", csiAddonsNode.Spec.Driver.EndPoint, err)
}

// namespace + "/" + leader identity(pod name) is the key for the connection.
// this key is used by GetLeaderByDriver to get the connection
key := csiAddonsNode.Namespace + "/" + podName

logger = logger.WithValues("EndPoint", endPoint)

if !csiAddonsNode.DeletionTimestamp.IsZero() {
// if deletion timestamp is set, the CSIAddonsNode is getting deleted,
// delete connections and remove finalizer.
@@ -107,14 +116,6 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}

endPoint, err := r.resolveEndpoint(ctx, csiAddonsNode.Spec.Driver.EndPoint)
if err != nil {
logger.Error(err, "Failed to resolve endpoint")
return ctrl.Result{}, fmt.Errorf("failed to resolve endpoint %q: %w", csiAddonsNode.Spec.Driver.EndPoint, err)
}

logger = logger.WithValues("EndPoint", endPoint)

if err := r.addFinalizer(ctx, &logger, csiAddonsNode); err != nil {
return ctrl.Result{}, err
}
@@ -203,14 +204,12 @@ func (r *CSIAddonsNodeReconciler) removeFinalizer(
return nil
}

// resolveEndpoint parses the endpoint and returned a string that can be used
// resolveEndpoint parses the endpoint and returned a endpoint and pod name that can be used
// by GRPC to connect to the sidecar.
func (r *CSIAddonsNodeReconciler) resolveEndpoint(ctx context.Context, rawURL string) (string, error) {
func (r *CSIAddonsNodeReconciler) resolveEndpoint(ctx context.Context, rawURL string) (string, string, error) {
namespace, podname, port, err := parseEndpoint(rawURL)
if err != nil && errors.Is(err, errLegacyEndpoint) {
return rawURL, nil
} else if err != nil {
return "", err
if err != nil {
return "", "", err
}

pod := &corev1.Pod{}
@@ -219,23 +218,18 @@ func (r *CSIAddonsNodeReconciler) resolveEndpoint(ctx context.Context, rawURL st
Name: podname,
}, pod)
if err != nil {
return "", fmt.Errorf("failed to get pod %s/%s: %w", namespace, podname, err)
return "", "", fmt.Errorf("failed to get pod %s/%s: %w", namespace, podname, err)
} else if pod.Status.PodIP == "" {
return "", fmt.Errorf("pod %s/%s does not have an IP-address", namespace, podname)
return "", "", fmt.Errorf("pod %s/%s does not have an IP-address", namespace, podname)
}

return fmt.Sprintf("%s:%s", pod.Status.PodIP, port), nil
return podname, fmt.Sprintf("%s:%s", pod.Status.PodIP, port), nil
}

// parseEndpoint returns the rawURL if it is in the legacy <IP-address>:<port>
// format. When the recommended format is used, it returns the Namespace,
// PodName, Port and error instead.
func parseEndpoint(rawURL string) (string, string, string, error) {
// assume old formatted endpoint, don't parse it
if !strings.Contains(rawURL, "://") {
return "", "", "", errLegacyEndpoint
}

endpoint, err := url.Parse(rawURL)
if err != nil {
return "", "", "", fmt.Errorf("failed to parse endpoint %q: %w", rawURL, err)
Original file line number Diff line number Diff line change
@@ -17,7 +17,6 @@ limitations under the License.
package controller

import (
"errors"
"testing"

"github.com/csi-addons/spec/lib/go/identity"
@@ -26,11 +25,8 @@ import (
)

func TestParseEndpoint(t *testing.T) {
_, _, _, err := parseEndpoint("1.2.3.4:5678")
assert.True(t, errors.Is(err, errLegacyEndpoint))

// test empty namespace
_, _, _, err = parseEndpoint("pod://pod-name:5678")
_, _, _, err := parseEndpoint("pod://pod-name:5678")
assert.Error(t, err)

// test empty namespace