Skip to content

Commit

Permalink
feat: trace HTTP gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
moul committed Mar 12, 2020
1 parent f54007e commit 2254bcf
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 16 deletions.
1 change: 1 addition & 0 deletions go/cmd/pathwar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ func globalPreRun() error {
return errcode.ErrInitTracer.Wrap(err)
}
tracer = zipkinot.Wrap(nativeTracer)
opentracing.SetGlobalTracer(tracer)
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions go/cmd/pathwar/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Example() {
// admin admin commands
//
// FLAGS
// -bearer-secretkey ... bearer.sh secret key
// -debug false debug mode
// -opentracing-server ... optional opentracing server endpoint
// -bearer-secretkey ... bearer.sh secret key
// -debug false debug mode
// -zipkin-endpoint ... optional opentracing server
}
13 changes: 6 additions & 7 deletions go/pkg/pwagent/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/moby/moby/pkg/stdcopy"
"go.uber.org/zap"
"golang.org/x/crypto/sha3"
"moul.io/godev"
"pathwar.land/v2/go/pkg/errcode"
"pathwar.land/v2/go/pkg/pwapi"
"pathwar.land/v2/go/pkg/pwcompose"
Expand Down Expand Up @@ -49,9 +48,9 @@ func applyNginxConfig(ctx context.Context, apiInstances *pwapi.AgentListInstance
if err != nil {
return errcode.TODO.Wrap(err)
}
if logger.Check(zap.DebugLevel, "") != nil {
/*if logger.Check(zap.DebugLevel, "") != nil {
fmt.Fprintln(os.Stderr, "config", godev.PrettyJSON(config))
}
}*/

// configure nginx binary
buf, err := buildNginxConfigTar(config, logger)
Expand All @@ -77,15 +76,15 @@ func applyNginxConfig(ctx context.Context, apiInstances *pwapi.AgentListInstance
if err != nil {
return errcode.ErrNginxSendCommandReloadConfig.Wrap(err)
}
if logger.Check(zap.DebugLevel, "") != nil {
/*if logger.Check(zap.DebugLevel, "") != nil {
for _, upstream := range config.Upstreams {
fmt.Fprintf(os.Stderr, "- %s\n", upstream.Name)
for _, hash := range upstream.Hashes {
fmt.Fprintf(os.Stderr, " - %s.%s\n", hash, opts.DomainSuffix)
}
}
}
}*/

return nil
}
Expand Down Expand Up @@ -236,9 +235,9 @@ func buildNginxConfigTar(config *nginxConfig, logger *zap.Logger) (*bytes.Buffer
}
configBytes := configBuf.Bytes()

if logger.Check(zap.DebugLevel, "") != nil {
/* if logger.Check(zap.DebugLevel, "") != nil {
fmt.Fprintln(os.Stderr, string(configBytes))
}
}*/

var buf bytes.Buffer
tw := tar.NewWriter(&buf)
Expand Down
6 changes: 4 additions & 2 deletions go/pkg/pwagent/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"

"github.com/docker/docker/client"
"moul.io/godev"
"pathwar.land/v2/go/pkg/errcode"
"pathwar.land/v2/go/pkg/pwapi"
"pathwar.land/v2/go/pkg/pwcompose"
Expand Down Expand Up @@ -41,7 +40,10 @@ func updateAPIState(ctx context.Context, apiInstances *pwapi.AgentListInstances_

// FIXME: update state only if it changed
input := pwapi.AgentUpdateState_Input{Instances: apiInstances.Instances}
fmt.Println(godev.PrettyJSONPB(&input))
//if logger.Check(zap.DebugLevel, "") != nil {
// fmt.Println(godev.PrettyJSONPB(&input))
//}

if _, err := apiClient.AgentUpdateState(&input); err != nil {
return errcode.TODO.Wrap(err)
}
Expand Down
3 changes: 2 additions & 1 deletion go/pkg/pwapi/api_agent-update-state.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@ func (svc *service) AgentUpdateState(ctx context.Context, in *AgentUpdateState_I
}
}

return nil, errcode.ErrNotImplemented
ret := &AgentUpdateState_Output{}
return ret, nil
}
44 changes: 41 additions & 3 deletions go/pkg/pwapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/rs/cors"
"github.com/soheilhy/cmux"
chilogger "github.com/treastech/logger"
Expand Down Expand Up @@ -208,19 +209,56 @@ func httpServer(ctx context.Context, serverListenerAddr string, opts ServerOpts)
r.Use(middleware.Timeout(opts.RequestTimeout))
r.Use(middleware.RealIP)
r.Use(middleware.RequestID)
gwmux := runtime.NewServeMux(

runtimeMux := runtime.NewServeMux(
runtime.WithMarshalerOption(runtime.MIMEWildcard, &gateway.JSONPb{
EmitDefaults: false,
Indent: " ",
OrigName: true,
}),
runtime.WithProtoErrorHandler(runtime.DefaultHTTPProtoErrorHandler),
runtime.WithIncomingHeaderMatcher(incomingHeaderMatcherFunc),
)
grpcOpts := []grpc.DialOption{grpc.WithInsecure()}
err := RegisterServiceHandlerFromEndpoint(ctx, gwmux, serverListenerAddr, grpcOpts)
var gwmux http.Handler = runtimeMux
dialOpts := []grpc.DialOption{grpc.WithInsecure()}
if opts.Tracer != nil {
var grpcGatewayTag = opentracing.Tag{Key: string(ext.Component), Value: "grpc-gateway"}
tracingWrapper := func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
parentSpanContext, err := opts.Tracer.Extract(
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(r.Header),
)
if err == nil || err == opentracing.ErrSpanContextNotFound {
serverSpan := opts.Tracer.StartSpan(
"ServeHTTP",
ext.RPCServerOption(parentSpanContext),
grpcGatewayTag,
)
r = r.WithContext(opentracing.ContextWithSpan(r.Context(), serverSpan))
defer serverSpan.Finish()
}
fmt.Println(r.Context())
h.ServeHTTP(w, r)
})
}
gwmux = tracingWrapper(gwmux)

dialOpts = append(dialOpts,
grpc.WithStreamInterceptor(
grpc_opentracing.StreamClientInterceptor(
grpc_opentracing.WithTracer(opts.Tracer))),
grpc.WithUnaryInterceptor(
grpc_opentracing.UnaryClientInterceptor(
grpc_opentracing.WithTracer(opts.Tracer))),
)
}

err := RegisterServiceHandlerFromEndpoint(ctx, runtimeMux, serverListenerAddr, dialOpts)
if err != nil {
return nil, errcode.ErrServerRegisterGateway.Wrap(err)
}

r.Mount("/", gwmux)
if opts.WithPprof {
r.HandleFunc("/debug/pprof/*", pprof.Index)
Expand Down
36 changes: 36 additions & 0 deletions go/pkg/pwapi/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package pwapi

import (
"strings"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
)

var (
// trace header to propagate.
traceHeaders = []string{
"x-ot-span-context",
"x-request-id",

// Zipkin headers
"b3",
"x-b3-traceid",
"x-b3-spanid",
"x-b3-parentspanid",
"x-b3-sampled",
"X-b3-flags",

// Jaeger header (for native client)
"uber-trace-id",
}
)

func incomingHeaderMatcherFunc(key string) (string, bool) {
k := strings.ToLower(key)
for _, v := range traceHeaders {
if v == k {
return key, true
}
}
return runtime.DefaultHeaderMatcher(key)
}

0 comments on commit 2254bcf

Please sign in to comment.