Skip to content

Commit

Permalink
Fix double fetch (#35843)
Browse files Browse the repository at this point in the history
Fixes #35646 by only unpacking project monitors once.

This fixes the ever-growing temp folder issue and is more efficient to boot. Previously we would call fetch on a monitor source every time it was run, but only clean up the fetched resource once, when the monitor was unloaded. We now fetch once and clean up once.

This change also fixes the very confusing situation of having two files named project.go (browser/project.go and browser/source/project.go): we have renamed browser/project.go to browser/sourcejob.go, which makes reasoning about this change simpler.
  • Loading branch information
andrewvc authored Jun 22, 2023
1 parent 90f6a25 commit f7111dc
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 60 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Fix formatting issue with socket trace timeout. {pull}35434[35434]
- Update gval version. {pull}35636[35636]
- Fix serialization of processors when running diagnostics. {pull}35698[35698]
- Fix temp dir running out of space with project monitors. {pull}35843[35843]

*Heartbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/heartbeat/monitors/browser/browser.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func create(name string, cfg *config.C) (p plugin.Plugin, err error) {
return plugin.Plugin{}, fmt.Errorf("script monitors cannot be run as root")
}

s, err := NewProject(cfg)
s, err := NewSourceJob(cfg)
if err != nil {
return plugin.Plugin{}, err
}
Expand Down
18 changes: 18 additions & 0 deletions x-pack/heartbeat/monitors/browser/source/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"

"github.com/elastic/elastic-agent-libs/logp"
Expand All @@ -26,6 +27,8 @@ import (
// ProjectSource is a project monitor source whose files are delivered inline
// through Content and unpacked to disk by Fetch.
type ProjectSource struct {
// Content is the base64-encoded source payload read from the "content" setting;
// Fetch decodes it before unpacking.
Content string `config:"content" json:"content"`
// TargetDirectory is the temp directory the source is unpacked into; Close
// removes it when it is non-empty.
TargetDirectory string
// fetched is set once Fetch has completed successfully, so later Fetch calls
// return early and re-use the already unpacked source.
fetched bool
// mtx is held for the duration of Fetch, serializing the fetched check and
// the unpacking.
mtx sync.Mutex
}

// ErrNoContent reports that a project monitor source has no 'content' value specified.
var ErrNoContent = fmt.Errorf("no 'content' value specified for project monitor source")
Expand All @@ -39,6 +42,14 @@ func (p *ProjectSource) Validate() error {
}

func (p *ProjectSource) Fetch() error {
// We only need to unzip the source exactly once
p.mtx.Lock()
defer p.mtx.Unlock()
if p.fetched {
logp.L().Debugf("browser project: re-use already unpacked source: %s", p.Workdir())
return nil
}

decodedBytes, err := base64.StdEncoding.DecodeString(p.Content)
if err != nil {
return err
Expand All @@ -60,6 +71,9 @@ func (p *ProjectSource) Fetch() error {
if err != nil {
return fmt.Errorf("could not make temp dir for unzipping project source: %w", err)
}

logp.L().Debugf("browser project: unpack source: %s", p.Workdir())

err = os.Chmod(p.TargetDirectory, defaultMod)
if err != nil {
return fmt.Errorf("failed assigning default mode %s to temp dir: %w", defaultMod, err)
Expand All @@ -81,6 +95,8 @@ func (p *ProjectSource) Fetch() error {
}
}

// We've succeeded, mark the fetch as a success
p.fetched = true
return nil
}

Expand Down Expand Up @@ -142,6 +158,8 @@ func (p *ProjectSource) Workdir() string {
}

func (p *ProjectSource) Close() error {
logp.L().Debugf("browser project: close project source: %s", p.Workdir())

if p.TargetDirectory != "" {
return os.RemoveAll(p.TargetDirectory)
}
Expand Down
21 changes: 21 additions & 0 deletions x-pack/heartbeat/monitors/browser/source/project_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,29 @@ func TestProjectSource(t *testing.T) {
return
}
require.NoError(t, err)

fetchAndValidate(t, psrc)
})
}
}

// TestFetchCaching verifies that calling Fetch twice on the same ProjectSource
// leaves Workdir unchanged, i.e. the second call re-uses the source unpacked by
// the first call instead of producing a new working directory.
func TestFetchCaching(t *testing.T) {
	cfg := mapstr.M{
		"content": "UEsDBBQACAAIAJ27qVQAAAAAAAAAAAAAAAAiAAAAZXhhbXBsZXMvdG9kb3MvYWR2YW5jZWQuam91cm5leS50c5VRPW/CMBDd+RWnLA0Sigt0KqJqpbZTN+iEGKzkIC6JbfkuiBTx3+uEEAGlgi7Rnf38viIESCLkR/FJ6Eis1VIjpanATBKrWFCpOUU/kcCNzG2GJNgkhoRM1lLHmERfpnAay4ipo3JrHMMWmjPYwcKZHILn33zBqIV3ADIjkxdrJ4y251eZJFNJq3b1Hh1XJx+KeKK+8XATpxiv3o07RidI7Ex5OOocTEQixcz6mF66MRgGXkmxMhqkTiA2VcJ6NQsgpZcZAnueoAfhFqxcYs9/ncwJdl0YP9XeY6OJgb3qFDcMYwhejb5jsAUDyYxBaSi9HmCJlfZJ2vCYNCpc1h2d5m8AB/r99cU+GmS/hpwXc4nmrKh/K917yK57VqZe1lU6zM26WvIiY2WbHunWIiusb3IWVBP0/bP9NGinYTC/qcqWLloY9ybjNAy5VbzYdP1sdz3+8FqJleqsP7/ONPjjp++TPgS3eaks/wBQSwcIVYIEHGwBAADRAwAAUEsDBBQACAAIAJ27qVQAAAAAAAAAAAAAAAAZAAAAZXhhbXBsZXMvdG9kb3MvaGVscGVycy50c5VUTYvbMBC9768YRGAVyKb0uktCu9CeektvpRCtM4nFKpKQxt2kwf+9I9lJ5cRb6MWW5+u9eTOW3nsXCE4QCf0M8OCxImhhG9wexCc0KpKuPsSjpRr5FMXTXeVsJDBObT57v+I8WID0aoczaIKZwmIJpzvIFaUwqrFVDcp7MQPFdSqQlxAA9aY0QUqe7xw5mQo8saflZ3uGUpvNdxVfh1DEliHWmuOyGSan9GrXY4hdSW19Q1yswJ9Ika1zi28P5DZOZCZnjp2Pjh5lhr71+YAxSvHFEgZx20UqGVdoWGAXGFo0Zp5sD0YnOXX+uMi71TY3nTh2PYy0HZCaYMsm0umrC2cYuWYpStwWlksgPNBC9CKJ9UDqGDFQAv7GrFb6N/aqD0hEtl9pX9VYvQLViroR5KZqFXmlVEXmyDNJWS0wkT1aiqPD6fZPynIsEznoYDqdG7Q7qqcs2DPKzOVG7EyHhSj25n0Zyw62PJvcwH2vzz1PN3czSrifwHlaZfUbThuMFNzxPyj1GVeE/rHWRr2guaz1e6wu0foSmhPTL3DwiuqFshVDu/D4aPSPjz/FIK1n9dwQOfu3gk7pL9k4jK+M5lk0LBRy9CB7nn2yD+cStfuFQQ5+riK9kJQ3JV9cbCmuh1n6HF3h5LleimS7GkoynWVL5+KWS6h/AFBLBwgvDHpj+wEAAC8FAABQSwECLQMUAAgACACdu6lUVYIEHGwBAADRAwAAIgAAAAAAAAAAACAApIEAAAAAZXhhbXBsZXMvdG9kb3MvYWR2YW5jZWQuam91cm5leS50c1BLAQItAxQACAAIAJ27qVQvDHpj+wEAAC8FAAAZAAAAAAAAAAAAIACkgbwBAABleGFtcGxlcy90b2Rvcy9oZWxwZXJzLnRzUEsFBgAAAAACAAIAlwAAAP4DAAAAAA==",
	}
	psrc, err := dummyPSource(cfg)
	require.NoError(t, err)
	// Cleanup is part of what this test guards: surface a failing Close
	// instead of silently discarding its error in a bare defer.
	t.Cleanup(func() {
		require.NoError(t, psrc.Close())
	})

	// First fetch unpacks the source; remember where it reports its workdir.
	err = psrc.Fetch()
	require.NoError(t, err)
	wdir := psrc.Workdir()

	// Second fetch must succeed and report the very same workdir.
	err = psrc.Fetch()
	require.NoError(t, err)
	wdirNext := psrc.Workdir()
	require.Equal(t, wdir, wdirNext)
}

func validateFileContents(t *testing.T, dir string) {
expected := []string{
"examples/todos/helpers.ts",
Expand All @@ -73,6 +91,9 @@ func validateFileContents(t *testing.T, dir string) {
}

func fetchAndValidate(t *testing.T, psrc *ProjectSource) {
defer func() {
_ = psrc.Close()
}()
err := psrc.Fetch()
require.NoError(t, err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,27 @@ import (
"github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser/synthexec"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type JourneyLister func(ctx context.Context, projectPath string, params mapstr.M) (journeyNames []string, err error)

type Project struct {
type SourceJob struct {
rawCfg *config.C
projectCfg *Config
browserCfg *Config
ctx context.Context
cancel context.CancelFunc
}

func NewProject(rawCfg *config.C) (*Project, error) {
// Global project context to cancel all jobs
func NewSourceJob(rawCfg *config.C) (*SourceJob, error) {
// Global browser context to cancel all jobs
// on close
ctx, cancel := context.WithCancel(context.Background())

s := &Project{
s := &SourceJob{
rawCfg: rawCfg,
projectCfg: DefaultConfig(),
browserCfg: DefaultConfig(),
ctx: ctx,
cancel: cancel,
}
err := rawCfg.Unpack(s.projectCfg)
err := rawCfg.Unpack(s.browserCfg)
if err != nil {
return nil, ErrBadConfig(err)
}
Expand All @@ -50,31 +47,31 @@ func NewProject(rawCfg *config.C) (*Project, error) {
}

func ErrBadConfig(err error) error {
return fmt.Errorf("could not parse project config: %w", err)
return fmt.Errorf("could not parse browser config: %w", err)
}

func (p *Project) String() string {
func (sj *SourceJob) String() string {
panic("implement me")
}

func (p *Project) Fetch() error {
return p.projectCfg.Source.Active().Fetch()
func (sj *SourceJob) Fetch() error {
return sj.browserCfg.Source.Active().Fetch()
}

func (p *Project) Workdir() string {
return p.projectCfg.Source.Active().Workdir()
func (sj *SourceJob) Workdir() string {
return sj.browserCfg.Source.Active().Workdir()
}

func (p *Project) Params() map[string]interface{} {
return p.projectCfg.Params
func (sj *SourceJob) Params() map[string]interface{} {
return sj.browserCfg.Params
}

func (p *Project) FilterJourneys() synthexec.FilterJourneyConfig {
return p.projectCfg.FilterJourneys
func (sj *SourceJob) FilterJourneys() synthexec.FilterJourneyConfig {
return sj.browserCfg.FilterJourneys
}

func (p *Project) StdFields() stdfields.StdMonitorFields {
sFields, err := stdfields.ConfigToStdMonitorFields(p.rawCfg)
func (sj *SourceJob) StdFields() stdfields.StdMonitorFields {
sFields, err := stdfields.ConfigToStdMonitorFields(sj.rawCfg)
// Should be impossible since outer monitor.go should run this same code elsewhere
// TODO: Just pass stdfields in to remove second deserialize
if err != nil {
Expand All @@ -83,45 +80,45 @@ func (p *Project) StdFields() stdfields.StdMonitorFields {
return sFields
}

func (p *Project) Close() error {
if p.projectCfg.Source.ActiveMemo != nil {
p.projectCfg.Source.ActiveMemo.Close()
func (sj *SourceJob) Close() error {
if sj.browserCfg.Source.ActiveMemo != nil {
sj.browserCfg.Source.ActiveMemo.Close()
}

// Cancel running jobs ctxs
p.cancel()
sj.cancel()

return nil
}

func (p *Project) extraArgs() []string {
extraArgs := p.projectCfg.SyntheticsArgs
if len(p.projectCfg.PlaywrightOpts) > 0 {
s, err := json.Marshal(p.projectCfg.PlaywrightOpts)
func (sj *SourceJob) extraArgs() []string {
extraArgs := sj.browserCfg.SyntheticsArgs
if len(sj.browserCfg.PlaywrightOpts) > 0 {
s, err := json.Marshal(sj.browserCfg.PlaywrightOpts)
if err != nil {
// This should never happen, if it was parsed as a config it should be serializable
logp.L().Warn("could not serialize playwright options '%v': %w", p.projectCfg.PlaywrightOpts, err)
logp.L().Warn("could not serialize playwright options '%v': %w", sj.browserCfg.PlaywrightOpts, err)
} else {
extraArgs = append(extraArgs, "--playwright-options", string(s))
}
}
if p.projectCfg.IgnoreHTTPSErrors {
if sj.browserCfg.IgnoreHTTPSErrors {
extraArgs = append(extraArgs, "--ignore-https-errors")
}
if p.projectCfg.Sandbox {
if sj.browserCfg.Sandbox {
extraArgs = append(extraArgs, "--sandbox")
}
if p.projectCfg.Screenshots != "" {
extraArgs = append(extraArgs, "--screenshots", p.projectCfg.Screenshots)
if sj.browserCfg.Screenshots != "" {
extraArgs = append(extraArgs, "--screenshots", sj.browserCfg.Screenshots)
}
if p.projectCfg.Throttling != nil {
switch t := p.projectCfg.Throttling.(type) {
if sj.browserCfg.Throttling != nil {
switch t := sj.browserCfg.Throttling.(type) {
case bool:
if !t {
extraArgs = append(extraArgs, "--no-throttling")
}
case string:
extraArgs = append(extraArgs, "--throttling", fmt.Sprintf("%v", p.projectCfg.Throttling))
extraArgs = append(extraArgs, "--throttling", fmt.Sprintf("%v", sj.browserCfg.Throttling))
case map[string]interface{}:
j, err := json.Marshal(t)
if err != nil {
Expand All @@ -135,22 +132,22 @@ func (p *Project) extraArgs() []string {
return extraArgs
}

func (p *Project) jobs() []jobs.Job {
func (sj *SourceJob) jobs() []jobs.Job {
var j jobs.Job

isScript := p.projectCfg.Source.Inline != nil
ctx := context.WithValue(p.ctx, synthexec.SynthexecTimeout, p.projectCfg.Timeout+30*time.Second)
isScript := sj.browserCfg.Source.Inline != nil
ctx := context.WithValue(sj.ctx, synthexec.SynthexecTimeout, sj.browserCfg.Timeout+30*time.Second)

if isScript {
src := p.projectCfg.Source.Inline.Script
j = synthexec.InlineJourneyJob(ctx, src, p.Params(), p.StdFields(), p.extraArgs()...)
src := sj.browserCfg.Source.Inline.Script
j = synthexec.InlineJourneyJob(ctx, src, sj.Params(), sj.StdFields(), sj.extraArgs()...)
} else {
j = func(event *beat.Event) ([]jobs.Job, error) {
err := p.Fetch()
err := sj.Fetch()
if err != nil {
return nil, fmt.Errorf("could not fetch for project job: %w", err)
return nil, fmt.Errorf("could not fetch for browser source job: %w", err)
}
sj, err := synthexec.ProjectJob(ctx, p.Workdir(), p.Params(), p.FilterJourneys(), p.StdFields(), p.extraArgs()...)
sj, err := synthexec.ProjectJob(ctx, sj.Workdir(), sj.Params(), sj.FilterJourneys(), sj.StdFields(), sj.extraArgs()...)
if err != nil {
return nil, err
}
Expand All @@ -160,10 +157,10 @@ func (p *Project) jobs() []jobs.Job {
return []jobs.Job{j}
}

func (p *Project) plugin() plugin.Plugin {
func (sj *SourceJob) plugin() plugin.Plugin {
return plugin.Plugin{
Jobs: p.jobs(),
DoClose: p.Close,
Jobs: sj.jobs(),
DoClose: sj.Close,
Endpoints: 1,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestValidLocal(t *testing.T) {
},
"timeout": timeout,
})
_, e := NewProject(cfg)
_, e := NewSourceJob(cfg)
require.Error(t, e)
}

Expand All @@ -66,10 +66,10 @@ func TestValidInline(t *testing.T) {
},
"timeout": timeout,
})
s, e := NewProject(cfg)
s, e := NewSourceJob(cfg)
require.NoError(t, e)
require.NotNil(t, s)
require.Equal(t, script, s.projectCfg.Source.Inline.Script)
require.Equal(t, script, s.browserCfg.Source.Inline.Script)
require.Equal(t, "", s.Workdir())
require.Equal(t, testParams, s.Params())

Expand All @@ -86,7 +86,7 @@ func TestNameRequired(t *testing.T) {
},
},
})
_, e := NewProject(cfg)
_, e := NewSourceJob(cfg)
require.Regexp(t, ErrNameRequired, e)
}

Expand All @@ -99,15 +99,15 @@ func TestIDRequired(t *testing.T) {
},
},
})
_, e := NewProject(cfg)
_, e := NewSourceJob(cfg)
require.Regexp(t, ErrIdRequired, e)
}

func TestEmptySource(t *testing.T) {
cfg := conf.MustNewConfigFrom(mapstr.M{
"source": mapstr.M{},
})
s, e := NewProject(cfg)
s, e := NewSourceJob(cfg)

require.Regexp(t, ErrBadConfig(source.ErrInvalidSource), e)
require.Nil(t, s)
Expand Down Expand Up @@ -196,8 +196,8 @@ func TestExtraArgs(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Project{
projectCfg: tt.cfg,
s := &SourceJob{
browserCfg: tt.cfg,
}
if got := s.extraArgs(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Project.extraArgs() = %v, want %v", got, tt.want)
Expand All @@ -217,9 +217,9 @@ func TestEmptyTimeout(t *testing.T) {
},
},
})
s, e := NewProject(cfg)
s, e := NewSourceJob(cfg)

require.NoError(t, e)
require.NotNil(t, s)
require.Equal(t, s.projectCfg.Timeout, defaults.Timeout)
require.Equal(t, s.browserCfg.Timeout, defaults.Timeout)
}

0 comments on commit f7111dc

Please sign in to comment.