diff --git a/controller/api/destination/endpoint_profile_translator.go b/controller/api/destination/endpoint_profile_translator.go index f497eb4ff1387..a31083efeec6f 100644 --- a/controller/api/destination/endpoint_profile_translator.go +++ b/controller/api/destination/endpoint_profile_translator.go @@ -48,25 +48,6 @@ func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, er return false, fmt.Errorf("failed to create endpoint: %w", err) } - // The protocol for an endpoint should only be updated if there is a pod, - // endpoint, and the endpoint has a protocol hint. If there is an endpoint - // but it does not have a protocol hint, that means we could not determine - // if it has a peer proxy so a opaque traffic would not be supported. - if address.Pod != nil && endpoint != nil && endpoint.ProtocolHint != nil { - if !address.OpaqueProtocol { - endpoint.ProtocolHint.OpaqueTransport = nil - } else if endpoint.ProtocolHint.OpaqueTransport == nil { - port, err := getInboundPort(&address.Pod.Spec) - if err != nil { - ept.log.Error(err) - } else { - endpoint.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{ - InboundPort: port, - } - } - } - - } profile := &pb.DestinationProfile{ RetryBudget: defaultRetryBudget(), Endpoint: endpoint, @@ -86,7 +67,7 @@ func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, er } func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) { - weightedAddr, err := createWeightedAddr(address, opaquePorts, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.log) + weightedAddr, err := createWeightedAddr(address, opaquePorts, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS) if err != nil { return nil, err } diff --git a/controller/api/destination/endpoint_translator.go b/controller/api/destination/endpoint_translator.go index 1b70db7aa335b..4fc280d3aa50f 100644 --- a/controller/api/destination/endpoint_translator.go +++ b/controller/api/destination/endpoint_translator.go @@ -377,7 +377,7 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) { ) if address.Pod != nil { opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts) - wa, err = createWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log) + wa, err = createWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS) if err != nil { et.log.Errorf("Failed to translate endpoints to weighted addr: %s", err) continue @@ -486,7 +486,6 @@ func createWeightedAddr( enableH2Upgrade bool, identityTrustDomain string, controllerNS string, - log *logging.Entry, ) (*pb.WeightedAddr, error) { tcpAddr, err := toAddr(address) if err != nil { @@ -505,10 +504,7 @@ func createWeightedAddr( return &weightedAddr, nil } - skippedInboundPorts, err := getPodSkippedInboundPortsAnnotations(address.Pod) - if err != nil { - log.Errorf("failed to get ignored inbound ports annotation for pod: %s", err) - } + skippedInboundPorts := getPodSkippedInboundPortsAnnotations(address.Pod) controllerNSLabel := address.Pod.Labels[pkgK8s.ControllerNSLabel] sa, ns := pkgK8s.GetServiceAccountAndNS(address.Pod) @@ -523,29 +519,29 @@ func createWeightedAddr( _, isSkippedInboundPort := skippedInboundPorts[address.Port] - // If the pod is controlled by any Linkerd control plane, then it can be - // hinted that this destination knows H2 (and handles our orig-proto - // translation) - weightedAddr.ProtocolHint = &pb.ProtocolHint{} if controllerNSLabel != "" && !isSkippedInboundPort { - // If address is set as opaque by a Server, or its port is set as - // opaque by annotation or default value, then hint its proxy's - // inbound port, and set the hinted protocol to Opaque, so that the - // client proxy does not send a SessionProtocol. + weightedAddr.ProtocolHint = &pb.ProtocolHint{} + _, opaquePort := opaquePorts[address.Port] + // If address is set as opaque by a Server, or its port is set as + // opaque by annotation or default value, then set the hinted protocol to + // Opaque. if address.OpaqueProtocol || opaquePort { + weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{ + Opaque: &pb.ProtocolHint_Opaque{}, + } + port, err := getInboundPort(&address.Pod.Spec) if err != nil { - log.Error(err) - } else { - weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{ - InboundPort: port, - } - weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_Opaque_{ - Opaque: &pb.ProtocolHint_Opaque{}, - } + return nil, fmt.Errorf("failed to read inbound port: %w", err) + } + weightedAddr.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{ + InboundPort: port, } } else if enableH2Upgrade { + // If the pod is controlled by any Linkerd control plane, then it can be + // hinted that this destination knows H2 (and handles our orig-proto + // translation) weightedAddr.ProtocolHint.Protocol = &pb.ProtocolHint_H2_{ H2: &pb.ProtocolHint_H2{}, } diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index 75a07d8fe4ab9..b569bdf0d435c 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -278,10 +278,18 @@ func (s *server) GetProfile(dest *pb.GetDestination, stream pb.Destination_GetPr } if ip := net.ParseIP(host); ip != nil { - return s.getProfileByIP(token, ip, port, log, stream) + err = s.getProfileByIP(token, ip, port, log, stream) + if err != nil { + log.Errorf("Failed to subscribe to profile by ip %q: %q", dest.GetPath(), err) + } + return err } - return s.getProfileByName(token, host, port, log, stream) + err = s.getProfileByName(token, host, port, log, stream) + if err != nil { + log.Errorf("Failed to subscribe to profile by name %q: %q", dest.GetPath(), err) + } + return err } func (s *server) getProfileByIP( @@ -657,10 +665,10 @@ func hasSuffix(slice []string, suffix []string) bool { return true } -func getPodSkippedInboundPortsAnnotations(pod *corev1.Pod) (map[uint32]struct{}, error) { +func getPodSkippedInboundPortsAnnotations(pod *corev1.Pod) map[uint32]struct{} { annotation, ok := pod.Annotations[labels.ProxyIgnoreInboundPortsAnnotation] if !ok || annotation == "" { - return nil, nil + return nil } return util.ParsePorts(annotation) diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index d83555a3cd923..c5baa1bcc17f8 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -449,6 +449,25 @@ func TestGetProfiles(t *testing.T) { } }) + t.Run("Return profile with protocol hint for default opaque port when pod is unmeshed", func(t *testing.T) { + server := makeServer(t) + defer server.clusterStore.UnregisterGauges() + + // 3306 is in the default opaque list + stream := profileStream(t, server, podIP2, 3306, "") + defer stream.Cancel() + profile := assertSingleProfile(t, stream.Updates()) + if profile.Endpoint == nil { + t.Fatalf("Expected response to have endpoint field") + } + if !profile.OpaqueProtocol { + t.Fatal("Expected port 3306 to be an opaque protocol, but it was not") + } + if profile.GetEndpoint().GetProtocolHint() != nil { + t.Fatalf("Expected protocol hint to be nil") + } + }) + t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) { server := makeServer(t) defer server.clusterStore.UnregisterGauges() @@ -497,7 +516,7 @@ func TestGetProfiles(t *testing.T) { if profile.Endpoint.ProtocolHint == nil { t.Fatalf("Expected protocol hint but found none") } - if profile.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 { + if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 { t.Fatalf("Expected pod to support opaque traffic on port 4143") } if profile.Endpoint.Addr.String() != epAddr.String() { @@ -552,10 +571,10 @@ func TestGetProfiles(t *testing.T) { if !profile.OpaqueProtocol { t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80) } - if profile.Endpoint.ProtocolHint == nil { + if profile.Endpoint.GetProtocolHint() == nil { t.Fatalf("Expected protocol hint but found none") } - if profile.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 { + if profile.Endpoint.GetProtocolHint().GetOpaqueTransport().GetInboundPort() != 4143 { t.Fatalf("Expected pod to support opaque traffic on port 4143") } }) diff --git a/controller/cmd/destination/main.go b/controller/cmd/destination/main.go index 2f4abb2077ef1..845c9e03cad88 100644 --- a/controller/cmd/destination/main.go +++ b/controller/cmd/destination/main.go @@ -76,10 +76,7 @@ func Main(args []string) { log.Warnf("expected cluster domain through args (falling back to %s)", *clusterDomain) } - opaquePorts, err := util.ParsePorts(*defaultOpaquePorts) - if err != nil { - log.Fatalf("Failed to parse opaque Ports %s: %s", *defaultOpaquePorts, err) - } + opaquePorts := util.ParsePorts(*defaultOpaquePorts) log.Infof("Using default opaque ports: %v", opaquePorts) diff --git a/pkg/util/parsing.go b/pkg/util/parsing.go index 6268bc73c410a..1bf252d7bd100 100644 --- a/pkg/util/parsing.go +++ b/pkg/util/parsing.go @@ -9,7 +9,7 @@ import ( // ParsePorts parses the given ports string into a map of ports; // this includes converting port ranges into separate ports -func ParsePorts(portsString string) (map[uint32]struct{}, error) { +func ParsePorts(portsString string) map[uint32]struct{} { opaquePorts := make(map[uint32]struct{}) if portsString != "" { portRanges := GetPortRanges(portsString) @@ -25,7 +25,7 @@ func ParsePorts(portsString string) (map[uint32]struct{}, error) { } } - return opaquePorts, nil + return opaquePorts } // ParseContainerOpaquePorts parses the opaque ports annotation into a list of diff --git a/pkg/util/parsing_test.go b/pkg/util/parsing_test.go index 605844e42de11..eba7490179b3b 100644 --- a/pkg/util/parsing_test.go +++ b/pkg/util/parsing_test.go @@ -45,10 +45,7 @@ func TestParsePorts(t *testing.T) { for _, tc := range testCases { tc := tc // pin t.Run(fmt.Sprintf("test %s", tc.ports), func(t *testing.T) { - ports, err := ParsePorts(tc.ports) - if err != nil { - t.Fatalf("could not parse ports: %v", err) - } + ports := ParsePorts(tc.ports) if diff := deep.Equal(ports, tc.result); diff != nil { t.Errorf("%v", diff) } diff --git a/test/fuzzing/fuzzers.go b/test/fuzzing/fuzzers.go index b0d0a96b8e304..cf26115b8c4d1 100644 --- a/test/fuzzing/fuzzers.go +++ b/test/fuzzing/fuzzers.go @@ -9,7 +9,7 @@ import ( // FuzzParsePorts fuzzes the ParsePorts function. func FuzzParsePorts(data []byte) int { - _, _ = util.ParsePorts(string(data)) + _ = util.ParsePorts(string(data)) return 1 }