Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development Otel #38

Merged
merged 16 commits into from
Nov 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,9 @@ certbot renew

---


> Yaml file fields

BenchMarking
------------

```bash
ab -c 1000 -n 10000 http://localhost:<proxyPort>/health
```
```
1 change: 1 addition & 0 deletions cmd/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ var (
flagPrevKey = "prevkey"
flagCfgFile = "cfgfile"
flagCfgPath = "cfgpath"
flagMetric = "metric"
)
58 changes: 55 additions & 3 deletions cmd/cli/grpcxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (

"github.com/kenriortega/ngonx/pkg/errors"
"github.com/kenriortega/ngonx/pkg/logger"
"github.com/kenriortega/ngonx/pkg/otelify"
"github.com/spf13/cobra"
"github.com/talos-systems/grpc-proxy/proxy"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand All @@ -25,6 +28,25 @@ var grpcCmd = &cobra.Command{
Use: "grpc",
Short: "Run ngonx as a grpc proxy",
Run: func(cmd *cobra.Command, args []string) {

enableMetric, err := cmd.Flags().GetBool(flagMetric)
if err != nil {
logger.LogError(errors.Errorf("proxy: %v", err).Error())
}
if enableMetric {
// TODO: pass from compile vars
flush := otelify.InitProvider(
"ngonx",
"v0.4.5",
"dev",
// TODO: pass from yml file object
"0.0.0.0:55680",
)
defer flush()
// Exporter Metrics
go otelify.ExposeMetricServer(configFromYaml.ProxyGateway.PortExporterProxy)
}

var opts []grpc.ServerOption

lis, err := net.Listen("tcp", configFromYaml.GrpcProxy.Listener)
Expand All @@ -34,14 +56,19 @@ var grpcCmd = &cobra.Command{

logger.LogInfo(fmt.Sprintf("Proxy running at %q\n", configFromYaml.GrpcProxy.Listener))
simpleBackendGen := func(hostname string) proxy.Backend {
ctx, span := otel.Tracer("grpcxy.simpleBackendGen").Start(context.Background(), "simpleBackendGen")
defer span.End()
traceID := trace.SpanContextFromContext(ctx).TraceID().String()
return &proxy.SingleBackend{
GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
md, _ := metadata.FromIncomingContext(ctx)

outCtx := metadata.NewOutgoingContext(ctx, md.Copy())
if configFromYaml.GrpcSSL.Enable {
creds, sslErr := credentials.NewClientTLSFromFile(
configFromYaml.GrpcClientCert, "")
configFromYaml.GrpcClientCert,
"",
)
if sslErr != nil {
logger.LogError(errors.Errorf("grpc: failed to parse credentials: %v", sslErr).Error())
}
Expand All @@ -51,6 +78,10 @@ var grpcCmd = &cobra.Command{
grpc.WithTransportCredentials(creds),
grpc.WithCodec(proxy.Codec()),
) //nolint: staticcheck
if err != nil {
otelify.InstrumentedError(span, "grpcxy.grpc.DialContext", traceID, err)
}
otelify.InstrumentedInfo(span, "grpcxy.grpc.DialContext", traceID)
return outCtx, conn, err
}
conn, err := grpc.DialContext(
Expand All @@ -59,27 +90,48 @@ var grpcCmd = &cobra.Command{
grpc.WithInsecure(),
grpc.WithCodec(proxy.Codec()),
) //nolint: staticcheck

if err != nil {
otelify.InstrumentedError(span, "grpcxy.grpc.DialContext", traceID, err)
}
otelify.InstrumentedInfo(span, "grpcxy.grpc.DialContext", traceID)
return outCtx, conn, err
},
}
}

director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
ctx, span := otel.Tracer("grpcxy.director").Start(context.Background(), "director")
defer span.End()
traceID := trace.SpanContextFromContext(ctx).TraceID().String()
for _, bkd := range configFromYaml.GrpcEndpoints {
// Make sure we never forward internal services.
if !strings.HasPrefix(fullMethodName, bkd.Name) {
otelify.InstrumentedError(
span,
"grpcxy.not.strings.HasPrefix",
traceID,
errors.NewError("Unknown method"),
)

return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if _, exists := md[":authority"]; exists {
otelify.InstrumentedInfo(span, "director.proxy.One2One", traceID)

return proxy.One2One, []proxy.Backend{
simpleBackendGen(bkd.HostURI),
}, nil
}
}
}
otelify.InstrumentedError(
span,
"grpcxy.One2One",
traceID,
errors.NewError("Unknown method"),
)
return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
}
opts = append(opts,
Expand Down Expand Up @@ -110,7 +162,7 @@ var grpcCmd = &cobra.Command{
}

func init() {

grpcCmd.Flags().Bool(flagMetric, false, "Action for enable metrics OTEL")
rootCmd.AddCommand(grpcCmd)
}

Expand Down
29 changes: 24 additions & 5 deletions cmd/cli/proxy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cli

import (
"context"

domain "github.com/kenriortega/ngonx/internal/proxy/domain"
handlers "github.com/kenriortega/ngonx/internal/proxy/handlers"
services "github.com/kenriortega/ngonx/internal/proxy/services"
Expand All @@ -9,14 +11,32 @@ import (
"github.com/kenriortega/ngonx/pkg/genkey"
"github.com/kenriortega/ngonx/pkg/httpsrv"
"github.com/kenriortega/ngonx/pkg/logger"
"github.com/kenriortega/ngonx/pkg/metric"
"github.com/kenriortega/ngonx/pkg/otelify"
"github.com/spf13/cobra"
)

var proxyCmd = &cobra.Command{
Use: "proxy",
Short: "Run ngonx as a reverse proxy",
Run: func(cmd *cobra.Command, args []string) {
enableMetric, err := cmd.Flags().GetBool(flagMetric)
if err != nil {
logger.LogError(errors.Errorf("proxy: %v", err).Error())
}
if enableMetric {
// TODO: pass from compile vars
flush := otelify.InitProvider(
"ngonx",
"v0.4.5",
"dev",
// TODO: pass from yml file object
"0.0.0.0:55680",
)
defer flush()
// Exporter Metrics
go otelify.ExposeMetricServer(configFromYaml.ProxyGateway.PortExporterProxy)
}

port, err := cmd.Flags().GetInt(flagPort)
if err != nil {
logger.LogError(errors.Errorf("proxy: %v", err).Error())
Expand All @@ -29,15 +49,14 @@ var proxyCmd = &cobra.Command{
if err != nil {
logger.LogError(errors.Errorf("proxy: %v", err).Error())
}
// Exporter Metrics
go metric.ExposeMetricServer(configFromYaml.ProxyGateway.PortExporterProxy)

// proxy logic
engine := configFromYaml.ProxyCache.Engine
securityType := configFromYaml.ProxySecurity.Type
key := configFromYaml.ProxyCache.Key + "_" + securityType

var proxyRepository domain.ProxyRepository
clientBadger := badgerdb.GetBadgerDB(false)
clientBadger := badgerdb.GetBadgerDB(context.Background(), false)
proxyRepository = domain.NewProxyRepository(clientBadger)
h := handlers.ProxyHandler{
Service: services.NewProxyService(proxyRepository),
Expand All @@ -61,7 +80,6 @@ var proxyCmd = &cobra.Command{
}

for _, endpoints := range configFromYaml.ProxyGateway.EnpointsProxy {

h.ProxyGateway(endpoints, engine, key, securityType)
}

Expand Down Expand Up @@ -91,6 +109,7 @@ var proxyCmd = &cobra.Command{
func init() {
proxyCmd.Flags().Int(flagPort, 5000, "Port to serve to run proxy")
proxyCmd.Flags().Bool(flagGenApiKey, false, "Action for generate hash for protected routes")
proxyCmd.Flags().Bool(flagMetric, false, "Action for enable metrics OTEL")
proxyCmd.Flags().String(flagPrevKey, "", "Action for save a previous hash for protected routes to validate JWT")
rootCmd.AddCommand(proxyCmd)

Expand Down
146 changes: 146 additions & 0 deletions examples/otelp/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package main

import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/kenriortega/ngonx/pkg/logger"
"github.com/kenriortega/ngonx/pkg/otelify"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)

// global vars...gasp!
var addr = "127.0.0.1:8000"
var tracer trace.Tracer
var httpClient http.Client

func main() {
flush := otelify.InitProvider(
"example",
"v0.4.5",
"test",
"0.0.0.0:55680",
)
defer flush()

// initiate globals
tracer = otel.Tracer("example-app")
httpClient = http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
// create and start server
server := instrumentedServer(handler)

fmt.Println("listening...")
log.Fatal(server.ListenAndServe())
}

func handler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()

longRunningProcess(ctx)

// check cache
if shouldExecute(40) {
url := "http://" + addr + "/"

resp, err := instrumentedGet(ctx, url)
defer resp.Body.Close()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

// query database
if shouldExecute(40) {
url := "http://" + addr + "/"

resp, err := instrumentedGet(ctx, url)
defer resp.Body.Close()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}

// #nosec
func shouldExecute(percent int) bool {
return rand.Int()%100 < percent
}

func longRunningProcess(ctx context.Context) {
ctx, sp := tracer.Start(ctx, "Long Running Process")
defer sp.End()

time.Sleep(time.Millisecond * 50)
sp.AddEvent("halfway done!")
time.Sleep(time.Millisecond * 50)
}

/***
Server
***/
func instrumentedServer(handler http.HandlerFunc) *http.Server {
// OpenMetrics handler : metrics and exemplars
omHandleFunc := func(w http.ResponseWriter, r *http.Request) {
start := time.Now()

handler.ServeHTTP(w, r)

ctx := r.Context()
traceID := trace.SpanContextFromContext(ctx).TraceID().String()

otelify.MetricRequestLatencyProxy.(prometheus.ExemplarObserver).ObserveWithExemplar(
time.Since(start).Seconds(), prometheus.Labels{"traceID": traceID},
)

// log the trace id with other fields so we can discover traces through logs
logger.LogInfo(
"http request",
zap.String("traceID", traceID),
zap.String("path", r.URL.Path),
zap.Duration("latency", time.Since(start)),
)
}

// OTel handler : traces
otelHandler := otelhttp.NewHandler(http.HandlerFunc(omHandleFunc), "http")

r := mux.NewRouter()
r.Handle("/", otelHandler)
r.Handle("/metrics", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{
EnableOpenMetrics: true,
}))

return &http.Server{
Handler: r,
Addr: "0.0.0.0:8000",
}
}

/***
Client
***/
func instrumentedGet(ctx context.Context, url string) (*http.Response, error) {
// create http request
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
panic(err)
}

return httpClient.Do(req)
}

func handleErr(err error, message string) {
if err != nil {
panic(fmt.Sprintf("%s: %s", err, message))
}
}
Loading