Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved graceful shutdown - Agent #2031

Merged
merged 6 commits into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
package app

import (
"io"
"context"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/gorilla/mux"
"go.uber.org/zap"
Expand All @@ -33,7 +34,6 @@ type Agent struct {
httpServer *http.Server
httpAddr atomic.Value // string, set once agent starts listening
logger *zap.Logger
closer io.Closer
}

// NewAgent creates the new Agent.
Expand Down Expand Up @@ -65,10 +65,9 @@ func (a *Agent) Run() error {
return err
}
a.httpAddr.Store(listener.Addr().String())
a.closer = listener
go func() {
a.logger.Info("Starting jaeger-agent HTTP server", zap.Int("http-port", listener.Addr().(*net.TCPAddr).Port))
if err := a.httpServer.Serve(listener); err != nil {
if err := a.httpServer.Serve(listener); err != http.ErrServerClosed {
a.logger.Error("http server failure", zap.Error(err))
}
a.logger.Info("agent's http server exiting")
Expand All @@ -86,8 +85,16 @@ func (a *Agent) HTTPAddr() string {

// Stop forces all agent go routines to exit.
func (a *Agent) Stop() {
// first, close the http server, so that we don't have any more inflight requests
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
a.logger.Info("shutting down agent's HTTP server", zap.String("addr", a.HTTPAddr()))
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := a.httpServer.Shutdown(timeout); err != nil {
a.logger.Error("failed to close HTTP server", zap.Error(err))
}
cancel()

// then, close all processors that are called for the incoming http requests
for _, processor := range a.processors {
go processor.Stop()
processor.Stop()
}
a.closer.Close()
}
2 changes: 2 additions & 0 deletions cmd/agent/app/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package app

import (
"fmt"
"io"
"net/http"
"strconv"

Expand Down Expand Up @@ -68,6 +69,7 @@ var (
type CollectorProxy interface {
GetReporter() reporter.Reporter
GetManager() configmanager.ClientConfigManager
io.Closer
}

// Builder Struct to hold configurations
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/app/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ func (fakeCollectorProxy) EmitZipkinBatch(spans []*zipkincore.Span) (err error)
func (fakeCollectorProxy) EmitBatch(batch *jaeger.Batch) (err error) {
return nil
}
func (fakeCollectorProxy) Close() error {
return nil
}

func (f fakeCollectorProxy) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
return nil, errors.New("no peers available")
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"io"
"net"
"testing"
"time"
Expand All @@ -29,6 +30,8 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)

var _ io.Closer = (*ProxyBuilder)(nil)

func TestMultipleCollectors(t *testing.T) {
spanHandler1 := &mockSpanHandler{}
s1, addr1 := initializeGRPCTestServer(t, func(s *grpc.Server) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/tchannel/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (b ProxyBuilder) GetManager() configmanager.ClientConfigManager {

// Close closes connections used by proxy.
func (b ProxyBuilder) Close() error {
b.tchanRep.Channel().Close()
b.reporter.Close()
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
b.tchanRep.Channel().Close()
return nil
}
3 changes: 3 additions & 0 deletions cmd/agent/app/reporter/tchannel/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package tchannel

import (
"io"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -27,6 +28,8 @@ import (
"github.com/jaegertracing/jaeger/cmd/agent/app/reporter"
)

var _ io.Closer = (*ProxyBuilder)(nil)

func TestErrorReporterBuilder(t *testing.T) {
tbuilder := NewBuilder().WithDiscoverer(fakeDiscoverer{})
b, err := NewCollectorProxy(tbuilder, metrics.NullFactory, zap.NewNop())
Expand Down
6 changes: 2 additions & 4 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main

import (
"fmt"
"io"
"os"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -76,9 +75,8 @@ func main() {
return fmt.Errorf("failed to run the agent: %w", err)
}
svc.RunAndThen(func() {
if closer, ok := cp.(io.Closer); ok {
closer.Close()
}
agent.Stop()
cp.Close()
})
return nil
},
Expand Down
30 changes: 17 additions & 13 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ by default uses only in-memory database.`,
cOpts := new(collectorApp.CollectorOptions).InitFromViper(v)
qOpts := new(queryApp.QueryOptions).InitFromViper(v, logger)

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
ServiceName: "jaeger-collector",
Logger: logger,
Expand All @@ -130,14 +131,25 @@ by default uses only in-memory database.`,
})
c.Start(cOpts)

startAgent(aOpts, repOpts, tchanBuilder, grpcBuilder, cOpts.CollectorGRPCPort, logger, metricsFactory)
// agent
grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort))
agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})
cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, agentMetricsFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
agent := startAgent(cp, aOpts, logger, metricsFactory)
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved

// query
querySrv := startQuery(
svc, qOpts, archiveOptions(storageFactory, logger),
spanReader, dependencyReader,
rootMetricsFactory, metricsFactory,
)

svc.RunAndThen(func() {
agent.Stop()
cp.Close()
c.Close()
querySrv.Close()
if closer, ok := spanWriter.(io.Closer); ok {
Expand Down Expand Up @@ -177,21 +189,11 @@ by default uses only in-memory database.`,
}

func startAgent(
cp agentApp.CollectorProxy,
b *agentApp.Builder,
repOpts *agentRep.Options,
tchanBuilder *agentTchanRep.Builder,
grpcBuilder *agentGrpcRep.ConnBuilder,
collectorGRPCPort int,
logger *zap.Logger,
baseFactory metrics.Factory,
) {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})

grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", collectorGRPCPort))
cp, err := agentApp.CreateCollectorProxy(repOpts, tchanBuilder, grpcBuilder, logger, metricsFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
) *agentApp.Agent {

agent, err := b.CreateAgent(cp, logger, baseFactory)
if err != nil {
Expand All @@ -202,6 +204,8 @@ func startAgent(
if err := agent.Run(); err != nil {
logger.Fatal("Failed to run the agent", zap.Error(err))
}

return agent
}

func startQuery(
Expand Down