diff --git a/operator/buffer/buffer.go b/operator/buffer/buffer.go index ee1326f98..d01c421f2 100644 --- a/operator/buffer/buffer.go +++ b/operator/buffer/buffer.go @@ -71,14 +71,17 @@ func (bc *Config) unmarshal(unmarshal func(interface{}) error) error { } } +// MarshalYAML marshals YAML func (bc Config) MarshalYAML() (interface{}, error) { return bc.Builder, nil } +// MarshalJSON marshals JSON func (bc Config) MarshalJSON() ([]byte, error) { return json.Marshal(bc.Builder) } +// Clearer is an interface that is responsible for clearing entries from a buffer type Clearer interface { MarkAllAsFlushed() error MarkRangeAsFlushed(uint, uint) error diff --git a/operator/buffer/disk.go b/operator/buffer/disk.go index 853952814..0d13cb428 100644 --- a/operator/buffer/disk.go +++ b/operator/buffer/disk.go @@ -316,24 +316,28 @@ func (d *DiskBuffer) Read(dst []*entry.Entry) (f Clearer, i int, err error) { return d.newClearer(newRead), readCount, nil } +// MaxChunkSize returns the max chunk size func (d *DiskBuffer) MaxChunkSize() uint { d.reconfigMutex.RLock() defer d.reconfigMutex.RUnlock() return d.maxChunkSize } +// MaxChunkDelay returns the max chunk delay func (d *DiskBuffer) MaxChunkDelay() time.Duration { d.reconfigMutex.RLock() defer d.reconfigMutex.RUnlock() return d.maxChunkDelay } +// SetMaxChunkSize sets the max chunk size func (d *DiskBuffer) SetMaxChunkSize(size uint) { d.reconfigMutex.Lock() d.maxChunkSize = size d.reconfigMutex.Unlock() } +// SetMaxChunkDelay sets the max chunk delay func (d *DiskBuffer) SetMaxChunkDelay(delay time.Duration) { d.reconfigMutex.Lock() d.maxChunkDelay = delay diff --git a/operator/buffer/memory.go b/operator/buffer/memory.go index e1dc667b3..625047049 100644 --- a/operator/buffer/memory.go +++ b/operator/buffer/memory.go @@ -146,24 +146,28 @@ func (m *MemoryBuffer) ReadWait(ctx context.Context, dst []*entry.Entry) (Cleare return m.newClearer(inFlightIDs[:i]), i, nil } +// MaxChunkSize returns the max chunk size func (m *MemoryBuffer) MaxChunkSize() uint { m.reconfigMutex.RLock() defer m.reconfigMutex.RUnlock() return m.maxChunkSize } +// MaxChunkDelay returns the max chunk delay func (m *MemoryBuffer) MaxChunkDelay() time.Duration { m.reconfigMutex.RLock() defer m.reconfigMutex.RUnlock() return m.maxChunkDelay } +// SetMaxChunkSize sets the max chunk size func (m *MemoryBuffer) SetMaxChunkSize(size uint) { m.reconfigMutex.Lock() m.maxChunkSize = size m.reconfigMutex.Unlock() } +// SetMaxChunkDelay sets the max chunk delay func (m *MemoryBuffer) SetMaxChunkDelay(delay time.Duration) { m.reconfigMutex.Lock() m.maxChunkDelay = delay diff --git a/operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go b/operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go index 8d013007e..1fe88edd4 100644 --- a/operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go +++ b/operator/builtin/input/aws/cloudwatch/cloudwatch_persist.go @@ -7,11 +7,12 @@ import ( "github.com/observiq/stanza/operator/helper" ) +// Persister ensures data is persisted across shutdowns type Persister struct { DB helper.Persister } -// Helper function to get persisted data +// Read is a helper function to get persisted data func (p *Persister) Read(key string) (int64, error) { var startTime int64 buffer := bytes.NewBuffer(p.DB.Get(key)) @@ -22,7 +23,7 @@ func (p *Persister) Read(key string) (int64, error) { return startTime, nil } -// Helper function to set persisted data +// Write is a helper function to set persisted data func (p *Persister) Write(key string, value int64) { var buf = make([]byte, 8) binary.BigEndian.PutUint64(buf, uint64(value)) diff --git a/operator/builtin/input/azure/event_hub.go b/operator/builtin/input/azure/event_hub.go index fed30d4e8..6d4faff49 100644 --- a/operator/builtin/input/azure/event_hub.go +++ b/operator/builtin/input/azure/event_hub.go @@ -9,7 +9,7 @@ import ( "go.uber.org/zap" ) -// Eventhub provides methods for reading events from Azure Event Hub. +// EventHub provides methods for reading events from Azure Event Hub. type EventHub struct { AzureConfig Persist *Persister diff --git a/operator/builtin/input/azure/event_hub_config.go b/operator/builtin/input/azure/event_hub_config.go index 6789e8765..61cfaa982 100644 --- a/operator/builtin/input/azure/event_hub_config.go +++ b/operator/builtin/input/azure/event_hub_config.go @@ -24,6 +24,7 @@ type AzureConfig struct { startAtBeginning bool } +// Build will build an Azure EventHub input operator func (a *AzureConfig) Build(buildContext operator.BuildContext, input helper.InputConfig) error { inputOperator, err := input.Build(buildContext) if err != nil { diff --git a/operator/builtin/input/file/finder.go b/operator/builtin/input/file/finder.go index ab54959d3..bfaa6bc23 100644 --- a/operator/builtin/input/file/finder.go +++ b/operator/builtin/input/file/finder.go @@ -18,6 +18,7 @@ import ( "github.com/bmatcuk/doublestar/v3" ) +// Finder is responsible for find files according to the include and exclude rules type Finder struct { Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"` Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"` diff --git a/operator/builtin/input/http/config.go b/operator/builtin/input/http/config.go index c48fbc9f1..b3894f9b1 100644 --- a/operator/builtin/input/http/config.go +++ b/operator/builtin/input/http/config.go @@ -14,8 +14,13 @@ import ( ) const ( - DefaultTimeout = time.Second * 20 + // DefaultTimeout is the default timeout for reads and writes + DefaultTimeout = time.Second * 20 + + // DefaultIdleTimeout default timeout for idle DefaultIdleTimeout = time.Second * 60 + + // DefaultMaxBodySize default maximum body size. DefaultMaxBodySize = 10000000 // 10 megabyte ) diff --git a/operator/builtin/input/journald/journald.go b/operator/builtin/input/journald/journald.go index 23ac48c8b..b98d40c3b 100644 --- a/operator/builtin/input/journald/journald.go +++ b/operator/builtin/input/journald/journald.go @@ -1,3 +1,4 @@ +//go:build linux // +build linux package journald @@ -25,6 +26,7 @@ func init() { operator.Register("journald_input", func() operator.Builder { return NewJournaldInputConfig("") }) } +// NewJournaldInputConfig creates a new config for Journald Input func NewJournaldInputConfig(operatorID string) *JournaldInputConfig { return &JournaldInputConfig{ InputConfig: helper.NewInputConfig(operatorID, "journald_input"), diff --git a/operator/builtin/parser/csv/csv.go b/operator/builtin/parser/csv/csv.go index 06bed1747..b42f20463 100644 --- a/operator/builtin/parser/csv/csv.go +++ b/operator/builtin/parser/csv/csv.go @@ -116,6 +116,7 @@ func (r *CSVParser) Process(ctx context.Context, e *entry.Entry) error { return r.ParserOperator.ProcessWith(ctx, e, r.parse) } +// ParseFunc is the function that will parse the log entry by CSV type ParseFunc func(interface{}) (interface{}, error) // generateParseFunc returns a parse function for a given header, allowing diff --git a/operator/builtin/transformer/recombine/recombine.go b/operator/builtin/transformer/recombine/recombine.go index d040e84f7..ee2c8fc1d 100644 --- a/operator/builtin/transformer/recombine/recombine.go +++ b/operator/builtin/transformer/recombine/recombine.go @@ -109,10 +109,12 @@ type RecombineOperator struct { batch []*entry.Entry } +// Start will start the processing log entries func (r *RecombineOperator) Start() error { return nil } +// Stop will stop processing log entries func (r *RecombineOperator) Stop() error { r.Lock() defer r.Unlock() @@ -123,6 +125,7 @@ func (r *RecombineOperator) Stop() error { return nil } +// Process processes a log entry to be combined func (r *RecombineOperator) Process(ctx context.Context, e *entry.Entry) error { // Lock the recombine operator because process can't run concurrently r.Lock() diff --git a/operator/helper/bytesize.go b/operator/helper/bytesize.go index 980b6e9de..c23d0a688 100644 --- a/operator/helper/bytesize.go +++ b/operator/helper/bytesize.go @@ -8,14 +8,17 @@ import ( "strings" ) +// ByteSize type for modeling Byte sizes type ByteSize int64 +// UnmarshalJSON unmarshals JSON func (h *ByteSize) UnmarshalJSON(raw []byte) error { return h.unmarshalShared(func(i interface{}) error { return json.Unmarshal(raw, &i) }) } +// UnmarshalYAML unmarshals YAML func (h *ByteSize) UnmarshalYAML(unmarshal func(interface{}) error) error { return h.unmarshalShared(unmarshal) } diff --git a/operator/helper/encoding.go b/operator/helper/encoding.go index 72f248a07..79b616427 100644 --- a/operator/helper/encoding.go +++ b/operator/helper/encoding.go @@ -52,6 +52,7 @@ func (c EncodingConfig) Build(_ operator.BuildContext) (Encoding, error) { }, nil } +// Encoding represents an text encoding type Encoding struct { Encoding encoding.Encoding } diff --git a/operator/helper/multiline.go b/operator/helper/multiline.go index 7dfa2d4e3..86dc1cfdc 100644 --- a/operator/helper/multiline.go +++ b/operator/helper/multiline.go @@ -11,7 +11,7 @@ import ( "golang.org/x/text/encoding" ) -// NewBasicConfig creates a new Multiline config +// NewMultilineConfig creates a new Multiline config func NewMultilineConfig() MultilineConfig { return MultilineConfig{ LineStartPattern: "", diff --git a/operator/helper/parser.go b/operator/helper/parser.go index 7a6995501..88730155d 100644 --- a/operator/helper/parser.go +++ b/operator/helper/parser.go @@ -76,6 +76,7 @@ func (p *ParserOperator) ProcessWith(ctx context.Context, entry *entry.Entry, pa return p.ProcessWithCallback(ctx, entry, parse, nil) } +// ProcessWithCallback processes and entry and executes the call back func (p *ParserOperator) ProcessWithCallback(ctx context.Context, entry *entry.Entry, parse ParseFunction, cb func(*entry.Entry) error) error { // Short circuit if the "if" condition does not match skip, err := p.Skip(ctx, entry) diff --git a/operator/helper/transformer.go b/operator/helper/transformer.go index 12d2a0513..316290fc8 100644 --- a/operator/helper/transformer.go +++ b/operator/helper/transformer.go @@ -100,6 +100,7 @@ func (t *TransformerOperator) HandleEntryError(ctx context.Context, entry *entry return err } +// Skip if entry doesn't match expression func (t *TransformerOperator) Skip(_ context.Context, entry *entry.Entry) (bool, error) { if t.IfExpr == nil { return false, nil diff --git a/operator/helper/writer.go b/operator/helper/writer.go index 9c14dc4da..641acddcf 100644 --- a/operator/helper/writer.go +++ b/operator/helper/writer.go @@ -108,6 +108,7 @@ func (w *WriterOperator) findOperator(operators []operator.Operator, operatorID // OutputIDs is a collection of operator IDs used as outputs. type OutputIDs []string +// WithNamespace adds namespace func (o OutputIDs) WithNamespace(bc operator.BuildContext) OutputIDs { namespacedIDs := make([]string, 0, len(o)) for _, id := range o { diff --git a/plugin/config.go b/plugin/config.go index 242d7a810..1b94df532 100644 --- a/plugin/config.go +++ b/plugin/config.go @@ -22,7 +22,7 @@ type Config struct { Parameters map[string]interface{} `json:",squash" yaml:",squash"` } -// BuildMulti implements operator.MultiBuilder +// Build implements operator.MultiBuilder func (c *Config) Build(bc operator.BuildContext) ([]operator.Operator, error) { if bc.PluginDepth > 10 { return nil, errors.NewError("reached max plugin depth", "ensure that there are no recursive dependencies in plugins") @@ -71,6 +71,7 @@ func (c *Config) yamlOutputs(bc operator.BuildContext) string { return fmt.Sprintf("[%s]", strings.Join(namespacedOutputs, ",")) } +// UnmarshalJSON unmarshals JSON func (c *Config) UnmarshalJSON(raw []byte) error { var m map[string]interface{} if err := json.Unmarshal(raw, &m); err != nil { diff --git a/revive/config.toml b/revive/config.toml index a6e4699ec..d6c275521 100644 --- a/revive/config.toml +++ b/revive/config.toml @@ -27,3 +27,5 @@ warningCode = 1 [rule.unused-parameter] [rule.unreachable-code] [rule.redefines-builtin-id] +[rule.exported] + arguments=["disableStutteringCheck"] diff --git a/version/version.go b/version/version.go index e0710fd3e..02b1e116a 100644 --- a/version/version.go +++ b/version/version.go @@ -3,8 +3,11 @@ package version import "fmt" var ( + // GitCommit set externally of the git commit this was built on GitCommit string - GitTag string + + // GitTag set externally of the git tag this was built on + GitTag string ) // GetVersion returns the version of the stanza library