Skip to content

Commit

Permalink
Sample gRPC server
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>

Fix lint

Signed-off-by: Yuri Shkuro <[email protected]>

Add gRPC endpoint to collector

Signed-off-by: Yuri Shkuro <[email protected]>

Add proper grpc server

Signed-off-by: Yuri Shkuro <[email protected]>

Add grpc to all-in-one

Signed-off-by: Yuri Shkuro <[email protected]>

adds the gRPC port to the docker-all-in-one Dockerfile (jaegertracing#773) (jaegertracing#967)

Signed-off-by: Jan Heylen <[email protected]>

Simple fixes (jaegertracing#999)

* Fix test error

Signed-off-by: Isaac Hier <[email protected]>

* go fmt

Signed-off-by: Isaac Hier <[email protected]>

Implement PostSpans for collector gRPC handler

Signed-off-by: Isaac Hier <[email protected]>

Fix formatting

Signed-off-by: Isaac Hier <[email protected]>

Remove timeout

Signed-off-by: Isaac Hier <[email protected]>

Use expectedError variable

Signed-off-by: Isaac Hier <[email protected]>
  • Loading branch information
Yuri Shkuro authored and pavolloffay committed Nov 5, 2018
1 parent eab423c commit 6566ba1
Show file tree
Hide file tree
Showing 20 changed files with 1,715 additions and 993 deletions.
43 changes: 26 additions & 17 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ TOP_PKGS := $(shell glide novendor | \
grep -v \
-e ./thrift-gen/... \
-e ./swagger-gen/... \
-e ./proto-gen/... \
-e ./examples/... \
-e ./scripts/...\
)
Expand All @@ -17,6 +18,7 @@ ALL_SRC := $(shell find . -name "*.go" | \
-e vendor \
-e /thrift-gen/ \
-e /swagger-gen/ \
-e /proto-gen/ \
-e /examples/ \
-e doc.go \
-e model.pb.go \
Expand Down Expand Up @@ -335,6 +337,20 @@ generate-mocks: install-mockery
echo-version:
@echo $(GIT_CLOSEST_TAG)

PROTO_INCLUDES := \
-I model/proto \
-I vendor/github.com/grpc-ecosystem/grpc-gateway \
-I vendor/github.com/gogo/googleapis \
-I vendor/github.com/gogo/protobuf
# Remapping of std types to gogo types (must not contain spaces)
PROTO_GOGO_MAPPINGS := $(shell echo \
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types, \
Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types, \
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types, \
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api, \
Mmodel.proto=github.com/jaegertracing/jaeger/model \
| sed 's/ //g')

.PHONY: proto
proto:
# Generate gogo, gRPC-Gateway, swagger, go-validators output.
Expand Down Expand Up @@ -362,27 +378,20 @@ proto:
# (https://medium.com/@linchenon/generate-grpc-and-protobuf-libraries-with-containers-c15ba4e4f3ad)
#
protoc \
-I model/proto \
-I vendor/github.com/grpc-ecosystem/grpc-gateway/ \
-I vendor/github.com/gogo/googleapis/ \
-I vendor/ \
--gogo_out=plugins=grpc,\
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,\
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api:\
$$GOPATH/src/github.com/jaegertracing/jaeger/model/ \
--grpc-gateway_out=\
Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,\
Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,\
Mgoogle/api/annotations.proto=github.com/gogo/googleapis/google/api:\
$$GOPATH/src/github.com/jaegertracing/jaeger/model \
--swagger_out=model/proto/openapi/ \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/model/ \
model/proto/model.proto

protoc \
$(PROTO_INCLUDES) \
--gogo_out=plugins=grpc,$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v2/ \
--grpc-gateway_out=$(PROTO_GOGO_MAPPINGS):$(PWD)/proto-gen/api_v2/ \
--swagger_out=$(PWD)/proto-gen/openapi/ \
model/proto/api_v2.proto

protoc \
-I model/proto \
--go_out=$$GOPATH/src/github.com/jaegertracing/jaeger/model/prototest/ \
--go_out=$(PWD)/model/prototest/ \
model/proto/model_test.proto

.PHONY: proto-install
Expand Down
3 changes: 3 additions & 0 deletions cmd/all-in-one/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ EXPOSE 5778
# Collector HTTP
EXPOSE 14268

# Collector gRPC
EXPOSE 14270

# Web HTTP
EXPOSE 16686

Expand Down
84 changes: 57 additions & 27 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"

agentApp "github.com/jaegertracing/jaeger/cmd/agent/app"
agentTchanRep "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/tchannel"
Expand All @@ -54,6 +56,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/version"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
Expand Down Expand Up @@ -216,38 +219,65 @@ func startCollector(
if err != nil {
logger.Fatal("Unable to set up builder", zap.Error(err))
}
ch, err := tchannel.NewChannel("jaeger-collector", &tchannel.ChannelOptions{})
if err != nil {
logger.Fatal("Unable to create new TChannel", zap.Error(err))

zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := spanBuilder.BuildHandlers()

{
ch, err := tchannel.NewChannel("jaeger-collector", &tchannel.ChannelOptions{})
if err != nil {
logger.Fatal("Unable to create new TChannel", zap.Error(err))
}
server := thrift.NewServer(ch)
server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler))
server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler))
server.Register(sc.NewTChanSamplingManagerServer(samplingHandler))
portStr := ":" + strconv.Itoa(cOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)
if err != nil {
logger.Fatal("Unable to start listening on channel", zap.Error(err))
}
logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", cOpts.CollectorPort))
ch.Serve(listener)
}
server := thrift.NewServer(ch)
zipkinSpansHandler, jaegerBatchesHandler := spanBuilder.BuildHandlers()
server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler))
server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler))
server.Register(sc.NewTChanSamplingManagerServer(samplingHandler))
portStr := ":" + strconv.Itoa(cOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)
if err != nil {
logger.Fatal("Unable to start listening on channel", zap.Error(err))

{
grpcPortStr := ":" + strconv.Itoa(cOpts.CollectorGRPCPort)
lis, err := net.Listen("tcp", grpcPortStr)
if err != nil {
logger.Fatal("Failed to listen on gRPC port", zap.Error(err))
}

log := grpclog.NewLoggerV2(os.Stdout, os.Stderr, os.Stderr)
grpclog.SetLoggerV2(log)

grpcSrv := grpc.NewServer()
api_v2.RegisterCollectorServiceServer(grpcSrv, grpcHandler)
logger.Info("Starting Jaeger Collector gRPC server", zap.Int("grpc-port", cOpts.CollectorGRPCPort))
go func() {
if err := grpcSrv.Serve(lis); err != nil {
logger.Fatal("Could not launch gRPC service", zap.Error(err))
}
hc.Set(healthcheck.Unavailable)
}()
}
ch.Serve(listener)
logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", cOpts.CollectorPort))

r := mux.NewRouter()
apiHandler := collectorApp.NewAPIHandler(jaegerBatchesHandler)
apiHandler.RegisterRoutes(r)
httpPortStr := ":" + strconv.Itoa(cOpts.CollectorHTTPPort)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)
{
r := mux.NewRouter()
apiHandler := collectorApp.NewAPIHandler(jaegerBatchesHandler)
apiHandler.RegisterRoutes(r)
httpPortStr := ":" + strconv.Itoa(cOpts.CollectorHTTPPort)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

go startZipkinHTTPAPI(logger, cOpts.CollectorZipkinHTTPPort, zipkinSpansHandler, recoveryHandler)
go startZipkinHTTPAPI(logger, cOpts.CollectorZipkinHTTPPort, zipkinSpansHandler, recoveryHandler)

logger.Info("Starting jaeger-collector HTTP server", zap.Int("http-port", cOpts.CollectorHTTPPort))
go func() {
if err := http.ListenAndServe(httpPortStr, recoveryHandler(r)); err != nil {
logger.Fatal("Could not launch jaeger-collector HTTP server", zap.Error(err))
}
hc.Set(healthcheck.Unavailable)
}()
logger.Info("Starting jaeger-collector HTTP server", zap.Int("http-port", cOpts.CollectorHTTPPort))
go func() {
if err := http.ListenAndServe(httpPortStr, recoveryHandler(r)); err != nil {
logger.Fatal("Could not launch jaeger-collector HTTP server", zap.Error(err))
}
hc.Set(healthcheck.Unavailable)
}()
}
}

func startZipkinHTTPAPI(
Expand Down
28 changes: 18 additions & 10 deletions cmd/collector/app/builder/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ import (
)

const (
collectorQueueSize = "collector.queue-size"
collectorNumWorkers = "collector.num-workers"
collectorWriteCacheTTL = "collector.write-cache-ttl"
collectorPort = "collector.port"
collectorHTTPPort = "collector.http-port"
collectorZipkinHTTPPort = "collector.zipkin.http-port"
collectorQueueSize = "collector.queue-size"
collectorNumWorkers = "collector.num-workers"
collectorPort = "collector.port"
collectorHTTPPort = "collector.http-port"
collectorGRPCPort = "collector.grpc-port"
collectorZipkinHTTPort = "collector.zipkin.http-port"

defaultTChannelPort = 14267
defaultHTTPPort = 14268
defaultGRPCPort = 14270
// CollectorDefaultHealthCheckHTTPPort is the default HTTP Port for health check
CollectorDefaultHealthCheckHTTPPort = 14269
)
Expand All @@ -43,6 +47,8 @@ type CollectorOptions struct {
CollectorPort int
// CollectorHTTPPort is the port that the collector service listens in on for http requests
CollectorHTTPPort int
// CollectorGRPCPort is the port that the collector service listens in on for gRPC requests
CollectorGRPCPort int
// CollectorZipkinHTTPPort is the port that the Zipkin collector service listens in on for http requests
CollectorZipkinHTTPPort int
}
Expand All @@ -51,9 +57,10 @@ type CollectorOptions struct {
func AddFlags(flags *flag.FlagSet) {
flags.Int(collectorQueueSize, app.DefaultQueueSize, "The queue size of the collector")
flags.Int(collectorNumWorkers, app.DefaultNumWorkers, "The number of workers pulling items from the queue")
flags.Int(collectorPort, 14267, "The tchannel port for the collector service")
flags.Int(collectorHTTPPort, 14268, "The http port for the collector service")
flags.Int(collectorZipkinHTTPPort, 0, "The http port for the Zipkin collector service e.g. 9411")
flags.Int(collectorPort, defaultTChannelPort, "The TChannel port for the collector service")
flags.Int(collectorHTTPPort, defaultHTTPPort, "The HTTP port for the collector service")
flags.Int(collectorGRPCPort, defaultGRPCPort, "The gRPC port for the collector service")
flags.Int(collectorZipkinHTTPort, 0, "The HTTP port for the Zipkin collector service e.g. 9411")
}

// InitFromViper initializes CollectorOptions with properties from viper
Expand All @@ -62,6 +69,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.NumWorkers = v.GetInt(collectorNumWorkers)
cOpts.CollectorPort = v.GetInt(collectorPort)
cOpts.CollectorHTTPPort = v.GetInt(collectorHTTPPort)
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPPort)
cOpts.CollectorGRPCPort = v.GetInt(collectorGRPCPort)
cOpts.CollectorZipkinHTTPPort = v.GetInt(collectorZipkinHTTPort)
return cOpts
}
9 changes: 7 additions & 2 deletions cmd/collector/app/builder/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ func NewSpanHandlerBuilder(cOpts *CollectorOptions, spanWriter spanstore.Writer,
}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
func (spanHb *SpanHandlerBuilder) BuildHandlers() (app.ZipkinSpansHandler, app.JaegerBatchesHandler) {
func (spanHb *SpanHandlerBuilder) BuildHandlers() (
app.ZipkinSpansHandler,
app.JaegerBatchesHandler,
*app.GRPCHandler,
) {
hostname, _ := os.Hostname()
hostMetrics := spanHb.metricsFactory.Namespace("", map[string]string{"host": hostname})

Expand All @@ -79,7 +83,8 @@ func (spanHb *SpanHandlerBuilder) BuildHandlers() (app.ZipkinSpansHandler, app.J
)

return app.NewZipkinSpanHandler(spanHb.logger, spanProcessor, zSanitizer),
app.NewJaegerSpanHandler(spanHb.logger, spanProcessor)
app.NewJaegerSpanHandler(spanHb.logger, spanProcessor),
app.NewGRPCHandler(spanHb.logger, spanProcessor)
}

func defaultSpanFilter(*model.Span) bool {
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/builder/span_handler_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ func TestNewSpanHandlerBuilder(t *testing.T) {
)
require.NoError(t, err)
assert.NotNil(t, handler)
zipkin, jaeger := handler.BuildHandlers()
zipkin, jaeger, grpc := handler.BuildHandlers()
assert.NotNil(t, zipkin)
assert.NotNil(t, jaeger)
assert.NotNil(t, grpc)
}

func TestDefaultSpanFilter(t *testing.T) {
Expand Down
73 changes: 73 additions & 0 deletions cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"context"
"time"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
logger *zap.Logger
spanProcessor SpanProcessor
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor SpanProcessor) *GRPCHandler {
return &GRPCHandler{
logger: logger,
spanProcessor: spanProcessor,
}
}

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
oks, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, JaegerFormatType)
if err != nil {
g.logger.Error("cannot process spans", zap.Error(err))
return nil, err
}
success := true
for _, ok := range oks {
if !ok {
success = false
break
}
}
return &api_v2.PostSpansResponse{Ok: success}, nil
}

// GetTrace gets trace
func (g *GRPCHandler) GetTrace(ctx context.Context, req *api_v2.GetTraceRequest) (*api_v2.GetTraceResponse, error) {
// TODO
return &api_v2.GetTraceResponse{
Trace: &model.Trace{
Spans: []*model.Span{
{
TraceID: model.TraceID{Low: 123},
SpanID: model.NewSpanID(456),
OperationName: "foo bar",
StartTime: time.Now(),
},
},
},
}, nil
}
Loading

0 comments on commit 6566ba1

Please sign in to comment.