Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close gRPC for collecting and reporting #1187

Merged
merged 1 commit into from
Nov 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
type ProxyBuilder struct {
reporter aReporter.Reporter
manager httpserver.ClientConfigManager
conn *grpc.ClientConn
}

// NewCollectorProxy creates ProxyBuilder
Expand All @@ -54,6 +55,7 @@ func NewCollectorProxy(o *Options, logger *zap.Logger) (*ProxyBuilder, error) {
conn, _ = grpc.Dial(o.CollectorHostPort[0], grpc.WithInsecure())
}
return &ProxyBuilder{
conn: conn,
reporter: NewReporter(conn, logger),
manager: NewSamplingManager(conn)}, nil
}
Expand All @@ -67,3 +69,8 @@ func (b ProxyBuilder) GetReporter() aReporter.Reporter {
func (b ProxyBuilder) GetManager() httpserver.ClientConfigManager {
return b.manager
}

// Close closes connections used by proxy.
func (b ProxyBuilder) Close() error {
return b.conn.Close()
}
3 changes: 3 additions & 0 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func TestProxyBuilder(t *testing.T) {
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
assert.NotNil(t, proxy.GetManager())
assert.Nil(t, proxy.Close())
assert.EqualError(t, proxy.Close(), "rpc error: code = Canceled desc = grpc: the client connection is closing")
}

func TestMultipleCollectors(t *testing.T) {
Expand Down Expand Up @@ -70,4 +72,5 @@ func TestMultipleCollectors(t *testing.T) {
}
}
assert.Equal(t, true, bothServers)
require.Nil(t, proxy.Close())
}
6 changes: 6 additions & 0 deletions cmd/agent/app/reporter/tchannel/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,9 @@ func (b ProxyBuilder) GetReporter() reporter.Reporter {
func (b ProxyBuilder) GetManager() httpserver.ClientConfigManager {
return b.manager
}

// Close closes connections used by proxy.
func (b ProxyBuilder) Close() error {
b.reporter.Channel().Close()
return nil
}
1 change: 1 addition & 0 deletions cmd/agent/app/reporter/tchannel/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ func TestCreate(t *testing.T) {
assert.Equal(t, r, b.GetReporter())
m := httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), mFactory)
assert.Equal(t, m, b.GetManager())
assert.Nil(t, b.Close())
}
14 changes: 13 additions & 1 deletion cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ package main

import (
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/pkg/errors"
"github.com/spf13/cobra"
Expand All @@ -38,6 +41,9 @@ import (
)

func main() {
var signalsChannel = make(chan os.Signal)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)

v := viper.New()
var command = &cobra.Command{
Use: "jaeger-agent",
Expand Down Expand Up @@ -88,7 +94,13 @@ func main() {
if err := agent.Run(); err != nil {
return errors.Wrap(err, "Failed to run the agent")
}
select {}
<-signalsChannel
logger.Info("Shutting down")
if closer, ok := cp.(io.Closer); ok {
closer.Close()
}
logger.Info("Shutdown complete")
return nil
},
}

Expand Down
30 changes: 23 additions & 7 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,13 @@ func main() {
qOpts := new(queryApp.QueryOptions).InitFromViper(v)

startAgent(aOpts, repOpts, tchannelRepOpts, grpcRepOpts, cOpts, logger, metricsFactory)
startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, hc)
grpcServer := startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, hc)
startQuery(qOpts, spanReader, dependencyReader, logger, metricsFactory, mBldr, hc)
hc.Ready()
<-signalsChannel
logger.Info("Shutting down")
if closer, ok := spanWriter.(io.Closer); ok {
grpcServer.GracefulStop()
err := closer.Close()
if err != nil {
logger.Error("Failed to close span writer", zap.Error(err))
Expand Down Expand Up @@ -236,7 +237,7 @@ func startCollector(
baseFactory metrics.Factory,
strategyStore strategystore.StrategyStore,
hc *healthcheck.HealthCheck,
) {
) *grpc.Server {
metricsFactory := baseFactory.Namespace("collector", nil)

spanBuilder, err := collector.NewSpanHandlerBuilder(
Expand Down Expand Up @@ -269,11 +270,9 @@ func startCollector(
ch.Serve(listener)
}

{
grpcserver.StartGRPCCollector(cOpts.CollectorGRPCPort, grpc.NewServer(), grpcHandler, strategyStore, logger,
func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
server, err := startGRPCServer(cOpts.CollectorGRPCPort, grpcHandler, strategyStore, logger)
if err != nil {
logger.Fatal("Could not start gRPC collector", zap.Error(err))
}

{
Expand All @@ -293,6 +292,23 @@ func startCollector(
hc.Set(healthcheck.Unavailable)
}()
}
return server
}

func startGRPCServer(
port int,
handler *collectorApp.GRPCHandler,
samplingStore strategystore.StrategyStore,
logger *zap.Logger,
) (*grpc.Server, error) {
server := grpc.NewServer()
_, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
if err != nil {
return nil, err
}
return server, err
}

func startZipkinHTTPAPI(
Expand Down
25 changes: 20 additions & 5 deletions cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,9 @@ func main() {
ch.Serve(listener)
}

{
grpcserver.StartGRPCCollector(builderOpts.CollectorGRPCPort, grpc.NewServer(), grpcHandler, strategyStore, logger,
func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
server, err := startGRPCServer(builderOpts.CollectorGRPCPort, grpcHandler, strategyStore, logger)
if err != nil {
logger.Fatal("Could not start gRPC collector", zap.Error(err))
}

{
Expand Down Expand Up @@ -174,6 +172,7 @@ func main() {
<-signalsChannel
logger.Info("Shutting down")
if closer, ok := spanWriter.(io.Closer); ok {
server.GracefulStop()
err := closer.Close()
if err != nil {
logger.Error("Failed to close span writer", zap.Error(err))
Expand Down Expand Up @@ -207,6 +206,22 @@ func main() {
}
}

func startGRPCServer(
port int,
handler *app.GRPCHandler,
samplingStore strategystore.StrategyStore,
logger *zap.Logger,
) (*grpc.Server, error) {
server := grpc.NewServer()
_, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
if err != nil {
return nil, err
}
return server, err
}

func startZipkinHTTPAPI(
logger *zap.Logger,
zipkinPort int,
Expand Down