diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 808e6aa0b04..0e026559597 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -16,10 +16,11 @@ package app import ( - "io" + "context" "net" "net/http" "sync/atomic" + "time" "github.com/gorilla/mux" "go.uber.org/zap" @@ -33,7 +34,6 @@ type Agent struct { httpServer *http.Server httpAddr atomic.Value // string, set once agent starts listening logger *zap.Logger - closer io.Closer } // NewAgent creates the new Agent. @@ -65,10 +65,9 @@ func (a *Agent) Run() error { return err } a.httpAddr.Store(listener.Addr().String()) - a.closer = listener go func() { a.logger.Info("Starting jaeger-agent HTTP server", zap.Int("http-port", listener.Addr().(*net.TCPAddr).Port)) - if err := a.httpServer.Serve(listener); err != nil { + if err := a.httpServer.Serve(listener); err != http.ErrServerClosed { a.logger.Error("http server failure", zap.Error(err)) } a.logger.Info("agent's http server exiting") @@ -86,8 +85,16 @@ func (a *Agent) HTTPAddr() string { // Stop forces all agent go routines to exit. func (a *Agent) Stop() { + // first, close the http server, so that we don't have any more inflight requests + a.logger.Info("shutting down agent's HTTP server", zap.String("addr", a.HTTPAddr())) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := a.httpServer.Shutdown(timeout); err != nil { + a.logger.Error("failed to close HTTP server", zap.Error(err)) + } + cancel() + + // then, close all processors that are called for the incoming http requests for _, processor := range a.processors { - go processor.Stop() + processor.Stop() } - a.closer.Close() } diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 8338e9c959f..285d9364f8e 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -17,6 +17,7 @@ package app import ( "fmt" + "io" "net/http" "strconv" @@ -68,6 +69,7 @@ var ( type CollectorProxy interface { GetReporter() reporter.Reporter GetManager() configmanager.ClientConfigManager + io.Closer } // Builder Struct to hold configurations diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 9a6b0e63ec6..0bea462db05 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -180,6 +180,9 @@ func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error) func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) { return nil } +func (fakeCollectorProxy) Close() error { + return nil +} func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { return nil, errors.New("no peers available") diff --git a/cmd/agent/app/reporter/grpc/collector_proxy_test.go b/cmd/agent/app/reporter/grpc/collector_proxy_test.go index f80febbd593..78403512112 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy_test.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy_test.go @@ -15,6 +15,7 @@ package grpc import ( + "io" "net" "testing" "time" @@ -29,6 +30,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/jaeger" ) +var _ io.Closer = (*ProxyBuilder)(nil) + func TestMultipleCollectors(t *testing.T) { spanHandler1 := &mockSpanHandler{} s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) { diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go index b8e3e082cc6..996686880f1 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -62,7 +62,7 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { - b.tchanRep.Channel().Close() b.reporter.Close() + b.tchanRep.Channel().Close() return nil } diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go index a9bdd762bef..b7cafd4ac5b 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go @@ -15,6 +15,7 @@ package tchannel import ( + "io" "testing" "github.com/stretchr/testify/assert" @@ -27,6 +28,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" ) +var _ io.Closer = (*ProxyBuilder)(nil) + func TestErrorReporterBuilder(t *testing.T) { tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{}) b, err := NewCollectorProxy(tbuilder, metrics.NullFactory, zap.NewNop()) diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 77dc9642106..1d0c2474519 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -17,7 +17,6 @@ package main import ( "fmt" - "io" "os" "github.com/spf13/cobra" @@ -76,9 +75,8 @@ func main() { return fmt.Errorf("failed to run the agent: %w", err) } svc.RunAndThen(func() { - if closer, ok := cp.(io.Closer); ok { - closer.Close() - } + agent.Stop() + cp.Close() }) return nil }, diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 8e2032265df..3c62cfadbd5 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -120,6 +120,7 @@ by default uses only in-memory database.`, cOpts := new(collectorApp.CollectorOptions).InitFromViper(v) qOpts := new(queryApp.QueryOptions).InitFromViper(v, logger) + // collector c := collectorApp.New(&collectorApp.CollectorParams{ ServiceName: "jaeger-collector", Logger: logger, @@ -130,7 +131,16 @@ by default uses only in-memory database.`, }) c.Start(cOpts) - startAgent(aOpts, repOpts, tchanBuilder, grpcBuilder, cOpts.CollectorGRPCPort, logger, metricsFactory) + // agent + grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort)) + agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) + cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, agentMetricsFactory) + if err != nil { + logger.Fatal("Could not create collector proxy", zap.Error(err)) + } + agent := startAgent(cp, aOpts, logger, metricsFactory) + + // query querySrv := startQuery( svc, qOpts, archiveOptions(storageFactory, logger), spanReader, dependencyReader, @@ -138,6 +148,8 @@ by default uses only in-memory database.`, ) svc.RunAndThen(func() { + agent.Stop() + cp.Close() c.Close() querySrv.Close() if closer, ok := spanWriter.(io.Closer); ok { @@ -177,21 +189,11 @@ by default uses only in-memory database.`, } func startAgent( + cp agentApp.CollectorProxy, b *agentApp.Builder, - repOpts *agentRep.Options, - tchanBuilder *agentTchanRep.Builder, - grpcBuilder *agentGrpcRep.ConnBuilder, - collectorGRPCPort int, logger *zap.Logger, baseFactory metrics.Factory, -) { - metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) - - grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", collectorGRPCPort)) - cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, metricsFactory) - if err != nil { - logger.Fatal("Could not create collector proxy", zap.Error(err)) - } +) *agentApp.Agent { agent, err := b.CreateAgent(cp, logger, baseFactory) if err != nil { @@ -202,6 +204,8 @@ func startAgent( if err := agent.Run(); err != nil { logger.Fatal("Failed to run the agent", zap.Error(err)) } + + return agent } func startQuery(