From dbd3ef0575150d187481d9cf96b4acb8af568164 Mon Sep 17 00:00:00 2001 From: Keith Schmitt <32067685+schmikei@users.noreply.github.com> Date: Wed, 23 Feb 2022 11:41:19 -0500 Subject: [PATCH] Add Counter Output Operator (#570) * add counter_output operator * appease linter for the time being * fix big int addition * add test cases * add doc page for count_output * change to count_output * fix init common * apply PR feedback and add timestamp to count output messages * non-pythonic increment :) * try to fix race condition in test case * update docs to include timestamp for count_output * remove comma * doc update * fix race condition in adding to counter by using atomic.addUint64 * load count atomically as well * access once using atomic --- cmd/stanza/init_common.go | 1 + docs/operators/count_output.md | 77 ++++++++++ operator/builtin/output/count/count.go | 159 ++++++++++++++++++++ operator/builtin/output/count/count_test.go | 129 ++++++++++++++++ 4 files changed, 366 insertions(+) create mode 100644 docs/operators/count_output.md create mode 100644 operator/builtin/output/count/count.go create mode 100644 operator/builtin/output/count/count_test.go diff --git a/cmd/stanza/init_common.go b/cmd/stanza/init_common.go index 054aaed69..7600b598e 100644 --- a/cmd/stanza/init_common.go +++ b/cmd/stanza/init_common.go @@ -42,6 +42,7 @@ import ( _ "github.com/observiq/stanza/operator/builtin/transformer/retain" _ "github.com/observiq/stanza/operator/builtin/transformer/router" + _ "github.com/observiq/stanza/operator/builtin/output/count" _ "github.com/observiq/stanza/operator/builtin/output/drop" _ "github.com/observiq/stanza/operator/builtin/output/elastic" _ "github.com/observiq/stanza/operator/builtin/output/file" diff --git a/docs/operators/count_output.md b/docs/operators/count_output.md new file mode 100644 index 000000000..c1409f0b0 --- /dev/null +++ b/docs/operators/count_output.md @@ -0,0 +1,77 @@ +## `count_output` operator + +The `count_output` operator prints 
lines of encoded JSON to stdout or a file detailing the number of entries the output operator has received since stanza started running. + +Count information has the following JSON representation: +```json +{ + "entries": <number of entries received>, + "elapsedMinutes": <minutes since the operator started>, + "entries/minute": <entries received per minute>, + "timestamp": <RFC 3339 time the message was written> +} +``` + +### Configuration Fields + +| Field | Default | Description | +| ---------- | -------------- | ---------------------------------------------------------------------------------------------------------------- | +| `id` | `count_output` | A unique identifier for the operator | +| `path` | | A file path to write the count information. If no path is provided then count information is written to stdout | +| `duration` | `1m` | How often to output the count information | + +### Example Configurations + +Configuration + +```yaml +pipeline: + - type: generate_input + count: 500 + - type: count_output +``` + +#### Counting 500 generated lines printed to stdout: + +`./stanza -c ./config.yaml` + +```json +{"level":"info","timestamp":"2021-08-20T20:09:55.057-0400","message":"Starting stanza agent"} +{"level":"info","timestamp":"2021-08-20T20:09:55.057-0400","message":"Stanza agent started"} +{"entries":500,"elapsedMinutes":2,"entries/minute":250,"timestamp":"2021-08-20T20:09:55.057-0400"} +``` + +#### Configuration going to file: +```yaml +pipeline: + - type: generate_input + count: 500 + - type: count_output + path: ./count.json +``` + +`./stanza -c ./config.yaml` +> no count output is written to stdout +```json +{"level":"info","timestamp":"2021-08-20T20:09:28.314-0400","message":"Starting stanza agent"} +{"level":"info","timestamp":"2021-08-20T20:09:28.314-0400","message":"Stanza agent started"} +``` + +Printing out results of specified file: +```sh +> cat count.json | jq +``` +```json +{ + "entries": 500, + "elapsedMinutes": 1, + "entries/minute": 500, + "timestamp": "2021-08-20T20:09:28.314-0400" +} +{ + "entries": 500, + "elapsedMinutes": 2, + "entries/minute": 250, + "timestamp": 
"2021-08-20T20:09:29.414-0400"
+}
+```
\ No newline at end of file
diff --git a/operator/builtin/output/count/count.go b/operator/builtin/output/count/count.go
new file mode 100644
index 000000000..9d69aa866
--- /dev/null
+++ b/operator/builtin/output/count/count.go
@@ -0,0 +1,159 @@
+package counter
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"math"
+	"os"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/observiq/stanza/entry"
+	"github.com/observiq/stanza/operator"
+	"github.com/observiq/stanza/operator/helper"
+)
+
+// CountOutputConfig is the configuration of a count output operator.
+type CountOutputConfig struct {
+	helper.OutputConfig `yaml:",inline"`
+	Path                string          `json:"path,omitempty" yaml:"path,omitempty"`
+	Duration            helper.Duration `json:"duration,omitempty" yaml:"duration,omitempty"`
+}
+
+var defaultCounterDuration = helper.NewDuration(1 * time.Minute)
+
+func init() {
+	operator.Register("count_output", func() operator.Builder { return NewCounterOutputConfig("") })
+}
+
+// NewCounterOutputConfig creates the default config for the count_output operator.
+func NewCounterOutputConfig(operatorID string) *CountOutputConfig {
+	return &CountOutputConfig{
+		OutputConfig: helper.NewOutputConfig(operatorID, "count_output"),
+		Duration:     defaultCounterDuration,
+	}
+}
+
+// Build will build an instance of the count_output operator
+func (c CountOutputConfig) Build(bc operator.BuildContext) ([]operator.Operator, error) {
+	outputOperator, err := c.OutputConfig.Build(bc)
+	if err != nil {
+		return nil, err
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	counterOperator := &CountOutput{
+		OutputOperator: outputOperator,
+		ctx:            ctx,
+		cancel:         cancel,
+		interval:       c.Duration.Raw(),
+		path:           c.Path,
+	}
+
+	return []operator.Operator{counterOperator}, nil
+}
+
+// CountOutput is an output operator that counts the entries it receives and periodically writes the running total as JSON.
+type CountOutput struct {
+	helper.OutputOperator
+	ctx      context.Context
+	interval time.Duration
+	start    time.Time
+	path     string
+	file     *os.File
+	encoder  *json.Encoder
+	wg       sync.WaitGroup
+	cancel   context.CancelFunc
+
+	numEntries uint64
+}
+
+// Process increments the counter of the output operator
+func (co *CountOutput) Process(_ context.Context, _ *entry.Entry) error {
+	atomic.AddUint64(&co.numEntries, 1)
+	return nil
+}
+
+// Start begins messaging count output to either stdout or a file
+func (co *CountOutput) Start() error {
+	if err := co.determineOutput(); err != nil {
+		return err
+	}
+
+	co.start = time.Now()
+	co.wg.Add(1)
+	go co.startCounting()
+
+	return nil
+}
+
+// Stop cancels the reporting goroutine, waits for it to exit, and closes the output file if one was opened.
+func (co *CountOutput) Stop() error {
+	co.cancel()
+	co.wg.Wait()
+	if co.file != nil {
+		return co.file.Close()
+	}
+	return nil
+}
+
+// startCounting writes a count message on every tick until the context is cancelled or a write fails.
+func (co *CountOutput) startCounting() {
+	defer co.wg.Done()
+	ticker := time.NewTicker(co.interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+		case <-co.ctx.Done():
+			return
+		}
+
+		if err := co.logCount(); err != nil {
+			co.Errorf("Failed to write count output: %s", err)
+			return
+		}
+	}
+}
+
+// countObject is the JSON message written on each reporting interval.
+type countObject struct {
+	Entries          uint64  `json:"entries"`
+	ElapsedMinutes   float64 `json:"elapsedMinutes"`
+	EntriesPerMinute float64 `json:"entries/minute"`
+	Timestamp        string  `json:"timestamp"`
+}
+
+// logCount encodes the current totals; the rate divides by at least one minute so early readings are not inflated.
+func (co *CountOutput) logCount() error {
+	now := time.Now()
+	numEntries := atomic.LoadUint64(&co.numEntries)
+	elapsedMinutes := now.Sub(co.start).Minutes()
+	entriesPerMinute := float64(numEntries) / math.Max(elapsedMinutes, 1)
+	msg := &countObject{
+		Entries:          numEntries,
+		ElapsedMinutes:   elapsedMinutes,
+		EntriesPerMinute: entriesPerMinute,
+		Timestamp:        now.Format(time.RFC3339),
+	}
+	return co.encoder.Encode(msg)
+}
+
+// determineOutput points the encoder at stdout when no path is set, otherwise at the file at path (appending, created if missing).
+func (co *CountOutput) determineOutput() error {
+	if co.path == "" {
+		co.encoder = json.NewEncoder(os.Stdout)
+		return nil
+	}
+
+	file, err := os.OpenFile(co.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
+	if err != nil {
+		return fmt.Errorf("unable to write counter info to file located at %s: %w", co.path, err)
+	}
+	co.file = file
+	co.encoder = json.NewEncoder(file)
+	return nil
+}
diff --git a/operator/builtin/output/count/count_test.go b/operator/builtin/output/count/count_test.go
new file mode 100644
index 000000000..baf77b66e
--- /dev/null
+++ b/operator/builtin/output/count/count_test.go
@@ -0,0 +1,129 @@
+package counter
+
+import (
+	"context"
+	"encoding/json"
+	"io/ioutil"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/observiq/stanza/entry"
+	"github.com/observiq/stanza/operator/helper"
+	"github.com/observiq/stanza/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestBuildValid(t *testing.T) {
+	cfg := NewCounterOutputConfig("test")
+	ctx := testutil.NewBuildContext(t)
+	ops, err := cfg.Build(ctx)
+	require.NoError(t, err)
+	op := ops[0]
+	require.IsType(t, &CountOutput{}, op)
+}
+
+func TestBuildInvalid(t *testing.T) {
+	cfg := NewCounterOutputConfig("test")
+	ctx := testutil.NewBuildContext(t)
+	ctx.Logger = nil
+	_, err := cfg.Build(ctx)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "build context is missing a logger")
+}
+
+func TestFileCounterOutput(t *testing.T) {
+	cfg := NewCounterOutputConfig("test")
+
+	tmpFile, err := ioutil.TempFile("", "")
+	require.NoError(t, err)
+	defer os.Remove(tmpFile.Name())
+	cfg.Path = tmpFile.Name()
+	cfg.Duration = helper.NewDuration(1 * time.Second)
+
+	ctx := testutil.NewBuildContext(t)
+	ops, err := cfg.Build(ctx)
+	require.NoError(t, err)
+
+	counterOutput := ops[0].(*CountOutput)
+
+	err = counterOutput.Start()
+	require.NoError(t, err)
+	defer func() {
+		err := counterOutput.Stop()
+		require.NoError(t, err)
+	}()
+
+	e := entry.New()
+	err = counterOutput.Process(context.Background(), e)
+	require.NoError(t, err)
+	require.Equal(t, uint64(1), counterOutput.numEntries)
+
+	stat, err := os.Stat(tmpFile.Name())
+	require.NoError(t, err)
+
+	initialSize := stat.Size()
+
+	to, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	// Poll until the first count message changes the file size.
+	for written := false; !written; {
+		select {
+		case <-to.Done():
+			require.FailNow(t, "timed out waiting for file to be written to")
+		case <-ticker.C:
+		}
+		info, err := os.Stat(tmpFile.Name())
+		require.NoError(t, err)
+		written = info.Size() != initialSize
+	}
+
+	content, err := ioutil.ReadFile(tmpFile.Name())
+	require.NoError(t, err)
+
+	var object countObject
+	err = json.Unmarshal(content, &object)
+	require.NoError(t, err)
+
+	require.Equal(t, uint64(1), object.Entries)
+	require.GreaterOrEqual(t, object.EntriesPerMinute, 0.0)
+	require.GreaterOrEqual(t, object.ElapsedMinutes, 0.0)
+}
+
+func TestStartStdout(t *testing.T) {
+	cfg := NewCounterOutputConfig("test")
+
+	ctx := testutil.NewBuildContext(t)
+	ops, err := cfg.Build(ctx)
+	require.NoError(t, err)
+
+	counterOutput := ops[0].(*CountOutput)
+
+	err = counterOutput.Start()
+	defer func() {
+		err := counterOutput.Stop()
+		require.NoError(t, err)
+	}()
+	require.NoError(t, err)
+}
+
+func TestStartFailure(t *testing.T) {
+	cfg := NewCounterOutputConfig("test")
+	cfg.Path = "/a/path/to/a/nonexistent/file/hopefully"
+
+	ctx := testutil.NewBuildContext(t)
+	ops, err := cfg.Build(ctx)
+	require.NoError(t, err)
+
+	counterOutput := ops[0].(*CountOutput)
+
+	err = counterOutput.Start()
+	defer func() {
+		err := counterOutput.Stop()
+		require.NoError(t, err)
+	}()
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "unable to write counter info to file")
+}