From 0f8013e794090657424fc6621a4a26d48832b4b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Wed, 12 Feb 2020 11:29:04 +0100 Subject: [PATCH 1/5] Add support for graceful shutdown of agent. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juraci Paixão Kröhling --- cmd/agent/app/agent.go | 19 ++++++++---- cmd/agent/app/builder.go | 1 + cmd/agent/app/builder_test.go | 3 ++ cmd/agent/app/reporter/client_metrics.go | 5 +++- cmd/agent/app/reporter/client_metrics_test.go | 3 ++ .../app/reporter/grpc/collector_proxy.go | 14 +++++++-- .../app/reporter/grpc/collector_proxy_test.go | 3 ++ cmd/agent/app/reporter/grpc/reporter.go | 5 ++++ cmd/agent/app/reporter/grpc/reporter_test.go | 3 ++ cmd/agent/app/reporter/metrics.go | 5 ++++ cmd/agent/app/reporter/metrics_test.go | 3 ++ cmd/agent/app/reporter/reporter.go | 12 ++++++++ cmd/agent/app/reporter/reporter_test.go | 7 +++++ .../app/reporter/tchannel/collector_proxy.go | 3 +- .../reporter/tchannel/collector_proxy_test.go | 3 ++ cmd/agent/app/reporter/tchannel/reporter.go | 5 ++++ .../app/reporter/tchannel/reporter_test.go | 3 ++ cmd/agent/app/testutils/in_memory_reporter.go | 5 ++++ cmd/agent/main.go | 6 ++-- cmd/all-in-one/main.go | 30 +++++++++++-------- 20 files changed, 110 insertions(+), 28 deletions(-) diff --git a/cmd/agent/app/agent.go b/cmd/agent/app/agent.go index 808e6aa0b04..0e026559597 100644 --- a/cmd/agent/app/agent.go +++ b/cmd/agent/app/agent.go @@ -16,10 +16,11 @@ package app import ( - "io" + "context" "net" "net/http" "sync/atomic" + "time" "github.com/gorilla/mux" "go.uber.org/zap" @@ -33,7 +34,6 @@ type Agent struct { httpServer *http.Server httpAddr atomic.Value // string, set once agent starts listening logger *zap.Logger - closer io.Closer } // NewAgent creates the new Agent. @@ -65,10 +65,9 @@ func (a *Agent) Run() error { return err } a.httpAddr.Store(listener.Addr().String()) - a.closer = listener go func() { a.logger.Info("Starting jaeger-agent HTTP server", zap.Int("http-port", listener.Addr().(*net.TCPAddr).Port)) - if err := a.httpServer.Serve(listener); err != nil { + if err := a.httpServer.Serve(listener); err != http.ErrServerClosed { a.logger.Error("http server failure", zap.Error(err)) } a.logger.Info("agent's http server exiting") @@ -86,8 +85,16 @@ func (a *Agent) HTTPAddr() string { // Stop forces all agent go routines to exit. func (a *Agent) Stop() { + // first, close the http server, so that we don't have any more inflight requests + a.logger.Info("shutting down agent's HTTP server", zap.String("addr", a.HTTPAddr())) + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := a.httpServer.Shutdown(timeout); err != nil { + a.logger.Error("failed to close HTTP server", zap.Error(err)) + } + cancel() + + // then, close all processors that are called for the incoming http requests for _, processor := range a.processors { - go processor.Stop() + processor.Stop() } - a.closer.Close() } diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 8338e9c959f..4adf791fb66 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -68,6 +68,7 @@ var ( type CollectorProxy interface { GetReporter() reporter.Reporter GetManager() configmanager.ClientConfigManager + Close() error } // Builder Struct to hold configurations diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 9a6b0e63ec6..892624a5eed 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -180,6 +180,9 @@ func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error) func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) { return nil } +func (fakeCollectorProxy) Close() (err error) { + return nil +} func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { return nil, errors.New("no peers available") diff --git a/cmd/agent/app/reporter/client_metrics.go b/cmd/agent/app/reporter/client_metrics.go index 97b9d0fa611..1837d4c7db4 100644 --- a/cmd/agent/app/reporter/client_metrics.go +++ b/cmd/agent/app/reporter/client_metrics.go @@ -121,10 +121,13 @@ func (r *ClientMetricsReporter) EmitBatch(batch *jaeger.Batch) error { } // Close stops background gc goroutine for client stats map. -func (r *ClientMetricsReporter) Close() { +func (r *ClientMetricsReporter) Close() error { if r.closed.CAS(false, true) { close(r.shutdown) + return r.params.Reporter.Close() } + + return nil } func (r *ClientMetricsReporter) expireClientMetricsLoop() { diff --git a/cmd/agent/app/reporter/client_metrics_test.go b/cmd/agent/app/reporter/client_metrics_test.go index 8d1651b0207..d19140424ae 100644 --- a/cmd/agent/app/reporter/client_metrics_test.go +++ b/cmd/agent/app/reporter/client_metrics_test.go @@ -16,6 +16,7 @@ package reporter import ( "fmt" + "io" "testing" "time" @@ -29,6 +30,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +var _ io.Closer = (*ClientMetricsReporter)(nil) + type clientMetricsTest struct { mr *testutils.InMemoryReporter r *ClientMetricsReporter diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index faad4b87c4b..d8c4e333fc7 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -22,6 +22,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" grpcManager "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" + "github.com/jaegertracing/jaeger/pkg/multierror" ) // ProxyBuilder holds objects communicating with collector @@ -69,6 +70,15 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { - b.reporter.Close() - return b.conn.Close() + var errs []error + + if err := b.conn.Close(); err != nil { + errs = append(errs, err) + } + + if err := b.reporter.Close(); err != nil { + errs = append(errs, err) + } + + return multierror.Wrap(errs) } diff --git a/cmd/agent/app/reporter/grpc/collector_proxy_test.go b/cmd/agent/app/reporter/grpc/collector_proxy_test.go index f80febbd593..78403512112 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy_test.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy_test.go @@ -15,6 +15,7 @@ package grpc import ( + "io" "net" "testing" "time" @@ -29,6 +30,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/jaeger" ) +var _ io.Closer = (*ProxyBuilder)(nil) + func TestMultipleCollectors(t *testing.T) { spanHandler1 := &mockSpanHandler{} s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) { diff --git a/cmd/agent/app/reporter/grpc/reporter.go b/cmd/agent/app/reporter/grpc/reporter.go index 7c3432ed1a1..d1be4391c9f 100644 --- a/cmd/agent/app/reporter/grpc/reporter.go +++ b/cmd/agent/app/reporter/grpc/reporter.go @@ -65,6 +65,11 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error { return r.send(trace.Spans, nil) } +// Close the reporter +func (r *Reporter) Close() error { + return nil +} + func (r *Reporter) send(spans []*model.Span, process *model.Process) error { spans, process = addProcessTags(spans, process, r.agentTags) batch := model.Batch{Spans: spans, Process: process} diff --git a/cmd/agent/app/reporter/grpc/reporter_test.go b/cmd/agent/app/reporter/grpc/reporter_test.go index 2859ce00a75..c75a59d5b61 100644 --- a/cmd/agent/app/reporter/grpc/reporter_test.go +++ b/cmd/agent/app/reporter/grpc/reporter_test.go @@ -16,6 +16,7 @@ package grpc import ( "context" + "io" "sync" "testing" "time" @@ -31,6 +32,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +var _ io.Closer = (*Reporter)(nil) + type mockSpanHandler struct { mux sync.Mutex requests []*api_v2.PostSpansRequest diff --git a/cmd/agent/app/reporter/metrics.go b/cmd/agent/app/reporter/metrics.go index 2a52fd8bb87..fb7046a1aa6 100644 --- a/cmd/agent/app/reporter/metrics.go +++ b/cmd/agent/app/reporter/metrics.go @@ -85,6 +85,11 @@ func (r *MetricsReporter) EmitBatch(batch *jaeger.Batch) error { return err } +// Close this reporter +func (r *MetricsReporter) Close() error { + return r.wrapped.Close() +} + func updateMetrics(m batchMetrics, size int64, err error) { if err != nil { m.BatchesFailures.Inc(1) diff --git a/cmd/agent/app/reporter/metrics_test.go b/cmd/agent/app/reporter/metrics_test.go index de0331b35e9..119f4a36be5 100644 --- a/cmd/agent/app/reporter/metrics_test.go +++ b/cmd/agent/app/reporter/metrics_test.go @@ -16,6 +16,7 @@ package reporter import ( "errors" + "io" "testing" "time" @@ -26,6 +27,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +var _ io.Closer = (*MetricsReporter)(nil) + func TestMetricsReporter(t *testing.T) { tests := []struct { expectedCounters []metricstest.ExpectedMetric diff --git a/cmd/agent/app/reporter/reporter.go b/cmd/agent/app/reporter/reporter.go index c6e56707816..868ac8fcbac 100644 --- a/cmd/agent/app/reporter/reporter.go +++ b/cmd/agent/app/reporter/reporter.go @@ -26,6 +26,7 @@ import ( type Reporter interface { EmitZipkinBatch(spans []*zipkincore.Span) (err error) EmitBatch(batch *jaeger.Batch) (err error) + Close() error } // MultiReporter provides serial span emission to one or more reporters. If @@ -60,3 +61,14 @@ func (mr MultiReporter) EmitBatch(batch *jaeger.Batch) error { } return multierror.Wrap(errors) } + +// Close all the underlying reporters +func (mr MultiReporter) Close() error { + var errors []error + for _, rep := range mr { + if err := rep.Close(); err != nil { + errors = append(errors, err) + } + } + return multierror.Wrap(errors) +} diff --git a/cmd/agent/app/reporter/reporter_test.go b/cmd/agent/app/reporter/reporter_test.go index 37ba306e23c..32b5b88761f 100644 --- a/cmd/agent/app/reporter/reporter_test.go +++ b/cmd/agent/app/reporter/reporter_test.go @@ -18,6 +18,7 @@ package reporter import ( "errors" "fmt" + "io" "testing" "github.com/stretchr/testify/assert" @@ -27,6 +28,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +var _ io.Closer = (Reporter)(nil) + func TestMultiReporter(t *testing.T) { r1, r2 := testutils.NewInMemoryReporter(), testutils.NewInMemoryReporter() r := NewMultiReporter(r1, r2) @@ -74,3 +77,7 @@ func (r mockReporter) EmitZipkinBatch(spans []*zipkincore.Span) error { func (r mockReporter) EmitBatch(batch *jaeger.Batch) error { return r.err } + +func (r mockReporter) Close() error { + return nil +} diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go index b8e3e082cc6..c47e7fe4f57 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -63,6 +63,5 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { b.tchanRep.Channel().Close() - b.reporter.Close() - return nil + return b.reporter.Close() } diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go index a9bdd762bef..b7cafd4ac5b 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go @@ -15,6 +15,7 @@ package tchannel import ( + "io" "testing" "github.com/stretchr/testify/assert" @@ -27,6 +28,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" ) +var _ io.Closer = (*ProxyBuilder)(nil) + func TestErrorReporterBuilder(t *testing.T) { tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{}) b, err := NewCollectorProxy(tbuilder, metrics.NullFactory, zap.NewNop()) diff --git a/cmd/agent/app/reporter/tchannel/reporter.go b/cmd/agent/app/reporter/tchannel/reporter.go index f5c37c683c0..4fbeca5f1f5 100644 --- a/cmd/agent/app/reporter/tchannel/reporter.go +++ b/cmd/agent/app/reporter/tchannel/reporter.go @@ -65,6 +65,11 @@ func (r *Reporter) Channel() *tchannel.Channel { return r.channel } +// Close the underlying channel +func (r *Reporter) Close() error { + return nil +} + // EmitZipkinBatch implements EmitZipkinBatch() of Reporter func (r *Reporter) EmitZipkinBatch(spans []*zipkincore.Span) error { submissionFunc := func(ctx thrift.Context) error { diff --git a/cmd/agent/app/reporter/tchannel/reporter_test.go b/cmd/agent/app/reporter/tchannel/reporter_test.go index 1f304470d79..df5d5b0a94f 100644 --- a/cmd/agent/app/reporter/tchannel/reporter_test.go +++ b/cmd/agent/app/reporter/tchannel/reporter_test.go @@ -16,6 +16,7 @@ package tchannel import ( + "io" "testing" "time" @@ -29,6 +30,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +var _ io.Closer = (*Reporter)(nil) + func initRequirements(t *testing.T) (*metricstest.Factory, *testutils.MockTCollector, *Reporter) { metricsFactory, collector := testutils.InitMockCollector(t) reporter := New("jaeger-collector", collector.Channel, time.Second, nil, zap.NewNop()) diff --git a/cmd/agent/app/testutils/in_memory_reporter.go b/cmd/agent/app/testutils/in_memory_reporter.go index f4ed343e8b1..236722e8fee 100644 --- a/cmd/agent/app/testutils/in_memory_reporter.go +++ b/cmd/agent/app/testutils/in_memory_reporter.go @@ -53,6 +53,11 @@ func (i *InMemoryReporter) EmitBatch(batch *jaeger.Batch) (err error) { return nil } +// Close the reporter +func (i *InMemoryReporter) Close() error { + return nil +} + // ZipkinSpans returns accumulated Zipkin spans as a copied slice func (i *InMemoryReporter) ZipkinSpans() []*zipkincore.Span { i.mutex.Lock() diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 77dc9642106..1d0c2474519 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -17,7 +17,6 @@ package main import ( "fmt" - "io" "os" "github.com/spf13/cobra" @@ -76,9 +75,8 @@ func main() { return fmt.Errorf("failed to run the agent: %w", err) } svc.RunAndThen(func() { - if closer, ok := cp.(io.Closer); ok { - closer.Close() - } + agent.Stop() + cp.Close() }) return nil }, diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 8e2032265df..3c62cfadbd5 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -120,6 +120,7 @@ by default uses only in-memory database.`, cOpts := new(collectorApp.CollectorOptions).InitFromViper(v) qOpts := new(queryApp.QueryOptions).InitFromViper(v, logger) + // collector c := collectorApp.New(&collectorApp.CollectorParams{ ServiceName: "jaeger-collector", Logger: logger, @@ -130,7 +131,16 @@ by default uses only in-memory database.`, }) c.Start(cOpts) - startAgent(aOpts, repOpts, tchanBuilder, grpcBuilder, cOpts.CollectorGRPCPort, logger, metricsFactory) + // agent + grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort)) + agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) + cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, agentMetricsFactory) + if err != nil { + logger.Fatal("Could not create collector proxy", zap.Error(err)) + } + agent := startAgent(cp, aOpts, logger, metricsFactory) + + // query querySrv := startQuery( svc, qOpts, archiveOptions(storageFactory, logger), spanReader, dependencyReader, @@ -138,6 +148,8 @@ by default uses only in-memory database.`, ) svc.RunAndThen(func() { + agent.Stop() + cp.Close() c.Close() querySrv.Close() if closer, ok := spanWriter.(io.Closer); ok { @@ -177,21 +189,11 @@ by default uses only in-memory database.`, } func startAgent( + cp agentApp.CollectorProxy, b *agentApp.Builder, - repOpts *agentRep.Options, - tchanBuilder *agentTchanRep.Builder, - grpcBuilder *agentGrpcRep.ConnBuilder, - collectorGRPCPort int, logger *zap.Logger, baseFactory metrics.Factory, -) { - metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil}) - - grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", collectorGRPCPort)) - cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, metricsFactory) - if err != nil { - logger.Fatal("Could not create collector proxy", zap.Error(err)) - } +) *agentApp.Agent { agent, err := b.CreateAgent(cp, logger, baseFactory) if err != nil { @@ -202,6 +204,8 @@ func startAgent( if err := agent.Run(); err != nil { logger.Fatal("Failed to run the agent", zap.Error(err)) } + + return agent } func startQuery( From 761386f7b27d87b84bf6a2ac4220ce1faf9fa6ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Wed, 19 Feb 2020 09:56:37 +0100 Subject: [PATCH 2/5] Removed #Close from noop places MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juraci Paixão Kröhling --- cmd/agent/app/builder.go | 1 - cmd/agent/app/builder_test.go | 3 --- cmd/agent/app/reporter/client_metrics.go | 5 +---- cmd/agent/app/reporter/client_metrics_test.go | 3 --- cmd/agent/app/reporter/grpc/collector_proxy.go | 4 ---- cmd/agent/app/reporter/grpc/reporter.go | 5 ----- cmd/agent/app/reporter/grpc/reporter_test.go | 3 --- cmd/agent/app/reporter/metrics.go | 5 ----- cmd/agent/app/reporter/metrics_test.go | 3 --- cmd/agent/app/reporter/reporter.go | 12 ------------ cmd/agent/app/reporter/reporter_test.go | 7 ------- cmd/agent/app/reporter/tchannel/collector_proxy.go | 2 +- cmd/agent/app/reporter/tchannel/reporter.go | 5 ----- cmd/agent/app/reporter/tchannel/reporter_test.go | 3 --- cmd/agent/app/testutils/in_memory_reporter.go | 5 ----- cmd/agent/main.go | 1 - cmd/all-in-one/main.go | 1 - 17 files changed, 2 insertions(+), 66 deletions(-) diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 4adf791fb66..8338e9c959f 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -68,7 +68,6 @@ var ( type CollectorProxy interface { GetReporter() reporter.Reporter GetManager() configmanager.ClientConfigManager - Close() error } // Builder Struct to hold configurations diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 892624a5eed..9a6b0e63ec6 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -180,9 +180,6 @@ func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error) func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) { return nil } -func (fakeCollectorProxy) Close() (err error) { - return nil -} func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { return nil, errors.New("no peers available") diff --git a/cmd/agent/app/reporter/client_metrics.go b/cmd/agent/app/reporter/client_metrics.go index 1837d4c7db4..97b9d0fa611 100644 --- a/cmd/agent/app/reporter/client_metrics.go +++ b/cmd/agent/app/reporter/client_metrics.go @@ -121,13 +121,10 @@ func (r *ClientMetricsReporter) EmitBatch(batch *jaeger.Batch) error { } // Close stops background gc goroutine for client stats map. -func (r *ClientMetricsReporter) Close() error { +func (r *ClientMetricsReporter) Close() { if r.closed.CAS(false, true) { close(r.shutdown) - return r.params.Reporter.Close() } - - return nil } func (r *ClientMetricsReporter) expireClientMetricsLoop() { diff --git a/cmd/agent/app/reporter/client_metrics_test.go b/cmd/agent/app/reporter/client_metrics_test.go index d19140424ae..8d1651b0207 100644 --- a/cmd/agent/app/reporter/client_metrics_test.go +++ b/cmd/agent/app/reporter/client_metrics_test.go @@ -16,7 +16,6 @@ package reporter import ( "fmt" - "io" "testing" "time" @@ -30,8 +29,6 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -var _ io.Closer = (*ClientMetricsReporter)(nil) - type clientMetricsTest struct { mr *testutils.InMemoryReporter r *ClientMetricsReporter diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index d8c4e333fc7..603c3e549a7 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -76,9 +76,5 @@ func (b ProxyBuilder) Close() error { errs = append(errs, err) } - if err := b.reporter.Close(); err != nil { - errs = append(errs, err) - } - return multierror.Wrap(errs) } diff --git a/cmd/agent/app/reporter/grpc/reporter.go b/cmd/agent/app/reporter/grpc/reporter.go index d1be4391c9f..7c3432ed1a1 100644 --- a/cmd/agent/app/reporter/grpc/reporter.go +++ b/cmd/agent/app/reporter/grpc/reporter.go @@ -65,11 +65,6 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error { return r.send(trace.Spans, nil) } -// Close the reporter -func (r *Reporter) Close() error { - return nil -} - func (r *Reporter) send(spans []*model.Span, process *model.Process) error { spans, process = addProcessTags(spans, process, r.agentTags) batch := model.Batch{Spans: spans, Process: process} diff --git a/cmd/agent/app/reporter/grpc/reporter_test.go b/cmd/agent/app/reporter/grpc/reporter_test.go index c75a59d5b61..2859ce00a75 100644 --- a/cmd/agent/app/reporter/grpc/reporter_test.go +++ b/cmd/agent/app/reporter/grpc/reporter_test.go @@ -16,7 +16,6 @@ package grpc import ( "context" - "io" "sync" "testing" "time" @@ -32,8 +31,6 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -var _ io.Closer = (*Reporter)(nil) - type mockSpanHandler struct { mux sync.Mutex requests []*api_v2.PostSpansRequest diff --git a/cmd/agent/app/reporter/metrics.go b/cmd/agent/app/reporter/metrics.go index fb7046a1aa6..2a52fd8bb87 100644 --- a/cmd/agent/app/reporter/metrics.go +++ b/cmd/agent/app/reporter/metrics.go @@ -85,11 +85,6 @@ func (r *MetricsReporter) EmitBatch(batch *jaeger.Batch) error { return err } -// Close this reporter -func (r *MetricsReporter) Close() error { - return r.wrapped.Close() -} - func updateMetrics(m batchMetrics, size int64, err error) { if err != nil { m.BatchesFailures.Inc(1) diff --git a/cmd/agent/app/reporter/metrics_test.go b/cmd/agent/app/reporter/metrics_test.go index 119f4a36be5..de0331b35e9 100644 --- a/cmd/agent/app/reporter/metrics_test.go +++ b/cmd/agent/app/reporter/metrics_test.go @@ -16,7 +16,6 @@ package reporter import ( "errors" - "io" "testing" "time" @@ -27,8 +26,6 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -var _ io.Closer = (*MetricsReporter)(nil) - func TestMetricsReporter(t *testing.T) { tests := []struct { expectedCounters []metricstest.ExpectedMetric diff --git a/cmd/agent/app/reporter/reporter.go b/cmd/agent/app/reporter/reporter.go index 868ac8fcbac..c6e56707816 100644 --- a/cmd/agent/app/reporter/reporter.go +++ b/cmd/agent/app/reporter/reporter.go @@ -26,7 +26,6 @@ import ( type Reporter interface { EmitZipkinBatch(spans []*zipkincore.Span) (err error) EmitBatch(batch *jaeger.Batch) (err error) - Close() error } // MultiReporter provides serial span emission to one or more reporters. If @@ -61,14 +60,3 @@ func (mr MultiReporter) EmitBatch(batch *jaeger.Batch) error { } return multierror.Wrap(errors) } - -// Close all the underlying reporters -func (mr MultiReporter) Close() error { - var errors []error - for _, rep := range mr { - if err := rep.Close(); err != nil { - errors = append(errors, err) - } - } - return multierror.Wrap(errors) -} diff --git a/cmd/agent/app/reporter/reporter_test.go b/cmd/agent/app/reporter/reporter_test.go index 32b5b88761f..37ba306e23c 100644 --- a/cmd/agent/app/reporter/reporter_test.go +++ b/cmd/agent/app/reporter/reporter_test.go @@ -18,7 +18,6 @@ package reporter import ( "errors" "fmt" - "io" "testing" "github.com/stretchr/testify/assert" @@ -28,8 +27,6 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -var _ io.Closer = (Reporter)(nil) - func TestMultiReporter(t *testing.T) { r1, r2 := testutils.NewInMemoryReporter(), testutils.NewInMemoryReporter() r := NewMultiReporter(r1, r2) @@ -77,7 +74,3 @@ func (r mockReporter) EmitZipkinBatch(spans []*zipkincore.Span) error { func (r mockReporter) EmitBatch(batch *jaeger.Batch) error { return r.err } - -func (r mockReporter) Close() error { - return nil -} diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go index c47e7fe4f57..83e09cc629e 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -63,5 +63,5 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { b.tchanRep.Channel().Close() - return b.reporter.Close() + return nil } diff --git a/cmd/agent/app/reporter/tchannel/reporter.go b/cmd/agent/app/reporter/tchannel/reporter.go index 4fbeca5f1f5..f5c37c683c0 100644 --- a/cmd/agent/app/reporter/tchannel/reporter.go +++ b/cmd/agent/app/reporter/tchannel/reporter.go @@ -65,11 +65,6 @@ func (r *Reporter) Channel() *tchannel.Channel { return r.channel } -// Close the underlying channel -func (r *Reporter) Close() error { - return nil -} - // EmitZipkinBatch implements EmitZipkinBatch() of Reporter func (r *Reporter) EmitZipkinBatch(spans []*zipkincore.Span) error { submissionFunc := func(ctx thrift.Context) error { diff --git a/cmd/agent/app/reporter/tchannel/reporter_test.go b/cmd/agent/app/reporter/tchannel/reporter_test.go index df5d5b0a94f..1f304470d79 100644 --- a/cmd/agent/app/reporter/tchannel/reporter_test.go +++ b/cmd/agent/app/reporter/tchannel/reporter_test.go @@ -16,7 +16,6 @@ package tchannel import ( - "io" "testing" "time" @@ -30,8 +29,6 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -var _ io.Closer = (*Reporter)(nil) - func initRequirements(t *testing.T) (*metricstest.Factory, *testutils.MockTCollector, *Reporter) { metricsFactory, collector := testutils.InitMockCollector(t) reporter := New("jaeger-collector", collector.Channel, time.Second, nil, zap.NewNop()) diff --git a/cmd/agent/app/testutils/in_memory_reporter.go b/cmd/agent/app/testutils/in_memory_reporter.go index 236722e8fee..f4ed343e8b1 100644 --- a/cmd/agent/app/testutils/in_memory_reporter.go +++ b/cmd/agent/app/testutils/in_memory_reporter.go @@ -53,11 +53,6 @@ func (i *InMemoryReporter) EmitBatch(batch *jaeger.Batch) (err error) { return nil } -// Close the reporter -func (i *InMemoryReporter) Close() error { - return nil -} - // ZipkinSpans returns accumulated Zipkin spans as a copied slice func (i *InMemoryReporter) ZipkinSpans() []*zipkincore.Span { i.mutex.Lock() diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 1d0c2474519..d07462bf565 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -76,7 +76,6 @@ func main() { } svc.RunAndThen(func() { agent.Stop() - cp.Close() }) return nil }, diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 3c62cfadbd5..0a2f78e06d6 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -149,7 +149,6 @@ by default uses only in-memory database.`, svc.RunAndThen(func() { agent.Stop() - cp.Close() c.Close() querySrv.Close() if closer, ok := spanWriter.(io.Closer); ok { From 2d78dacce13d7a1c5d1966bf702abbee9aed00cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Wed, 19 Feb 2020 10:22:12 +0100 Subject: [PATCH 3/5] Added #Close to CollectorProxy back MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juraci Paixão Kröhling --- cmd/agent/app/builder.go | 2 ++ cmd/agent/app/builder_test.go | 3 +++ cmd/agent/main.go | 1 + cmd/all-in-one/main.go | 1 + 4 files changed, 7 insertions(+) diff --git a/cmd/agent/app/builder.go b/cmd/agent/app/builder.go index 8338e9c959f..285d9364f8e 100644 --- a/cmd/agent/app/builder.go +++ b/cmd/agent/app/builder.go @@ -17,6 +17,7 @@ package app import ( "fmt" + "io" "net/http" "strconv" @@ -68,6 +69,7 @@ var ( type CollectorProxy interface { GetReporter() reporter.Reporter GetManager() configmanager.ClientConfigManager + io.Closer } // Builder Struct to hold configurations diff --git a/cmd/agent/app/builder_test.go b/cmd/agent/app/builder_test.go index 9a6b0e63ec6..0bea462db05 100644 --- a/cmd/agent/app/builder_test.go +++ b/cmd/agent/app/builder_test.go @@ -180,6 +180,9 @@ func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error) func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) { return nil } +func (fakeCollectorProxy) Close() error { + return nil +} func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) { return nil, errors.New("no peers available") diff --git a/cmd/agent/main.go b/cmd/agent/main.go index d07462bf565..1d0c2474519 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -76,6 +76,7 @@ func main() { } svc.RunAndThen(func() { agent.Stop() + cp.Close() }) return nil }, diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 0a2f78e06d6..3c62cfadbd5 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -149,6 +149,7 @@ by default uses only in-memory database.`, svc.RunAndThen(func() { agent.Stop() + cp.Close() c.Close() querySrv.Close() if closer, ok := spanWriter.(io.Closer); ok { From d968cd4b50810d171ad042916887ab98c4b58bf1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Wed, 19 Feb 2020 17:10:23 +0100 Subject: [PATCH 4/5] Removed multierr from collector proxy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juraci Paixão Kröhling --- cmd/agent/app/reporter/grpc/collector_proxy.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index 603c3e549a7..5367949c97d 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -22,7 +22,6 @@ import ( "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager" grpcManager "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc" "github.com/jaegertracing/jaeger/cmd/agent/app/reporter" - "github.com/jaegertracing/jaeger/pkg/multierror" ) // ProxyBuilder holds objects communicating with collector @@ -70,11 +69,5 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { - var errs []error - - if err := b.conn.Close(); err != nil { - errs = append(errs, err) - } - - return multierror.Wrap(errs) + return b.conn.Close() } From cb780f26b988317a6e0499d4a72a4e8e200e4566 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juraci=20Paix=C3=A3o=20Kr=C3=B6hling?= Date: Thu, 20 Feb 2020 09:47:03 +0100 Subject: [PATCH 5/5] Restored collector's proxy closing of its reporter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juraci Paixão Kröhling --- cmd/agent/app/reporter/grpc/collector_proxy.go | 1 + cmd/agent/app/reporter/tchannel/collector_proxy.go | 1 + 2 files changed, 2 insertions(+) diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index 5367949c97d..faad4b87c4b 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -69,5 +69,6 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { + b.reporter.Close() return b.conn.Close() } diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go index 83e09cc629e..996686880f1 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -62,6 +62,7 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager { // Close closes connections used by proxy. func (b ProxyBuilder) Close() error { + b.reporter.Close() b.tchanRep.Channel().Close() return nil }