Skip to content

Commit

Permalink
beater: create listener synchronously in Reload
Browse files Browse the repository at this point in the history
  • Loading branch information
axw committed Aug 26, 2021
1 parent a86fce3 commit bd2ec9c
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 35 deletions.
12 changes: 9 additions & 3 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,14 @@ func (r *reloader) reload(rawConfig *common.Config, namespace string, fleetConfi
if err != nil {
return err
}
// Start listening before we stop the existing runner (if any), to ensure zero downtime.
listener, err := listen(runner.config, runner.logger)
if err != nil {
return err
}
go func() {
if err := runner.run(); err != nil {
defer listener.Close()
if err := runner.run(listener); err != nil {
r.args.Logger.Error(err)
}
}()
Expand Down Expand Up @@ -345,7 +351,7 @@ func newServerRunner(ctx context.Context, args serverRunnerParams) (*serverRunne
}, nil
}

func (s *serverRunner) run() error {
func (s *serverRunner) run(listener net.Listener) error {
defer close(s.done)

// Send config to telemetry.
Expand Down Expand Up @@ -430,7 +436,7 @@ func (s *serverRunner) run() error {
// onboarding docs. Because these bypass the model processor framework, we
// must augment the reporter to set common `observer` and `ecs.version` fields.
reporter := publisher.Send
runServer := newBaseRunServer(augmentedReporter(reporter, s.beat.Info))
runServer := newBaseRunServer(listener, augmentedReporter(reporter, s.beat.Info))
if s.tracerServer != nil {
runServer = runServerWithTracerServer(runServer, s.tracerServer, s.tracer)
}
Expand Down
58 changes: 30 additions & 28 deletions beater/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func newHTTPServer(
cfg *config.Config,
handler http.Handler,
reporter publish.Reporter,
listener net.Listener,
) (*httpServer, error) {

server := &http.Server{
Expand Down Expand Up @@ -77,26 +78,11 @@ func newHTTPServer(
if err != nil {
return nil, err
}
httpListener, err := listen(cfg)
if err != nil {
return nil, err
}
if cfg.MaxConnections > 0 {
httpListener = netutil.LimitListener(httpListener, cfg.MaxConnections)
logger.Infof("Connection limit set to: %d", cfg.MaxConnections)
}

return &httpServer{server, cfg, logger, reporter, grpcListener, httpListener}, nil
return &httpServer{server, cfg, logger, reporter, grpcListener, listener}, nil
}

func (h *httpServer) start() error {
addr := h.httpListener.Addr()
if addr.Network() == "tcp" {
h.logger.Infof("Listening on: %s", addr)
} else {
h.logger.Infof("Listening on: %s:%s", addr.Network(), addr.String())
}

if h.cfg.RumConfig.Enabled {
h.logger.Info("RUM endpoints enabled!")
for _, s := range h.cfg.RumConfig.AllowOrigins {
Expand All @@ -114,7 +100,7 @@ func (h *httpServer) start() error {
// listening address. We only do this if data streams are not enabled,
// as onboarding documents are incompatible with data streams.
// Onboarding documents should be replaced by Fleet status later.
notifyListening(context.Background(), addr, h.reporter)
notifyListening(context.Background(), h.httpListener.Addr(), h.reporter)
}

if h.cfg.TLS.IsEnabled() {
Expand All @@ -140,21 +126,37 @@ func (h *httpServer) stop() {
}

// listen starts the listener for bt.config.Host.
func listen(cfg *config.Config) (net.Listener, error) {
if url, err := url.Parse(cfg.Host); err == nil && url.Scheme == "unix" {
func listen(cfg *config.Config, logger *logp.Logger) (net.Listener, error) {
var listener net.Listener
url, err := url.Parse(cfg.Host)
if err == nil && url.Scheme == "unix" {
// SO_REUSEPORT does not support unix sockets
return net.Listen("unix", url.Path)
listener, err = net.Listen("unix", url.Path)
} else {
addr := cfg.Host
if _, _, err := net.SplitHostPort(addr); err != nil {
// Tack on a port if SplitHostPort fails on what should be a
// tcp network address. If splitting failed because there were
// already too many colons, one more won't change that.
addr = net.JoinHostPort(addr, config.DefaultPort)
}
listener, err = reuseport.Listen("tcp", addr)
}
if err != nil {
return nil, err
}

const network = "tcp"
addr := cfg.Host
if _, _, err := net.SplitHostPort(addr); err != nil {
// Tack on a port if SplitHostPort fails on what should be a
// tcp network address. If splitting failed because there were
// already too many colons, one more won't change that.
addr = net.JoinHostPort(addr, config.DefaultPort)
addr := listener.Addr()
if network := addr.Network(); network == "tcp" {
logger.Infof("Listening on: %s", addr)
} else {
logger.Infof("Listening on: %s:%s", network, addr.String())
}
if cfg.MaxConnections > 0 {
logger.Infof("Connection limit set to: %d", cfg.MaxConnections)
listener = netutil.LimitListener(listener, cfg.MaxConnections)
}
return reuseport.Listen(network, addr)
return listener, nil
}

func doNotTrace(req *http.Request) bool {
Expand Down
9 changes: 5 additions & 4 deletions beater/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package beater

import (
"context"
"net"
"net/http"
"time"

Expand Down Expand Up @@ -88,9 +89,9 @@ type ServerParams struct {
//
// Once we remove sourcemap uploading and onboarding docs, we
// should remove the reporter parameter.
func newBaseRunServer(reporter publish.Reporter) RunServerFunc {
func newBaseRunServer(listener net.Listener, reporter publish.Reporter) RunServerFunc {
return func(ctx context.Context, args ServerParams) error {
srv, err := newServer(args, reporter)
srv, err := newServer(args, listener, reporter)
if err != nil {
return err
}
Expand Down Expand Up @@ -118,7 +119,7 @@ type server struct {
jaegerServer *jaeger.Server
}

func newServer(args ServerParams, reporter publish.Reporter) (server, error) {
func newServer(args ServerParams, listener net.Listener, reporter publish.Reporter) (server, error) {
agentcfgFetchReporter := agentcfg.NewReporter(agentcfg.NewFetcher(args.Config), args.BatchProcessor, 30*time.Second)

// DEPRECATED: dedicated Jaeger server. This does not use the same authenticator and is not rate limited.
Expand Down Expand Up @@ -157,7 +158,7 @@ func newServer(args ServerParams, reporter publish.Reporter) (server, error) {
return server{}, err
}
handler := apmhttp.Wrap(mux, apmhttp.WithServerRequestIgnorer(doNotTrace), apmhttp.WithTracer(args.Tracer))
httpServer, err := newHTTPServer(args.Logger, args.Info, args.Config, handler, reporter)
httpServer, err := newHTTPServer(args.Logger, args.Info, args.Config, handler, reporter, listener)
if err != nil {
return server{}, err
}
Expand Down
12 changes: 12 additions & 0 deletions beater/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,18 @@ func TestServerConfigReload(t *testing.T) {
err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: common.NewConfig()}})
assert.EqualError(t, err, "'apm-server' not found in integration config")

// Creating the socket listener is performed synchronously in the Reload method
// to ensure zero downtime when reloading an already running server. Illustrate
// that the socket listener is created synhconously in Reload by attempting to
// reload with an invalid host.
err = reloadable.Reload([]*reload.ConfigWithMeta{{Config: common.MustNewConfigFrom(map[string]interface{}{
"apm-server": map[string]interface{}{
"host": "testing.invalid:123",
},
})}})
require.Error(t, err)
assert.Regexp(t, "listen tcp: lookup testing.invalid: .*", err.Error())

inputConfig := common.MustNewConfigFrom(map[string]interface{}{
"apm-server": map[string]interface{}{
"host": "localhost:0",
Expand Down

0 comments on commit bd2ec9c

Please sign in to comment.