diff --git a/cli/cmd/profile.go b/cli/cmd/profile.go index fa3a24dac29af..eac4f207c24c6 100644 --- a/cli/cmd/profile.go +++ b/cli/cmd/profile.go @@ -6,6 +6,7 @@ import ( "os" "time" + "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/profiles" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/util/validation" @@ -103,7 +104,11 @@ func newCmdProfile() *cobra.Command { } else if options.openAPI != "" { return profiles.RenderOpenAPI(options.openAPI, options.namespace, options.name, os.Stdout) } else if options.tap != "" { - return profiles.RenderTapOutputProfile(checkPublicAPIClientOrExit(), options.tap, options.namespace, options.name, options.tapDuration, int(options.tapRouteLimit), os.Stdout) + k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, 0) + if err != nil { + return err + } + return profiles.RenderTapOutputProfile(k8sAPI, options.tap, options.namespace, options.name, options.tapDuration, int(options.tapRouteLimit), os.Stdout) } else if options.proto != "" { return profiles.RenderProto(options.proto, options.namespace, options.name, os.Stdout) } diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index a4c26cf0396ed..4457fcd67a3e6 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -8,12 +8,12 @@ import ( "strings" "text/tabwriter" - "github.com/linkerd/linkerd2/cli/tap" "github.com/linkerd/linkerd2/controller/api/util" pb "github.com/linkerd/linkerd2/controller/gen/public" "github.com/linkerd/linkerd2/pkg/addr" "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/protohttp" + "github.com/linkerd/linkerd2/pkg/tap" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "google.golang.org/grpc/codes" @@ -151,7 +151,7 @@ func requestTapByResourceFromAPI(w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb resource = req.GetTarget().GetResource().GetType() } - reader, body, err := tap.Reader(k8sAPI, req) + reader, body, err := tap.Reader(k8sAPI, req, 0) if err != nil { return err } diff --git a/cli/cmd/top.go b/cli/cmd/top.go index b4e91aa8fc427..480b9fee0ed6e 100644 --- a/cli/cmd/top.go +++ b/cli/cmd/top.go @@ -10,13 +10,13 @@ import ( "time" "github.com/golang/protobuf/ptypes" - "github.com/linkerd/linkerd2/cli/tap" "github.com/linkerd/linkerd2/controller/api/public" "github.com/linkerd/linkerd2/controller/api/util" pb "github.com/linkerd/linkerd2/controller/gen/public" "github.com/linkerd/linkerd2/pkg/addr" "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/protohttp" + "github.com/linkerd/linkerd2/pkg/tap" runewidth "github.com/mattn/go-runewidth" termbox "github.com/nsf/termbox-go" log "github.com/sirupsen/logrus" @@ -372,7 +372,7 @@ func newCmdTop() *cobra.Command { } func getTrafficByResourceFromAPI(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, table *topTable) error { - reader, body, err := tap.Reader(k8sAPI, req) + reader, body, err := tap.Reader(k8sAPI, req, 0) if err != nil { return err } diff --git a/pkg/profiles/tap.go b/pkg/profiles/tap.go index 01196b5ef06cf..478158fc9407b 100644 --- a/pkg/profiles/tap.go +++ b/pkg/profiles/tap.go @@ -1,8 +1,7 @@ package profiles import ( - "context" - "errors" + "bufio" "fmt" "io" "os" @@ -14,15 +13,17 @@ import ( "github.com/linkerd/linkerd2/controller/api/util" sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" pb "github.com/linkerd/linkerd2/controller/gen/public" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - + "github.com/linkerd/linkerd2/pkg/k8s" + "github.com/linkerd/linkerd2/pkg/protohttp" + "github.com/linkerd/linkerd2/pkg/tap" log "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // RenderTapOutputProfile performs a tap on the desired resource and generates // a service profile with routes pre-populated from the tap data // Only inbound tap traffic is considered. -func RenderTapOutputProfile(client pb.ApiClient, tapResource, namespace, name string, tapDuration time.Duration, routeLimit int, w io.Writer) error { +func RenderTapOutputProfile(k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name string, tapDuration time.Duration, routeLimit int, w io.Writer) error { requestParams := util.TapRequestParams{ Resource: tapResource, Namespace: namespace, @@ -34,7 +35,7 @@ func RenderTapOutputProfile(client pb.ApiClient, tapResource, namespace, name st return err } - profile, err := tapToServiceProfile(client, req, namespace, name, tapDuration, routeLimit) + profile, err := tapToServiceProfile(k8sAPI, req, namespace, name, tapDuration, routeLimit) if err != nil { return err } @@ -47,7 +48,7 @@ func RenderTapOutputProfile(client pb.ApiClient, tapResource, namespace, name st return nil } -func tapToServiceProfile(client pb.ApiClient, tapReq *pb.TapByResourceRequest, namespace, name string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) { +func tapToServiceProfile(k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) { profile := sp.ServiceProfile{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace), @@ -56,44 +57,38 @@ func tapToServiceProfile(client pb.ApiClient, tapReq *pb.TapByResourceRequest, n TypeMeta: serviceProfileMeta, } - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(tapDuration)) - defer cancel() - - tapClient, err := client.TapByResource(ctx, tapReq) + reader, body, err := tap.Reader(k8sAPI, tapReq, tapDuration) if err != nil { - if strings.HasSuffix(err.Error(), context.DeadlineExceeded.Error()) { - // return a more user friendly error if we've exceeded the specified duration - return profile, errors.New("Tap duration exceeded, try increasing --tap-duration") - } return profile, err } + defer body.Close() - routes := routeSpecFromTap(tapClient, routeLimit) + routes := routeSpecFromTap(reader, routeLimit) profile.Spec.Routes = routes return profile, nil } -func routeSpecFromTap(tapClient pb.Api_TapByResourceClient, routeLimit int) []*sp.RouteSpec { +func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSpec { routes := make([]*sp.RouteSpec, 0) routesMap := make(map[string]*sp.RouteSpec) for { log.Debug("Waiting for data...") - event, err := tapClient.Recv() - + event := pb.TapEvent{} + err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event) if err != nil { // expected errors when hitting the tapDuration deadline if err != io.EOF && - !strings.HasSuffix(err.Error(), context.DeadlineExceeded.Error()) && + !strings.HasSuffix(err.Error(), "(Client.Timeout exceeded while reading body)") && !strings.HasSuffix(err.Error(), "http2: response body closed") { fmt.Fprintln(os.Stderr, err) } break } - routeSpec := getPathDataFromTap(event) + routeSpec := getPathDataFromTap(&event) log.Debugf("Created route spec: %v", routeSpec) if routeSpec != nil { diff --git a/pkg/profiles/tap_test.go b/pkg/profiles/tap_test.go index f93d4bf9c4eb1..9070312c3ae92 100644 --- a/pkg/profiles/tap_test.go +++ b/pkg/profiles/tap_test.go @@ -1,13 +1,16 @@ package profiles import ( + "net/http" + "net/http/httptest" "testing" "time" - "github.com/linkerd/linkerd2/controller/api/public" "github.com/linkerd/linkerd2/controller/api/util" sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" pb "github.com/linkerd/linkerd2/controller/gen/public" + "github.com/linkerd/linkerd2/pkg/k8s" + "github.com/linkerd/linkerd2/pkg/protohttp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -71,10 +74,23 @@ func TestTapToServiceProfile(t *testing.T) { pb.TapEvent_INBOUND, ) - mockAPIClient := &public.MockAPIClient{} - mockAPIClient.APITapByResourceClientToReturn = &public.MockAPITapByResourceClient{ - TapEventsToReturn: []pb.TapEvent{event1, event2}, + kubeAPI, err := k8s.NewFakeAPI() + if err != nil { + t.Fatalf("Unexpected error: %v", err) } + ts := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + for _, event := range []pb.TapEvent{event1, event2} { + event := event // pin + err = protohttp.WriteProtoToHTTPResponse(w, &event) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + }), + ) + defer ts.Close() + kubeAPI.Config.Host = ts.URL expectedServiceProfile := sp.ServiceProfile{ TypeMeta: serviceProfileMeta, @@ -102,7 +118,7 @@ func TestTapToServiceProfile(t *testing.T) { }, } - actualServiceProfile, err := tapToServiceProfile(mockAPIClient, tapReq, namespace, name, tapDuration, routeLimit) + actualServiceProfile, err := tapToServiceProfile(kubeAPI, tapReq, namespace, name, tapDuration, routeLimit) if err != nil { t.Fatalf("Failed to create ServiceProfile: %v", err) } diff --git a/cli/tap/tap.go b/pkg/tap/tap.go similarity index 92% rename from cli/tap/tap.go rename to pkg/tap/tap.go index 5db9a98b82914..79fa68fcfd8c4 100644 --- a/cli/tap/tap.go +++ b/pkg/tap/tap.go @@ -6,6 +6,7 @@ import ( "io" "net/http" "net/url" + "time" "github.com/golang/protobuf/proto" pb "github.com/linkerd/linkerd2/controller/gen/public" @@ -16,11 +17,12 @@ import ( // Reader initiates a TapByResourceRequest and returns a buffered Reader. // It is the caller's responsibility to call Close() on the io.ReadCloser. -func Reader(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest) (*bufio.Reader, io.ReadCloser, error) { +func Reader(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, timeout time.Duration) (*bufio.Reader, io.ReadCloser, error) { client, err := k8sAPI.NewClient() if err != nil { return nil, nil, err } + client.Timeout = timeout reqBytes, err := proto.Marshal(req) if err != nil {