diff --git a/agent/config.go b/agent/config.go index e5d8587bf..38eb69f6d 100644 --- a/agent/config.go +++ b/agent/config.go @@ -16,7 +16,7 @@ type Config struct { // NewConfigFromFile will create a new agent config from a YAML file. func NewConfigFromFile(file string) (*Config, error) { - contents, err := ioutil.ReadFile(file) + contents, err := ioutil.ReadFile(file) // #nosec - configs load based on user specified directory if err != nil { return nil, fmt.Errorf("could not find config file: %s", err) } diff --git a/cmd/stanza/logging.go b/cmd/stanza/logging.go index f4d3eb7de..a16abfe19 100644 --- a/cmd/stanza/logging.go +++ b/cmd/stanza/logging.go @@ -59,5 +59,6 @@ func registerWindowsSink() { } func newWinFileSink(u *url.URL) (zap.Sink, error) { - return os.OpenFile(u.Path[1:], os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644) + // Ensure permissions restrict access to the running user only + return os.OpenFile(u.Path[1:], os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) } diff --git a/cmd/stanza/offsets.go b/cmd/stanza/offsets.go index 31c2b2e53..b60cdeb6b 100644 --- a/cmd/stanza/offsets.go +++ b/cmd/stanza/offsets.go @@ -9,6 +9,7 @@ import ( "github.com/observiq/stanza/operator/helper" "github.com/spf13/cobra" "go.etcd.io/bbolt" + "go.uber.org/zap" ) var stdout io.Writer = os.Stdout @@ -19,9 +20,6 @@ func NewOffsetsCmd(rootFlags *RootFlags) *cobra.Command { Use: "offsets", Short: "Manage input operator offsets", Args: cobra.NoArgs, - Run: func(command *cobra.Command, args []string) { - stdout.Write([]byte("No offsets subcommand specified. See `stanza offsets help` for details\n")) - }, } offsets.AddCommand(NewOffsetsClearCmd(rootFlags)) @@ -46,7 +44,10 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command { if all { if len(args) != 0 { - stdout.Write([]byte("Providing a list of operator IDs does nothing with the --all flag\n")) + _, err := stdout.Write([]byte("Providing a list of operator IDs does nothing with the --all flag\n")) + if err != nil { + exitOnErr("", err) + } } err := db.Update(func(tx *bbolt.Tx) error { @@ -59,7 +60,10 @@ func NewOffsetsClearCmd(rootFlags *RootFlags) *cobra.Command { exitOnErr("Failed to delete offsets", err) } else { if len(args) == 0 { - stdout.Write([]byte("Must either specify a list of operators or the --all flag\n")) + _, err := stdout.Write([]byte("Must either specify a list of operators or the --all flag\n")) + if err != nil { + exitOnErr("", err) + } os.Exit(1) } @@ -101,8 +105,8 @@ func NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command { } return offsetBucket.ForEach(func(key, value []byte) error { - stdout.Write(append(key, '\n')) - return nil + _, err := stdout.Write(append(key, '\n')) + return err }) }) if err != nil { @@ -115,8 +119,12 @@ func NewOffsetsListCmd(rootFlags *RootFlags) *cobra.Command { } func exitOnErr(msg string, err error) { + var sugaredLogger *zap.SugaredLogger if err != nil { - os.Stderr.WriteString(fmt.Sprintf("%s: %s\n", msg, err)) + _, err := os.Stderr.WriteString(fmt.Sprintf("%s: %s\n", msg, err)) + if err != nil { + sugaredLogger.Errorw("Failed to write to stdout", zap.Any("error", err)) + } os.Exit(1) } } diff --git a/cmd/stanza/root.go b/cmd/stanza/root.go index 708276656..ca6bec2dd 100644 --- a/cmd/stanza/root.go +++ b/cmd/stanza/root.go @@ -7,7 +7,7 @@ import ( "net/http" // This package registers its HTTP endpoints for profiling using an init hook - _ "net/http/pprof" + _ "net/http/pprof" // #nosec "os" "runtime" "runtime/pprof" @@ -156,7 +156,11 @@ func startProfiling(ctx context.Context, flags *RootFlags, logger *zap.SugaredLo if err != nil { logger.Errorw("Failed to create CPU profile", zap.Error(err)) } - defer f.Close() + defer func() { + if err := f.Close(); err != nil { + logger.Errorf(err.Error()) + } + }() if err := pprof.StartCPUProfile(f); err != nil { log.Fatal("could not start CPU profile: ", err) @@ -185,7 +189,11 @@ func startProfiling(ctx context.Context, flags *RootFlags, logger *zap.SugaredLo if err != nil { logger.Errorw("Failed to create memory profile", zap.Error(err)) } - defer f.Close() // error handling omitted for example + defer func() { + if err := f.Close(); err != nil { + logger.Errorw("Failed to close file", zap.Error(err)) + } + }() runtime.GC() // get up-to-date statistics if err := pprof.WriteHeapProfile(f); err != nil { diff --git a/database/database.go b/database/database.go index 53304523a..24445860f 100644 --- a/database/database.go +++ b/database/database.go @@ -49,7 +49,7 @@ func OpenDatabase(file string) (Database, error) { if _, err := os.Stat(filepath.Dir(file)); err != nil { if os.IsNotExist(err) { - err := os.MkdirAll(filepath.Dir(file), 0755) + err := os.MkdirAll(filepath.Dir(file), 0755) // #nosec - 0755 directory permissions are okay if err != nil { return nil, fmt.Errorf("creating database directory: %s", err) } @@ -59,5 +59,5 @@ func OpenDatabase(file string) (Database, error) { } options := &bbolt.Options{Timeout: 1 * time.Second} - return bbolt.Open(file, 0666, options) + return bbolt.Open(file, 0600, options) } diff --git a/operator/buffer/disk.go b/operator/buffer/disk.go index 94ea1e9d1..cc6ee5125 100644 --- a/operator/buffer/disk.go +++ b/operator/buffer/disk.go @@ -125,7 +125,8 @@ func (d *DiskBuffer) Open(path string, sync bool) error { if sync { flags |= os.O_SYNC } - if d.data, err = os.OpenFile(dataPath, flags, 0755); err != nil { + // #nosec - configs load based on user specified directory + if d.data, err = os.OpenFile(dataPath, flags, 0600); err != nil { return err } diff --git a/operator/buffer/disk_metadata.go b/operator/buffer/disk_metadata.go index fd9328791..856811b35 100644 --- a/operator/buffer/disk_metadata.go +++ b/operator/buffer/disk_metadata.go @@ -56,7 +56,8 @@ func OpenMetadata(path string, sync bool) (*Metadata, error) { if sync { flags |= os.O_SYNC } - if m.file, err = os.OpenFile(path, flags, 0755); err != nil { + // #nosec - configs load based on user specified directory + if m.file, err = os.OpenFile(path, flags, 0600); err != nil { return &Metadata{}, err } @@ -108,8 +109,7 @@ func (m *Metadata) Close() error { if err != nil { return err } - m.file.Close() - return nil + return m.file.Close() } // setDeadRange sets the dead range start and length, then persists it to disk diff --git a/operator/buffer/disk_test.go b/operator/buffer/disk_test.go index ae7c8a53c..303739a5c 100644 --- a/operator/buffer/disk_test.go +++ b/operator/buffer/disk_test.go @@ -142,6 +142,8 @@ func TestDiskBuffer(t *testing.T) { err = b2.Open(dir, false) require.NoError(t, err) readN(t, b2, 20, 0) + err = b2.Close() + require.NoError(t, err) }) t.Run("Write20Flush10CloseRead20", func(t *testing.T) { @@ -160,6 +162,8 @@ func TestDiskBuffer(t *testing.T) { err = b2.Open(dir, false) require.NoError(t, err) readN(t, b2, 10, 10) + err = b2.Close() + require.NoError(t, err) }) t.Run("ReadWaitTimesOut", func(t *testing.T) { @@ -178,6 +182,11 @@ func TestDiskBuffer(t *testing.T) { b := NewDiskBuffer(100) // Enough space for 1, but not 2 entries dir := testutil.NewTempDir(t) err := b.Open(dir, false) + defer func() { + if err := b.Close(); err != nil { + t.Error(err.Error()) + } + }() require.NoError(t, err) // Add a first entry @@ -214,6 +223,11 @@ func TestDiskBuffer(t *testing.T) { b := NewDiskBuffer(1 << 30) dir := testutil.NewTempDir(t) err := b.Open(dir, false) + defer func() { + if err := b.Close(); err != nil { + t.Error(err.Error()) + } + }() require.NoError(t, err) writes := 0 @@ -247,6 +261,11 @@ func TestDiskBufferBuild(t *testing.T) { cfg.Path = testutil.NewTempDir(t) b, err := cfg.Build(testutil.NewBuildContext(t), "test") require.NoError(t, err) + defer func() { + if err := b.Close(); err != nil { + t.Error(err.Error()) + } + }() diskBuffer := b.(*DiskBuffer) require.Equal(t, diskBuffer.atEnd, false) require.Len(t, diskBuffer.entryAdded, 1) diff --git a/operator/builtin/input/aws/cloudwatch/cloudwatch.go b/operator/builtin/input/aws/cloudwatch/cloudwatch.go index 921c88575..54430d390 100644 --- a/operator/builtin/input/aws/cloudwatch/cloudwatch.go +++ b/operator/builtin/input/aws/cloudwatch/cloudwatch.go @@ -10,7 +10,6 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatchlogs" - "github.com/observiq/stanza/errors" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" ) @@ -302,14 +301,14 @@ func (c *CloudwatchInput) filterLogEventsInputBuilder(nextToken string) cloudwat } // handleEvent is the handler for a AWS Cloudwatch Logs Filtered Event. -func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent) error { +func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs.FilteredLogEvent) { e := map[string]interface{}{ "message": event.Message, "ingestion_time": event.IngestionTime, } entry, err := c.NewEntry(e) if err != nil { - return errors.Wrap(err, "Failed to create new entry from record") + c.Errorf("Failed to create new entry from record: %s", err) } entry.AddResourceKey("log_group", c.logGroupName) @@ -327,15 +326,15 @@ func (c *CloudwatchInput) handleEvent(ctx context.Context, event *cloudwatchlogs c.Debugf("Writing start time %d to database", *event.IngestionTime) c.persist.Write(c.logGroupName, c.startTime) } - return nil } -func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent) error { +func (c *CloudwatchInput) handleEvents(ctx context.Context, events []*cloudwatchlogs.FilteredLogEvent) { for _, event := range events { c.handleEvent(ctx, event) } - c.persist.DB.Sync() - return nil + if err := c.persist.DB.Sync(); err != nil { + c.Errorf("Failed to sync offset database: %s", err) + } } // Returns time.Now() as Unix Time in Milliseconds diff --git a/operator/builtin/input/aws/cloudwatch/cloudwatch_persist_test.go b/operator/builtin/input/aws/cloudwatch/cloudwatch_persist_test.go index 90a68b8cb..387921745 100644 --- a/operator/builtin/input/aws/cloudwatch/cloudwatch_persist_test.go +++ b/operator/builtin/input/aws/cloudwatch/cloudwatch_persist_test.go @@ -25,6 +25,11 @@ func TestPersisterLoad(t *testing.T) { tempDir := testutil.NewTempDir(t) db, openDbErr := database.OpenDatabase(filepath.Join(tempDir, "test.db")) require.NoError(t, openDbErr) + defer func() { + if err := db.Close(); err != nil { + t.Error(err.Error()) + } + }() persister := Persister{ DB: helper.NewScopedDBPersister(db, "test"), } diff --git a/operator/builtin/input/file/file.go b/operator/builtin/input/file/file.go index 48869288f..c2b85820e 100644 --- a/operator/builtin/input/file/file.go +++ b/operator/builtin/input/file/file.go @@ -214,7 +214,7 @@ func (f *InputOperator) makeReaders(filePaths []string) []*Reader { } f.SeenPaths[path] = struct{}{} } - file, err := os.Open(path) + file, err := os.Open(path) // #nosec - operator must read in files defined by user if err != nil { f.Errorw("Failed to open file", zap.Error(err)) continue @@ -238,7 +238,9 @@ OUTER: for i := 0; i < len(fps); i++ { fp := fps[i] if len(fp.FirstBytes) == 0 { - files[i].Close() + if err := files[i].Close(); err != nil { + f.Errorf("problem closing file", "file", files[i].Name()) + } // Empty file, don't read it until we can compare its fingerprint fps = append(fps[:i], fps[i+1:]...) files = append(files[:i], files[i+1:]...) diff --git a/operator/builtin/input/journald/journald.go b/operator/builtin/input/journald/journald.go index cf808da97..eee33a4af 100644 --- a/operator/builtin/input/journald/journald.go +++ b/operator/builtin/input/journald/journald.go @@ -82,7 +82,8 @@ func (c JournaldInputConfig) Build(buildContext operator.BuildContext) ([]operat if cursor != nil { args = append(args, "--after-cursor", string(cursor)) } - return exec.CommandContext(ctx, "journalctl", args...) + return exec.CommandContext(ctx, "journalctl", args...) // #nosec - ... + // journalctl is an executable that is required for this operator to function }, json: jsoniter.ConfigFastest, } diff --git a/operator/builtin/input/tcp/tcp.go b/operator/builtin/input/tcp/tcp.go index 3a3a1d934..6125ee27e 100644 --- a/operator/builtin/input/tcp/tcp.go +++ b/operator/builtin/input/tcp/tcp.go @@ -158,7 +158,17 @@ func (t *TCPInput) configureListener() error { return nil } - config := tls.Config{Certificates: []tls.Certificate{t.tlsKeyPair}} + // TLS 1.0 is the package default since Go 1.2 + // https://golang.org/pkg/crypto/tls/ + // An issue has been filed to support modifyingn the minimum version + // https://github.com/observIQ/stanza/issues/349 + var tlsVersion uint16 = tls.VersionTLS10 + + // #nosec - Go defaults to TLS 1.0, and some users may require it + config := tls.Config{ + Certificates: []tls.Certificate{t.tlsKeyPair}, + MinVersion: tlsVersion, + } config.Time = func() time.Time { return time.Now() } config.Rand = rand.Reader diff --git a/operator/builtin/input/udp/udp.go b/operator/builtin/input/udp/udp.go index 68d050514..00cac513e 100644 --- a/operator/builtin/input/udp/udp.go +++ b/operator/builtin/input/udp/udp.go @@ -143,7 +143,9 @@ func (u *UDPInput) readMessage() (string, net.Addr, error) { // Stop will stop listening for udp messages. func (u *UDPInput) Stop() error { u.cancel() - u.connection.Close() + if err := u.connection.Close(); err != nil { + u.Errorf("failed to close connection, got error: %s", err) + } u.wg.Wait() return nil } diff --git a/operator/builtin/output/file/file.go b/operator/builtin/output/file/file.go index d74df7c68..729223d20 100644 --- a/operator/builtin/output/file/file.go +++ b/operator/builtin/output/file/file.go @@ -74,7 +74,7 @@ type FileOutput struct { // Start will open the output file. func (fo *FileOutput) Start() error { var err error - fo.file, err = os.OpenFile(fo.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660) + fo.file, err = os.OpenFile(fo.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return err } @@ -88,7 +88,7 @@ func (fo *FileOutput) Start() error { // Stop will close the output file. func (fo *FileOutput) Stop() error { if fo.file != nil { - fo.file.Close() + return fo.file.Close() } return nil } diff --git a/operator/builtin/output/forward/forward.go b/operator/builtin/output/forward/forward.go index b89af6d3c..85022eac3 100644 --- a/operator/builtin/output/forward/forward.go +++ b/operator/builtin/output/forward/forward.go @@ -165,10 +165,11 @@ func (f *ForwardOutput) handleResponse(res *http.Response) error { if err != nil { return errors.NewError("unexpected status code", "", "status", res.Status) } else { - res.Body.Close() + if err := res.Body.Close(); err != nil { + f.Errorf(err.Error()) + } return errors.NewError("unexpected status code", "", "status", res.Status, "body", string(body)) } } - res.Body.Close() - return nil + return res.Body.Close() } diff --git a/operator/builtin/output/newrelic/newrelic.go b/operator/builtin/output/newrelic/newrelic.go index 0dbc8ca64..4c5fa7345 100644 --- a/operator/builtin/output/newrelic/newrelic.go +++ b/operator/builtin/output/newrelic/newrelic.go @@ -237,10 +237,11 @@ func (nro *NewRelicOutput) handleResponse(res *http.Response) error { if err != nil { return errors.NewError("unexpected status code", "", "status", res.Status) } else { - res.Body.Close() + if err := res.Body.Close(); err != nil { + nro.Errorf(err.Error()) + } return errors.NewError("unexpected status code", "", "status", res.Status, "body", string(body)) } } - res.Body.Close() - return nil + return res.Body.Close() } diff --git a/operator/builtin/output/otlp/otlp.go b/operator/builtin/output/otlp/otlp.go index 4ea748db9..f36086fdb 100644 --- a/operator/builtin/output/otlp/otlp.go +++ b/operator/builtin/output/otlp/otlp.go @@ -181,10 +181,11 @@ func (o *OTLPOutput) handleResponse(res *http.Response) error { if err != nil { return errors.NewError("non-success status code", "", "status", fmt.Sprint(res.StatusCode)) } else { - res.Body.Close() + if err := res.Body.Close(); err != nil { + o.Errorf(err.Error()) + } return errors.NewError("non-success status code", "", "status", fmt.Sprint(res.StatusCode), "body", string(body)) } } - res.Body.Close() - return nil + return res.Body.Close() } diff --git a/operator/builtin/parser/time/time.go b/operator/builtin/parser/time/time.go index a424018f6..b1ac6f33f 100644 --- a/operator/builtin/parser/time/time.go +++ b/operator/builtin/parser/time/time.go @@ -58,6 +58,5 @@ func (t *TimeParserOperator) CanOutput() bool { // Process will parse time from an entry. func (t *TimeParserOperator) Process(ctx context.Context, entry *entry.Entry) error { - t.ProcessWith(ctx, entry, t.TimeParser.Parse) - return nil + return t.ProcessWith(ctx, entry, t.TimeParser.Parse) } diff --git a/operator/builtin/transformer/recombine/recombine.go b/operator/builtin/transformer/recombine/recombine.go index 2fed40a13..c3b4b298b 100644 --- a/operator/builtin/transformer/recombine/recombine.go +++ b/operator/builtin/transformer/recombine/recombine.go @@ -3,14 +3,15 @@ package recombine import ( "context" "fmt" + "strings" + "sync" + "time" + "github.com/antonmedv/expr" "github.com/antonmedv/expr/vm" "github.com/observiq/stanza/entry" "github.com/observiq/stanza/operator" "github.com/observiq/stanza/operator/helper" - "strings" - "sync" - "time" ) func init() { @@ -233,7 +234,10 @@ func (r *RecombineOperator) flushCombined() error { } // Set the recombined field on the entry - base.Set(r.combineField, recombined.String()) + err := base.Set(r.combineField, recombined.String()) + if err != nil { + return err + } r.Write(context.Background(), base) r.batch = r.batch[:0] diff --git a/operator/helper/operatortest/operatortest.go b/operator/helper/operatortest/operatortest.go index c48ef6939..fb7189a64 100644 --- a/operator/helper/operatortest/operatortest.go +++ b/operator/helper/operatortest/operatortest.go @@ -32,7 +32,7 @@ type ConfigUnmarshalTest struct { } func configFromFileViaYaml(file string, config interface{}) error { - bytes, err := ioutil.ReadFile(file) + bytes, err := ioutil.ReadFile(file) // #nosec - configs load based on user specified directory if err != nil { return fmt.Errorf("could not find config file: %s", err) } diff --git a/operator/helper/persister_test.go b/operator/helper/persister_test.go index e93247a55..b73a3d939 100644 --- a/operator/helper/persister_test.go +++ b/operator/helper/persister_test.go @@ -20,6 +20,11 @@ func TestPersisterCache(t *testing.T) { func TestPersisterLoad(t *testing.T) { tempDir := testutil.NewTempDir(t) db, err := database.OpenDatabase(filepath.Join(tempDir, "test.db")) + defer func() { + if err := db.Close(); err != nil { + t.Error(err.Error()) + } + }() persister := NewScopedDBPersister(db, "test") persister.Set("key", []byte("value")) diff --git a/plugin/plugin.go b/plugin/plugin.go index 94f81feb5..5d62f8576 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -177,7 +177,7 @@ func splitPluginFile(text []byte) (metadata, template []byte, err error) { // NewPluginFromFile builds a new plugin from a file func NewPluginFromFile(path string) (*Plugin, error) { - contents, err := ioutil.ReadFile(path) + contents, err := ioutil.ReadFile(path) // #nosec - configs load based on user specified directory if err != nil { return nil, fmt.Errorf("could not read plugin file: %s", err) } diff --git a/testutil/util.go b/testutil/util.go index ef34ddc58..ad3761c73 100644 --- a/testutil/util.go +++ b/testutil/util.go @@ -23,7 +23,9 @@ func NewTempDir(t testing.TB) string { } t.Cleanup(func() { - os.RemoveAll(tempDir) + if err := os.RemoveAll(tempDir); err != nil { + t.Errorf(err.Error()) + } }) return tempDir @@ -38,7 +40,9 @@ func NewTestDatabase(t testing.TB) *bbolt.DB { } t.Cleanup(func() { - os.RemoveAll(tempDir) + if err := os.RemoveAll(tempDir); err != nil { + t.Errorf(err.Error()) + } }) db, err := bbolt.Open(filepath.Join(tempDir, "test.db"), 0666, nil) @@ -48,7 +52,9 @@ func NewTestDatabase(t testing.TB) *bbolt.DB { } t.Cleanup(func() { - db.Close() + if err := db.Close(); err != nil { + t.Errorf(err.Error()) + } }) return db