Skip to content

Commit

Permalink
controller: send NetworkFence requests to the leading CSI-Addons sidecar
Browse files Browse the repository at this point in the history
NetworkFence operations should only be sent to a CSI-Addons sidecar that
has the CONTROLLER_SERVICE capability. There should be a single leader
for the CSI-Addons sidecars that support that, and the leader can be
identified by the Lease object for the CSI-drivername.

Signed-off-by: Niels de Vos <[email protected]>
  • Loading branch information
nixpanic committed Dec 11, 2023
1 parent 818a51c commit 752b98a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 21 deletions.
2 changes: 1 addition & 1 deletion controllers/csiaddons/csiaddonsnode_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (r *CSIAddonsNodeReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}

logger.Info("Connecting to sidecar")
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName)
newConn, err := connection.NewConnection(ctx, endPoint, nodeID, driverName, csiAddonsNode.Name)
if err != nil {
logger.Error(err, "Failed to establish connection with sidecar")

Expand Down
41 changes: 22 additions & 19 deletions controllers/csiaddons/networkfence_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (r *NetworkFenceReconciler) Reconcile(ctx context.Context, req ctrl.Request

logger = logger.WithValues("DriverName", nwFence.Spec.Driver, "CIDRs", nwFence.Spec.Cidrs)

client, err := r.getNetworkFenceClient(nwFence.Spec.Driver, "")
client, err := r.getNetworkFenceClient(ctx, nwFence.Spec.Driver)
if err != nil {
logger.Error(err, "Failed to get NetworkFenceClient")
return ctrl.Result{}, err
Expand Down Expand Up @@ -298,26 +298,29 @@ func (nf *NetworkFenceInstance) removeFinalizerFromNetworkFence(ctx context.Cont
return nil
}

// getNetworkFenceClient returns a NetworkFenceClient for the given driver.
func (r *NetworkFenceReconciler) getNetworkFenceClient(drivername, nodeID string) (proto.NetworkFenceClient, error) {
conns := r.Connpool.GetByNodeID(drivername, nodeID)

// Iterate through the connections and find the one that matches the driver name
// provided in the NetworkFence spec; so that corresponding network fence and
// unfence operations can be performed.
for _, v := range conns {
for _, cap := range v.Capabilities {
// validate if NETWORK_FENCE capability is supported by the driver.
if cap.GetNetworkFence() == nil {
continue
}
// getNetworkFenceClient returns a NetworkFenceClient that is the leader for
// the given driver.
// The NetworkFenceClient should only run on a CONTROLLER_SERVICE capable
// CSI-Addons plugin, there can only be one plugin that holds the lease.
func (r *NetworkFenceReconciler) getNetworkFenceClient(ctx context.Context, drivername string) (proto.NetworkFenceClient, error) {
conn, err := r.Connpool.GetLeaderByDriver(ctx, r.Client, drivername)
if err != nil {
return nil, err
}

// validate of NETWORK_FENCE capability is enabled by the storage driver.
if cap.GetNetworkFence().GetType() == identity.Capability_NetworkFence_NETWORK_FENCE {
return proto.NewNetworkFenceClient(v.Client), nil
}
// verify that the CSI-Addons plugin holding the lease supports
// NetworkFence, it probably is a bug if it doesn't
for _, capability := range conn.Capabilities {
// validate if NETWORK_FENCE capability is supported by the driver.
if capability.GetNetworkFence() == nil {
continue
}

// validate of NETWORK_FENCE capability is enabled by the storage driver.
if capability.GetNetworkFence().GetType() == identity.Capability_NetworkFence_NETWORK_FENCE {
return proto.NewNetworkFenceClient(conn.Client), nil
}
}

return nil, fmt.Errorf("no connections for driver: %s", drivername)
return nil, fmt.Errorf("leading CSIAddonsNode %q for driver %q does not support NetworkFence", conn.Name, drivername)
}
15 changes: 14 additions & 1 deletion internal/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import (
type Connection struct {
Client *grpc.ClientConn
Capabilities []*identity.Capability
Name string
NodeID string
DriverName string
Timeout time.Duration
}

// NewConnection establishes connection with sidecar, fetches capability and returns Connection object
// filled with required information.
func NewConnection(ctx context.Context, endpoint, nodeID, driverName string) (*Connection, error) {
func NewConnection(ctx context.Context, endpoint, nodeID, driverName, podName string) (*Connection, error) {
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithIdleTimeout(time.Duration(0)),
Expand All @@ -49,6 +50,7 @@ func NewConnection(ctx context.Context, endpoint, nodeID, driverName string) (*C

conn := &Connection{
Client: cc,
Name: podName,
NodeID: nodeID,
DriverName: driverName,
Timeout: time.Minute,
Expand Down Expand Up @@ -83,3 +85,14 @@ func (c *Connection) fetchCapabilities(ctx context.Context) error {

return nil
}

func (c *Connection) HasControllerService() bool {
for _, capability := range c.Capabilities {
svc := capability.GetService()
if svc != nil && svc.GetType() == identity.Capability_Service_CONTROLLER_SERVICE {
return true
}
}

return false
}

0 comments on commit 752b98a

Please sign in to comment.