Skip to content

Commit

Permalink
Handle Launchers with a manager type (#11286)
Browse files Browse the repository at this point in the history
This is quite similar to pkg/logs/scheduler.Schedulers.  One substantial
change is that the interesting internal bits of the logs-agent to which
a launcher has access are passed to its Start method.

This exposes the remaining "weird" launchers (container, kubernetes, and
docker) as feeling even weirder, but that will be addressed in
subsequent commits.
  • Loading branch information
djmitche authored Mar 25, 2022
1 parent 20681c9 commit 85b6987
Show file tree
Hide file tree
Showing 21 changed files with 318 additions and 136 deletions.
67 changes: 28 additions & 39 deletions pkg/logs/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@ import (
"time"

coreConfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util"
"github.com/DataDog/datadog-agent/pkg/util/log"

"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/config"
"github.com/DataDog/datadog-agent/pkg/logs/diagnostic"
"github.com/DataDog/datadog-agent/pkg/logs/internal/launchers"
"github.com/DataDog/datadog-agent/pkg/logs/internal/launchers/channel"
"github.com/DataDog/datadog-agent/pkg/logs/internal/launchers/container"
"github.com/DataDog/datadog-agent/pkg/logs/internal/launchers/docker"
Expand All @@ -30,6 +27,9 @@ import (
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/logs/schedulers"
"github.com/DataDog/datadog-agent/pkg/logs/service"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/startstop"
)

Expand All @@ -43,7 +43,7 @@ type Agent struct {
auditor auditor.Auditor
destinationsCtx *client.DestinationsContext
pipelineProvider pipeline.Provider
inputs []startstop.StartStoppable
launchers *launchers.Launchers
health *health.Handle
diagnosticMessageReceiver *diagnostic.BufferedMessageReceiver

Expand All @@ -66,23 +66,34 @@ func NewAgent(sources *config.LogSources, services *service.Services, processing
// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewProvider(config.NumberOfPipelines, auditor, diagnosticMessageReceiver, processingRules, endpoints, destinationsCtx)

// setup the launchers
lnchrs := launchers.NewLaunchers(sources, pipelineProvider, auditor)
lnchrs.AddLauncher(filelauncher.NewLauncher(
coreConfig.Datadog.GetInt("logs_config.open_files_limit"),
filelauncher.DefaultSleepDuration,
coreConfig.Datadog.GetBool("logs_config.validate_pod_container_id"),
time.Duration(coreConfig.Datadog.GetFloat64("logs_config.file_scan_period")*float64(time.Second))))
lnchrs.AddLauncher(listener.NewLauncher(coreConfig.Datadog.GetInt("logs_config.frame_size")))
lnchrs.AddLauncher(journald.NewLauncher())
lnchrs.AddLauncher(windowsevent.NewLauncher())
lnchrs.AddLauncher(traps.NewLauncher())

// Only try to start the container launchers if Docker or Kubernetes is available
containerLaunchables := []container.Launchable{
{
IsAvailable: docker.IsAvailable,
Launcher: func() startstop.StartStoppable {
Launcher: func() launchers.Launcher {
return docker.NewLauncher(
time.Duration(coreConfig.Datadog.GetInt("logs_config.docker_client_read_timeout"))*time.Second,
sources,
services,
pipelineProvider,
auditor,
coreConfig.Datadog.GetBool("logs_config.docker_container_use_file"),
coreConfig.Datadog.GetBool("logs_config.docker_container_force_use_file"))
},
},
{
IsAvailable: kubernetes.IsAvailable,
Launcher: func() startstop.StartStoppable {
Launcher: func() launchers.Launcher {
return kubernetes.NewLauncher(sources, services, coreConfig.Datadog.GetBool("logs_config.container_collect_all"))
},
},
Expand All @@ -93,21 +104,8 @@ func NewAgent(sources *config.LogSources, services *service.Services, processing
containerLaunchables[0], containerLaunchables[1] = containerLaunchables[1], containerLaunchables[0]
}

validatePodContainerID := coreConfig.Datadog.GetBool("logs_config.validate_pod_container_id")

// setup the inputs
inputs := []startstop.StartStoppable{
filelauncher.NewLauncher(sources, coreConfig.Datadog.GetInt("logs_config.open_files_limit"), pipelineProvider, auditor,
filelauncher.DefaultSleepDuration, validatePodContainerID, time.Duration(coreConfig.Datadog.GetFloat64("logs_config.file_scan_period")*float64(time.Second))),
listener.NewLauncher(sources, coreConfig.Datadog.GetInt("logs_config.frame_size"), pipelineProvider),
journald.NewLauncher(sources, pipelineProvider, auditor),
windowsevent.NewLauncher(sources, pipelineProvider),
traps.NewLauncher(sources, pipelineProvider),
}

// Only try to start the container launchers if Docker or Kubernetes is available
if coreConfig.IsFeaturePresent(coreConfig.Docker) || coreConfig.IsFeaturePresent(coreConfig.Kubernetes) {
inputs = append(inputs, container.NewLauncher(containerLaunchables))
lnchrs.AddLauncher(container.NewLauncher(containerLaunchables))
}

return &Agent{
Expand All @@ -117,7 +115,7 @@ func NewAgent(sources *config.LogSources, services *service.Services, processing
auditor: auditor,
destinationsCtx: destinationsCtx,
pipelineProvider: pipelineProvider,
inputs: inputs,
launchers: lnchrs,
health: health,
diagnosticMessageReceiver: diagnosticMessageReceiver,
}
Expand All @@ -138,10 +136,9 @@ func NewServerless(sources *config.LogSources, services *service.Services, proce
// setup the pipeline provider that provides pairs of processor and sender
pipelineProvider := pipeline.NewServerlessProvider(config.NumberOfPipelines, auditor, processingRules, endpoints, destinationsCtx)

// setup the inputs
inputs := []startstop.StartStoppable{
channel.NewLauncher(sources, pipelineProvider),
}
// setup the sole launcher for this agent
lnchrs := launchers.NewLaunchers(sources, pipelineProvider, auditor)
lnchrs.AddLauncher(channel.NewLauncher())

return &Agent{
sources: sources,
Expand All @@ -150,7 +147,7 @@ func NewServerless(sources *config.LogSources, services *service.Services, proce
auditor: auditor,
destinationsCtx: destinationsCtx,
pipelineProvider: pipelineProvider,
inputs: inputs,
launchers: lnchrs,
health: health,
diagnosticMessageReceiver: diagnosticMessageReceiver,
}
Expand All @@ -164,16 +161,12 @@ func (a *Agent) Start() {
}
a.started = true

inputs := startstop.NewStarter()
for _, input := range a.inputs {
inputs.Add(input)
}
starter := startstop.NewStarter(
a.destinationsCtx,
a.auditor,
a.pipelineProvider,
a.diagnosticMessageReceiver,
inputs,
a.launchers,
a.schedulers,
)
starter.Start()
Expand All @@ -187,13 +180,9 @@ func (a *Agent) Flush(ctx context.Context) {
// Stop stops all the elements of the data pipeline
// in the right order to prevent data loss
func (a *Agent) Stop() {
inputs := startstop.NewParallelStopper()
for _, input := range a.inputs {
inputs.Add(input)
}
stopper := startstop.NewSerialStopper(
a.schedulers,
inputs,
a.launchers,
a.pipelineProvider,
a.auditor,
a.destinationsCtx,
Expand Down
2 changes: 2 additions & 0 deletions pkg/logs/internal/launchers/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Launchers

Launchers are responsible for translating sources (config.LogSource) to tailers, and managing their tailers' lifecycle.

The logs agent maintains a set of current launchers, starting them when the logs-agent starts and stopping them when the logs-agent stops.
12 changes: 7 additions & 5 deletions pkg/logs/internal/launchers/channel/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package channel

import (
"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/config"
"github.com/DataDog/datadog-agent/pkg/logs/internal/launchers"
tailer "github.com/DataDog/datadog-agent/pkg/logs/internal/tailers/channel"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
)
Expand All @@ -20,16 +22,16 @@ type Launcher struct {
}

// NewLauncher returns an initialized Launcher
func NewLauncher(sources *config.LogSources, pipelineProvider pipeline.Provider) *Launcher {
func NewLauncher() *Launcher {
return &Launcher{
pipelineProvider: pipelineProvider,
sources: sources.GetAddedForType(config.StringChannelType),
stop: make(chan struct{}),
stop: make(chan struct{}),
}
}

// Start starts the launcher.
func (l *Launcher) Start() {
func (l *Launcher) Start(sourceProvider launchers.SourceProvider, pipelineProvider pipeline.Provider, registry auditor.Registry) {
l.pipelineProvider = pipelineProvider
l.sources = sourceProvider.GetAddedForType(config.StringChannelType)
go l.run()
}

Expand Down
21 changes: 15 additions & 6 deletions pkg/logs/internal/launchers/container/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,26 @@ import (
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/internal/launchers"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/retry"
"github.com/DataDog/datadog-agent/pkg/util/startstop"
)

// Launchable is a retryable wrapper for a restartable launcher
type Launchable struct {
IsAvailable func() (bool, *retry.Retrier)
Launcher func() startstop.StartStoppable
Launcher func() launchers.Launcher
}

// Launcher tries to select a container launcher and retry on failure
type Launcher struct {
containerLaunchables []Launchable
activeLauncher startstop.StartStoppable
activeLauncher launchers.Launcher
sourceProvider launchers.SourceProvider
pipelineProvider pipeline.Provider
registry auditor.Registry
stop bool
sync.Mutex
}
Expand All @@ -41,7 +46,7 @@ func (l *Launcher) launch(launchable Launchable) {
launcher = NewNoopLauncher()
}
l.activeLauncher = launcher
l.activeLauncher.Start()
l.activeLauncher.Start(l.sourceProvider, l.pipelineProvider, l.registry)
}

func (l *Launcher) shouldRetry() (bool, time.Duration) {
Expand All @@ -68,12 +73,16 @@ func (l *Launcher) shouldRetry() (bool, time.Duration) {
}

// Start starts the launcher
func (l *Launcher) Start() {
func (l *Launcher) Start(sourceProvider launchers.SourceProvider, pipelineProvider pipeline.Provider, registry auditor.Registry) {
l.sourceProvider = sourceProvider
l.pipelineProvider = pipelineProvider
l.registry = registry

// If we are restarting, start up the active launcher since we already picked one from a previous run
l.Lock()
if l.activeLauncher != nil {
l.stop = true
l.activeLauncher.Start()
l.activeLauncher.Start(l.sourceProvider, l.pipelineProvider, l.registry)
l.Unlock()
return
}
Expand Down
32 changes: 17 additions & 15 deletions pkg/logs/internal/launchers/container/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import (
"testing"
"time"

"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/internal/launchers"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/util/retry"
"github.com/DataDog/datadog-agent/pkg/util/startstop"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -71,7 +73,7 @@ func (m *mockLauncher) SetAvailable(Available bool) {
m.isAvailable = Available
}

func (m *mockLauncher) Start() {
func (m *mockLauncher) Start(sourceProvider launchers.SourceProvider, pipelineProvider pipeline.Provider, registry auditor.Registry) {
m.startCount++
m.wg.Done()
}
Expand All @@ -82,7 +84,7 @@ func (m *mockLauncher) Stop() {
func (m *mockLauncher) ToLaunchable() Launchable {
return Launchable{
IsAvailable: m.IsAvailable,
Launcher: func() startstop.StartStoppable {
Launcher: func() launchers.Launcher {
m.wg.Done()
return m
},
Expand All @@ -92,7 +94,7 @@ func (m *mockLauncher) ToLaunchable() Launchable {
func (m *mockLauncher) ToErrLaunchable() Launchable {
return Launchable{
IsAvailable: m.IsAvailable,
Launcher: func() startstop.StartStoppable {
Launcher: func() launchers.Launcher {
m.wg.Done()
return nil
},
Expand All @@ -105,7 +107,7 @@ func TestSelectFirst(t *testing.T) {

l1.wg.Add(2)
l := NewLauncher([]Launchable{l1.ToLaunchable(), l2.ToLaunchable()})
l.Start()
l.Start(nil, nil, nil)

l1.wg.Wait()
assert.Equal(t, 1, l1.startCount)
Expand All @@ -118,7 +120,7 @@ func TestSelectSecond(t *testing.T) {

l2.wg.Add(2)
l := NewLauncher([]Launchable{l1.ToLaunchable(), l2.ToLaunchable()})
l.Start()
l.Start(nil, nil, nil)

l2.wg.Wait()
assert.Equal(t, 0, l1.startCount)
Expand All @@ -131,7 +133,7 @@ func TestFailsThenSucceeds(t *testing.T) {

l2.wg.Add(2)
l := NewLauncher([]Launchable{l1.ToLaunchable(), l2.ToLaunchable()})
l.Start()
l.Start(nil, nil, nil)

// let it run a few times
time.Sleep(10 * time.Millisecond)
Expand All @@ -152,7 +154,7 @@ func TestFailsThenSucceedsRetrier(t *testing.T) {

l1.wg.Add(3)
l := NewLauncher([]Launchable{l1.ToLaunchable(), l2.ToLaunchable()})
l.Start()
l.Start(nil, nil, nil)

l1.wg.Wait()

Expand All @@ -166,7 +168,7 @@ func TestAvailableLauncherReturnsNil(t *testing.T) {

l2.wg.Add(1)
l := NewLauncher([]Launchable{l1.ToLaunchable(), l2.ToErrLaunchable()})
l.Start()
l.Start(nil, nil, nil)

l2.wg.Wait()
assert.Equal(t, 0, l1.startCount)
Expand All @@ -183,7 +185,7 @@ func TestRestartUsesPreviousLauncher(t *testing.T) {

l1.wg.Add(2)
l := NewLauncher([]Launchable{l1.ToLaunchable(), l2.ToLaunchable()})
l.Start()
l.Start(nil, nil, nil)

l1.wg.Wait()
l.Stop()
Expand All @@ -193,7 +195,7 @@ func TestRestartUsesPreviousLauncher(t *testing.T) {
assert.Equal(t, 0, l2.stopCount)

l1.wg.Add(1)
l.Start()
l.Start(nil, nil, nil)
l1.wg.Wait()

assert.Equal(t, 2, l1.startCount)
Expand All @@ -205,7 +207,7 @@ func TestRestartFindLauncherLater(t *testing.T) {
l2 := newMockLauncher(false)

l := NewLauncher([]Launchable{l1.ToLaunchable(), l2.ToLaunchable()})
l.Start()
l.Start(nil, nil, nil)

// let it run a few times
time.Sleep(10 * time.Millisecond)
Expand All @@ -219,7 +221,7 @@ func TestRestartFindLauncherLater(t *testing.T) {
l1.SetAvailable(true)

l1.wg.Add(2)
l.Start()
l.Start(nil, nil, nil)
l1.wg.Wait()

assert.Equal(t, 1, l1.startCount)
Expand All @@ -232,7 +234,7 @@ func TestRestartSameLauncher(t *testing.T) {

l1.wg.Add(2)
l := NewLauncher([]Launchable{l1.ToLaunchable(), l2.ToLaunchable()})
l.Start()
l.Start(nil, nil, nil)

// let it run a few times
time.Sleep(10 * time.Millisecond)
Expand All @@ -245,7 +247,7 @@ func TestRestartSameLauncher(t *testing.T) {
assert.Equal(t, 0, l2.stopCount)

l1.wg.Add(1)
l.Start()
l.Start(nil, nil, nil)
l1.wg.Wait()

assert.Equal(t, 2, l1.startCount)
Expand Down
Loading

0 comments on commit 85b6987

Please sign in to comment.