Skip to content

Commit

Permalink
feat (mondel): Buffered and synced write
Browse files Browse the repository at this point in the history
Bonus: Added an e2e test.
Signed-off-by: Ivan Velichko <[email protected]>
  • Loading branch information
iximiuz committed Dec 22, 2023
1 parent f7fbef0 commit 567f4fc
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 37 deletions.
42 changes: 42 additions & 0 deletions pkg/app/sensor/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,3 +628,45 @@ func TestControlCommands_StopTargetApp(t *testing.T) {

sensor.AssertSensorLogsContain(t, ctx, sensorFullLifecycleSequence...)
}

func TestEnableMondel(t *testing.T) {
runID := newTestRun(t)
ctx := context.Background()

sensor := testsensor.NewSensorOrFail(
t, ctx, t.TempDir(), runID, imageSimpleService,
testsensor.WithEnableMondel(),
)
defer sensor.Cleanup(t, ctx)

sensor.StartStandaloneOrFail(t, ctx, nil)
go testutil.Delayed(ctx, 5*time.Second, func() {
sensor.SignalOrFail(t, ctx, syscall.SIGTERM)
})
sensor.WaitOrFail(t, ctx)

sensor.AssertSensorLogsContain(t, ctx, sensorFullLifecycleSequence...)

sensor.DownloadArtifactsOrFail(t, ctx)

sensor.AssertMondelIncludesFiles(t,
"/etc/nginx/nginx.conf",
"/etc/nginx/conf.d/default.conf",

// TODO: investigate why these files are not included in the mondel (but are in the creport).
// "/bin/sh",
// "/var/cache/nginx",
// "/var/run",
)
sensor.AssertMondelNotIncludesFiles(t,
"/bin/bash",
"/bin/cat",
"/etc/apt/sources.list",

// TODO: investigate why this file is included in the mondel (but not in the creport).
// "/run/nginx.pid",
)

// Uncomment when the mondel and creport file sets are synced.
// sensor.AssertReportAndMondelFileListsMatch(t)
}
11 changes: 8 additions & 3 deletions pkg/app/sensor/monitor/fanotify/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,10 @@ func (m *monitor) processEvent(e Event, fanReport *report.FanMonitorReport) {
}

if err := m.del.Publish(delEvent); err != nil {
logger.Tracef("m.del.Publish - not ok - %v", err)
logger.Errorf(
"mondel publish event failed - source=%v type=%v: %v",
delEvent.Source, delEvent.Type, err,
)
}
}

Expand Down Expand Up @@ -328,9 +331,11 @@ func (m *monitor) processEvent(e Event, fanReport *report.FanMonitorReport) {
WorkDir: newProcess.Cwd,
Root: newProcess.Root,
}

if err := m.del.Publish(delEvent); err != nil {
logger.Tracef("m.del.Publish - not ok - %v", err)
logger.Errorf(
"mondel publish event failed - source=%v type=%v: %v",
delEvent.Source, delEvent.Type, err,
)
}
}

Expand Down
67 changes: 45 additions & 22 deletions pkg/mondel/mondel.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/slimtoolkit/slim/pkg/acounter"
"github.com/slimtoolkit/slim/pkg/report"
"github.com/slimtoolkit/slim/pkg/util/fsutil"
)

const eventBufSize = 10000
Expand All @@ -28,12 +29,11 @@ type Publisher interface {
}

type publisher struct {
ctx context.Context
enable bool
outputFile string
output *os.File
eventCh chan *report.MonitorDataEvent
seqNumber acounter.Type
ctx context.Context
enable bool
output *os.File
eventCh chan *report.MonitorDataEvent
seqNumber acounter.Type
}

func NewPublisher(ctx context.Context, enable bool, outputFile string) *publisher {
Expand All @@ -45,26 +45,36 @@ func NewPublisher(ctx context.Context, enable bool, outputFile string) *publishe
defer logger.Trace("exit")

ref := &publisher{
ctx: ctx,
enable: enable,
outputFile: outputFile,
ctx: ctx,
enable: enable,
}

if !ref.enable {
return ref
}

f, err := os.OpenFile(ref.outputFile, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
// fsutil.Touch() creates potentially missing folder(s).
if err := fsutil.Touch(outputFile); err != nil {
log.WithError(err).Errorf("cannot create mondel file %q - fsutil.Touch() failed", outputFile)
ref.enable = false
return ref
}

// Using O_SYNC because there is another process (art_collector) that is
// reading from this file. If we don't use O_SYNC, then the file may not
// be flushed to disk for too long.
f, err := os.OpenFile(outputFile, os.O_APPEND|os.O_WRONLY|os.O_SYNC, 0644)
if err != nil {
log.WithError(err).Errorf("os.OpenFile(%v)", ref.outputFile)
} else {
ref.output = f
log.WithError(err).Errorf("os.OpenFile(%v)", outputFile)
ref.enable = false
return ref
}

ref.output = f
ref.eventCh = make(chan *report.MonitorDataEvent, eventBufSize)

go ref.process()

return ref
}

Expand All @@ -77,8 +87,12 @@ func (ref *publisher) Publish(event *report.MonitorDataEvent) error {
event.SeqNumber = ref.seqNumber.Inc()

select {
case <-ref.ctx.Done():
return ref.ctx.Err()

case ref.eventCh <- event:
return nil

default:
log.Debugf("mondel.publisher.Publish: dropped event (%#v)", event)
return ErrEventDropped
Expand All @@ -90,11 +104,21 @@ func (ref *publisher) process() {
logger.Trace("call")
defer logger.Trace("exit")

var buf bytes.Buffer
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

done:
for {
select {
case <-ref.ctx.Done():
logger.Debug("done - stopping...")
// Flush any remaining data in the buffer
if buf.Len() > 0 {
if _, err := ref.output.WriteString(buf.String()); err != nil {
logger.Errorf("Error writing remaining data: %v", err)
}
}
break done

case evt := <-ref.eventCh:
Expand All @@ -103,21 +127,20 @@ done:
logger.Debugf("could not encode - %v", encoded)
continue
}
buf.WriteString(encoded)

if ref.output != nil {
_, err := ref.output.WriteString(encoded)
if err != nil {
logger.Tracef("TMP: error writing - %v (%s)\n", err, encoded)
case <-ticker.C:
// Flush the buffer every second
if buf.Len() > 0 {
if _, err := ref.output.Write(buf.Bytes()); err != nil {
logger.Errorf("Error writing batch: %v", err)
}
} else {
fmt.Printf("%s", encoded)
buf.Reset()
}
}
}

if ref.output != nil {
ref.output.Close()
}
ref.output.Close()
}

func encodeEvent(event *report.MonitorDataEvent) (string, error) {
Expand Down
10 changes: 8 additions & 2 deletions pkg/monitor/ptrace/ptrace.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,10 @@ func (app *App) processFileActivity(e *syscallEvent) {
}

if err := app.del.Publish(delEvent); err != nil {
logger.Tracef("app.del.Publish - not ok - %v", err)
logger.Errorf(
"mondel publish event failed - source=%v type=%v: %v",
delEvent.Source, delEvent.Type, err,
)
}
}
}
Expand Down Expand Up @@ -368,7 +371,10 @@ func (app *App) processFileActivity(e *syscallEvent) {
}

if err := app.del.Publish(delEvent); err != nil {
logger.Tracef("app.del.Publish exec - not ok - %v", err)
logger.Errorf(
"mondel publish event failed - source=%v type=%v op_type=%v: %v",
delEvent.Source, delEvent.Type, delEvent.OpType, err,
)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/test/e2e/sensor/monitor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package sensor

import (
"github.com/slimtoolkit/slim/pkg/app/master/commands"
mastercommand "github.com/slimtoolkit/slim/pkg/app/master/command"
"github.com/slimtoolkit/slim/pkg/ipc/command"
)

Expand Down Expand Up @@ -48,7 +48,7 @@ func WithAppStderrToFile() StartMonitorOpt {

func WithPreserves(path ...string) StartMonitorOpt {
return func(cmd *command.StartMonitor) {
cmd.Preserves = commands.ParsePaths(path)
cmd.Preserves = mastercommand.ParsePaths(path)
}
}

Expand Down
Loading

0 comments on commit 567f4fc

Please sign in to comment.