Skip to content

Commit

Permalink
Return early if EndpointSlices of upstream service have no ports
Browse files Browse the repository at this point in the history
Signed-off-by: Daneyon Hansen <[email protected]>
  • Loading branch information
danehans committed Nov 27, 2024
1 parent 66beb6c commit 6cc7490
Showing 1 changed file with 25 additions and 4 deletions.
29 changes: 25 additions & 4 deletions projects/gateway2/krtcollections/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,33 @@ func transformK8sEndpoints(ctx context.Context, inputs EndpointsInputs) func(kct
spec := kubeUpstream.Kube
kubeSvcPort, singlePortSvc := findPortForService(kctx, svcs, spec)
if kubeSvcPort == nil {
logger.Debug("findPortForService - not found.", zap.Uint32("port", spec.GetServicePort()), zap.String("svcName", spec.GetServiceName()), zap.String("svcNamespace", spec.GetServiceNamespace()))
logger.Debug("port not found for service", zap.Uint32("port", spec.GetServicePort()), zap.String("name", spec.GetServiceName()), zap.String("namespace", spec.GetServiceNamespace()))
return nil
}

// Fetch all EndpointSlices for the service
// Fetch all EndpointSlices for the upstream service
key := types.NamespacedName{
Namespace: spec.GetServiceNamespace(),
Name: spec.GetServiceName(),
}

endpointSlices := krt.Fetch(kctx, inputs.EndpointSlices, krt.FilterIndex(inputs.EndpointSlicesByService, key))
if len(endpointSlices) == 0 {
logger.Debug("no EndpointSlices found for service", zap.String("name", key.Name), zap.String("namespace", key.Namespace))
logger.Debug("no endpointslices found for service", zap.String("name", key.Name), zap.String("namespace", key.Namespace))
return nil
}

// Handle potential eventually consistency of EndpointSlices for the upstream service
found := false
for _, endpointSlice := range endpointSlices {
if port := findPortInEndpointSlice(endpointSlice, singlePortSvc, kubeSvcPort); port != 0 {
found = true
break
}
}
if !found {
logger.Debug("no ports found in endpointslices for service", zap.String("name", key.Name), zap.String("namespace", key.Namespace))
return nil
}

// Initialize the returned EndpointsForUpstream
Expand All @@ -251,7 +265,9 @@ func transformK8sEndpoints(ctx context.Context, inputs EndpointsInputs) func(kct
for _, endpointSlice := range endpointSlices {
port := findPortInEndpointSlice(endpointSlice, singlePortSvc, kubeSvcPort)
if port == 0 {
logger.Debug("no port found in EndpointSlice", zap.String("name", endpointSlice.Name), zap.String("namespace", endpointSlice.Namespace))
logger.Debug("no port found in endpointslice; will try next endpointslice if one exists",
zap.String("name", endpointSlice.Name),
zap.String("namespace", endpointSlice.Namespace))
continue
}

Expand Down Expand Up @@ -384,6 +400,11 @@ func findPortForService(kctx krt.HandlerContext, services krt.Collection[*corev1

func findPortInEndpointSlice(endpointSlice *discoveryv1.EndpointSlice, singlePortService bool, kubeServicePort *corev1.ServicePort) uint32 {
var port uint32

if endpointSlice == nil || kubeServicePort == nil {
return port
}

for _, p := range endpointSlice.Ports {
if p.Port == nil {
continue
Expand Down

0 comments on commit 6cc7490

Please sign in to comment.