Skip to content

Commit

Permalink
Update linkerd profile --tap to Tap APIService
Browse files Browse the repository at this point in the history
PR #3167 introduced a Tap APIService, and migrated linkerd tap to it.

This change migrates `linkerd profile --tap` to the new Tap APIService.

Depends on #3186
Fixes #3169

Signed-off-by: Andrew Seigner <[email protected]>
  • Loading branch information
siggy committed Aug 2, 2019
1 parent c7c3f87 commit 5171b51
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 32 deletions.
7 changes: 6 additions & 1 deletion cli/cmd/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"time"

"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/profiles"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/util/validation"
Expand Down Expand Up @@ -103,7 +104,11 @@ func newCmdProfile() *cobra.Command {
} else if options.openAPI != "" {
return profiles.RenderOpenAPI(options.openAPI, options.namespace, options.name, os.Stdout)
} else if options.tap != "" {
return profiles.RenderTapOutputProfile(checkPublicAPIClientOrExit(), options.tap, options.namespace, options.name, options.tapDuration, int(options.tapRouteLimit), os.Stdout)
k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, 0)
if err != nil {
return err
}
return profiles.RenderTapOutputProfile(k8sAPI, options.tap, options.namespace, options.name, options.tapDuration, int(options.tapRouteLimit), os.Stdout)
} else if options.proto != "" {
return profiles.RenderProto(options.proto, options.namespace, options.name, os.Stdout)
}
Expand Down
4 changes: 2 additions & 2 deletions cli/cmd/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"strings"
"text/tabwriter"

"github.com/linkerd/linkerd2/cli/tap"
"github.com/linkerd/linkerd2/controller/api/util"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/addr"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
"github.com/linkerd/linkerd2/pkg/tap"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -151,7 +151,7 @@ func requestTapByResourceFromAPI(w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb
resource = req.GetTarget().GetResource().GetType()
}

reader, body, err := tap.Reader(k8sAPI, req)
reader, body, err := tap.Reader(k8sAPI, req, 0)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cli/cmd/top.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ import (
"time"

"github.com/golang/protobuf/ptypes"
"github.com/linkerd/linkerd2/cli/tap"
"github.com/linkerd/linkerd2/controller/api/public"
"github.com/linkerd/linkerd2/controller/api/util"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/addr"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
"github.com/linkerd/linkerd2/pkg/tap"
runewidth "github.com/mattn/go-runewidth"
termbox "github.com/nsf/termbox-go"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -372,7 +372,7 @@ func newCmdTop() *cobra.Command {
}

func getTrafficByResourceFromAPI(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, table *topTable) error {
reader, body, err := tap.Reader(k8sAPI, req)
reader, body, err := tap.Reader(k8sAPI, req, 0)
if err != nil {
return err
}
Expand Down
37 changes: 16 additions & 21 deletions pkg/profiles/tap.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package profiles

import (
"context"
"errors"
"bufio"
"fmt"
"io"
"os"
Expand All @@ -14,15 +13,17 @@ import (
"github.com/linkerd/linkerd2/controller/api/util"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
pb "github.com/linkerd/linkerd2/controller/gen/public"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
"github.com/linkerd/linkerd2/pkg/tap"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// RenderTapOutputProfile performs a tap on the desired resource and generates
// a service profile with routes pre-populated from the tap data
// Only inbound tap traffic is considered.
func RenderTapOutputProfile(client pb.ApiClient, tapResource, namespace, name string, tapDuration time.Duration, routeLimit int, w io.Writer) error {
func RenderTapOutputProfile(k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name string, tapDuration time.Duration, routeLimit int, w io.Writer) error {
requestParams := util.TapRequestParams{
Resource: tapResource,
Namespace: namespace,
Expand All @@ -34,7 +35,7 @@ func RenderTapOutputProfile(client pb.ApiClient, tapResource, namespace, name st
return err
}

profile, err := tapToServiceProfile(client, req, namespace, name, tapDuration, routeLimit)
profile, err := tapToServiceProfile(k8sAPI, req, namespace, name, tapDuration, routeLimit)
if err != nil {
return err
}
Expand All @@ -47,7 +48,7 @@ func RenderTapOutputProfile(client pb.ApiClient, tapResource, namespace, name st
return nil
}

func tapToServiceProfile(client pb.ApiClient, tapReq *pb.TapByResourceRequest, namespace, name string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) {
func tapToServiceProfile(k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) {
profile := sp.ServiceProfile{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace),
Expand All @@ -56,44 +57,38 @@ func tapToServiceProfile(client pb.ApiClient, tapReq *pb.TapByResourceRequest, n
TypeMeta: serviceProfileMeta,
}

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(tapDuration))
defer cancel()

tapClient, err := client.TapByResource(ctx, tapReq)
reader, body, err := tap.Reader(k8sAPI, tapReq, tapDuration)
if err != nil {
if strings.HasSuffix(err.Error(), context.DeadlineExceeded.Error()) {
// return a more user friendly error if we've exceeded the specified duration
return profile, errors.New("Tap duration exceeded, try increasing --tap-duration")
}
return profile, err
}
defer body.Close()

routes := routeSpecFromTap(tapClient, routeLimit)
routes := routeSpecFromTap(reader, routeLimit)

profile.Spec.Routes = routes

return profile, nil
}

func routeSpecFromTap(tapClient pb.Api_TapByResourceClient, routeLimit int) []*sp.RouteSpec {
func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSpec {
routes := make([]*sp.RouteSpec, 0)
routesMap := make(map[string]*sp.RouteSpec)

for {
log.Debug("Waiting for data...")
event, err := tapClient.Recv()

event := pb.TapEvent{}
err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event)
if err != nil {
// expected errors when hitting the tapDuration deadline
if err != io.EOF &&
!strings.HasSuffix(err.Error(), context.DeadlineExceeded.Error()) &&
!strings.HasSuffix(err.Error(), "(Client.Timeout exceeded while reading body)") &&
!strings.HasSuffix(err.Error(), "http2: response body closed") {
fmt.Fprintln(os.Stderr, err)
}
break
}

routeSpec := getPathDataFromTap(event)
routeSpec := getPathDataFromTap(&event)
log.Debugf("Created route spec: %v", routeSpec)

if routeSpec != nil {
Expand Down
26 changes: 21 additions & 5 deletions pkg/profiles/tap_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package profiles

import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/linkerd/linkerd2/controller/api/public"
"github.com/linkerd/linkerd2/controller/api/util"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -71,10 +74,23 @@ func TestTapToServiceProfile(t *testing.T) {
pb.TapEvent_INBOUND,
)

mockAPIClient := &public.MockAPIClient{}
mockAPIClient.APITapByResourceClientToReturn = &public.MockAPITapByResourceClient{
TapEventsToReturn: []pb.TapEvent{event1, event2},
kubeAPI, err := k8s.NewFakeAPI()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
ts := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
for _, event := range []pb.TapEvent{event1, event2} {
event := event // pin
err = protohttp.WriteProtoToHTTPResponse(w, &event)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
}),
)
defer ts.Close()
kubeAPI.Config.Host = ts.URL

expectedServiceProfile := sp.ServiceProfile{
TypeMeta: serviceProfileMeta,
Expand Down Expand Up @@ -102,7 +118,7 @@ func TestTapToServiceProfile(t *testing.T) {
},
}

actualServiceProfile, err := tapToServiceProfile(mockAPIClient, tapReq, namespace, name, tapDuration, routeLimit)
actualServiceProfile, err := tapToServiceProfile(kubeAPI, tapReq, namespace, name, tapDuration, routeLimit)
if err != nil {
t.Fatalf("Failed to create ServiceProfile: %v", err)
}
Expand Down
4 changes: 3 additions & 1 deletion cli/tap/tap.go → pkg/tap/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"net/http"
"net/url"
"time"

"github.com/golang/protobuf/proto"
pb "github.com/linkerd/linkerd2/controller/gen/public"
Expand All @@ -16,11 +17,12 @@ import (

// Reader initiates a TapByResourceRequest and returns a buffered Reader.
// It is the caller's responsibility to call Close() on the io.ReadCloser.
func Reader(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest) (*bufio.Reader, io.ReadCloser, error) {
func Reader(k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, timeout time.Duration) (*bufio.Reader, io.ReadCloser, error) {
client, err := k8sAPI.NewClient()
if err != nil {
return nil, nil, err
}
client.Timeout = timeout

reqBytes, err := proto.Marshal(req)
if err != nil {
Expand Down

0 comments on commit 5171b51

Please sign in to comment.