Skip to content

Commit

Permalink
Add Counter Output Operator (#570)
Browse files Browse the repository at this point in the history
* add counter_output operator

* appease linter for the time being

* fix big int addition

* add test cases

* add doc page for count_output

* change to count_output

* fix init common

* apply PR feedback and add timestamp to count output messages

* non-pythonic increment :)

* try to fix race condition in test case

* update docs to include timestamp for count_output

* remove comma

* doc update

* fix race condition in adding to counter by using atomic.addUint64

* load count atomically as well

* access once using atomic
  • Loading branch information
schmikei authored Feb 23, 2022
1 parent 5ff3f35 commit dbd3ef0
Show file tree
Hide file tree
Showing 4 changed files with 366 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/stanza/init_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
_ "github.com/observiq/stanza/operator/builtin/transformer/retain"
_ "github.com/observiq/stanza/operator/builtin/transformer/router"

_ "github.com/observiq/stanza/operator/builtin/output/count"
_ "github.com/observiq/stanza/operator/builtin/output/drop"
_ "github.com/observiq/stanza/operator/builtin/output/elastic"
_ "github.com/observiq/stanza/operator/builtin/output/file"
Expand Down
77 changes: 77 additions & 0 deletions docs/operators/count_output.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
## `count_output` operator

The `count_output` operator prints lines of encoded json to stdout or a file detailing the number of entries the output operator has gotten since stanza started running.

Count Information has this current JSON representation
```json
{
"entries": <number of entries this operator has received>,
"elapsedMinutes": <number of minutes stanza has been running since the start of this operator>,
"entries/minute": <number of entries per minute the output operator received>,
"timestamp": <current time that this message is being recorded formatted in RFC 3339>
}
```

### Configuration Fields

| Field | Default | Description |
| ---------- | -------------- | ---------------------------------------------------------------------------------------------------------------- |
| `id` | `count_output` | A unique identifier for the operator |
| `path` | | A file path to write the count information. If no path is provided then count information is outputted to stdout |
| `duration` | `1m` | The frequency of when to output the count information |

### Example Configurations

Configuration

```yaml
pipeline:
- type: generate_input
count: 500
- type: count_output
```
#### Counting 500 generated lines printed to stdout:
`./stanza -c ./config.yaml`

```json
{"level":"info","timestamp":"2021-08-20T20:09:55.057-0400","message":"Starting stanza agent"}
{"level":"info","timestamp":"2021-08-20T20:09:55.057-0400","message":"Stanza agent started"}
{"entries":500,"elapsedMinutes":2,"entries/minute":250, "timestamp":"2021-08-20T20:09:55.057-0400"}
```

#### Configuration going to file:
```yaml
pipeline:
- type: generate_input
count: 500
- type: count_output
path: ./count.json
```

`./stanza -c ./config.yml`
> no output
```json
{"level":"info","timestamp":"2021-08-20T20:09:28.314-0400","message":"Starting stanza agent"}
{"level":"info","timestamp":"2021-08-20T20:09:28.314-0400","message":"Stanza agent started"}
```

Printing out results of specified file:
```sh
> cat count.json | jq
```
```json
{
"entries": 500,
"elapsedMinutes": 1,
"entries/minute": 500,
"timestamp": "2021-08-20T20:09:28.314-0400"
},
{
"entries": 500,
"elapsedMinutes": 2,
"entries/minute": 250,
"timestamp": "2021-08-20T20:09:29.414-0400"
}
```
159 changes: 159 additions & 0 deletions operator/builtin/output/count/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package counter

import (
"context"
"encoding/json"
"fmt"
"math"
"os"
"sync"
"sync/atomic"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
)

// CountOutputConfig is the configuration of a count output operator.
type CountOutputConfig struct {
helper.OutputConfig `yaml:",inline"`
Path string `json:"path,omitempty" yaml:"path,omitempty"`
Duration helper.Duration `json:"duration,omitempty" yaml:"duration,omitempty"`
}

var defaultCounterDuration = helper.NewDuration(1 * time.Minute)

func init() {
operator.Register("count_output", func() operator.Builder { return NewCounterOutputConfig("") })
}

// NewCounterOutputConfig creates the default config for the count_output operator.
func NewCounterOutputConfig(operatorID string) *CountOutputConfig {
return &CountOutputConfig{
OutputConfig: helper.NewOutputConfig(operatorID, "count_output"),
Duration: defaultCounterDuration,
}
}

// Build will build an instance of the count_output operator
func (c CountOutputConfig) Build(bc operator.BuildContext) ([]operator.Operator, error) {
outputOperator, err := c.OutputConfig.Build(bc)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
counterOperator := &CountOutput{
OutputOperator: outputOperator,
ctx: ctx,
cancel: cancel,
interval: c.Duration.Raw(),
path: c.Path,
numEntries: 0,
wg: sync.WaitGroup{},
}

return []operator.Operator{counterOperator}, nil
}

// CountOutput is an output operator
type CountOutput struct {
helper.OutputOperator
ctx context.Context
interval time.Duration
start time.Time
path string
file *os.File
encoder *json.Encoder
wg sync.WaitGroup
cancel context.CancelFunc

numEntries uint64
}

// Process increments the counter of the output operator
func (co *CountOutput) Process(_ context.Context, _ *entry.Entry) error {
atomic.AddUint64(&co.numEntries, 1)
return nil
}

// Start begins messaging count output to either stdout or a file
func (co *CountOutput) Start() error {
err := co.determineOutput()
if err != nil {
return err
}

co.start = time.Now()
co.wg.Add(1)
go co.startCounting()

return nil
}

// Stop tells the CountOutput to stop gracefully
func (co *CountOutput) Stop() error {
co.cancel()
co.wg.Wait()
if co.file != nil {
return co.file.Close()
}
return nil
}

func (co *CountOutput) startCounting() {
defer co.wg.Done()

ticker := time.NewTicker(co.interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
case <-co.ctx.Done():
return
}

err := co.logCount()
if err != nil {
return
}
}
}

type countObject struct {
Entries uint64 `json:"entries"`
ElapsedMinutes float64 `json:"elapsedMinutes"`
EntriesPerMinute float64 `json:"entries/minute"`
Timestamp string `json:"timestamp"`
}

func (co *CountOutput) logCount() error {
now := time.Now()
numEntries := atomic.LoadUint64(&co.numEntries)
elapsedMinutes := now.Sub(co.start).Minutes()
entriesPerMinute := float64(numEntries) / math.Max(elapsedMinutes, 1)
msg := &countObject{
Entries: numEntries,
ElapsedMinutes: elapsedMinutes,
EntriesPerMinute: entriesPerMinute,
Timestamp: now.Format(time.RFC3339),
}
return co.encoder.Encode(msg)
}

func (co *CountOutput) determineOutput() error {
if co.path == "" {
co.encoder = json.NewEncoder(os.Stdout)
return nil
}

file, err := os.OpenFile(co.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("unable to write counter info to file located at %s: %w", co.path, err)
}
co.file = file
co.encoder = json.NewEncoder(file)
return nil
}
129 changes: 129 additions & 0 deletions operator/builtin/output/count/count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package counter

import (
"context"
"encoding/json"
"io/ioutil"
"os"
"testing"
"time"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator/helper"
"github.com/observiq/stanza/testutil"
"github.com/stretchr/testify/require"
)

func TestBuildValid(t *testing.T) {
cfg := NewCounterOutputConfig("test")
ctx := testutil.NewBuildContext(t)
ops, err := cfg.Build(ctx)
require.NoError(t, err)
op := ops[0]
require.IsType(t, &CountOutput{}, op)
}

func TestBuildInvalid(t *testing.T) {
cfg := NewCounterOutputConfig("test")
ctx := testutil.NewBuildContext(t)
ctx.Logger = nil
_, err := cfg.Build(ctx)
require.Error(t, err)
require.Contains(t, err.Error(), "build context is missing a logger")
}

func TestFileCounterOutput(t *testing.T) {
cfg := NewCounterOutputConfig("test")

tmpFile, err := ioutil.TempFile("", "")
require.NoError(t, err)
defer os.Remove(tmpFile.Name())
cfg.Path = tmpFile.Name()
cfg.Duration = helper.NewDuration(1 * time.Second)

ctx := testutil.NewBuildContext(t)
ops, err := cfg.Build(ctx)
require.NoError(t, err)

counterOutput := ops[0].(*CountOutput)

err = counterOutput.Start()
require.NoError(t, err)
defer func() {
err := counterOutput.Stop()
require.NoError(t, err)
}()

e := entry.New()
err = counterOutput.Process(context.Background(), e)
require.NoError(t, err)
require.Equal(t, counterOutput.numEntries, uint64(1))

stat, err := os.Stat(tmpFile.Name())
require.NoError(t, err)

intialSize := stat.Size()

to, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-to.Done():
require.FailNow(t, "timed out waiting for file to be written to")
case <-ticker.C:
}
size, err := os.Stat(tmpFile.Name())
require.NoError(t, err)
if size.Size() != intialSize {
break
}
}

content, err := ioutil.ReadFile(tmpFile.Name())
require.NoError(t, err)

var object countObject
err = json.Unmarshal(content, &object)
require.NoError(t, err)

require.Equal(t, object.Entries, uint64(1))
require.GreaterOrEqual(t, object.EntriesPerMinute, 0.0)
require.GreaterOrEqual(t, object.ElapsedMinutes, 0.0)
}

func TestStartStdout(t *testing.T) {
cfg := NewCounterOutputConfig("test")

ctx := testutil.NewBuildContext(t)
ops, err := cfg.Build(ctx)
require.NoError(t, err)

counterOutput := ops[0].(*CountOutput)

err = counterOutput.Start()
defer func() {
err := counterOutput.Stop()
require.NoError(t, err)
}()
require.NoError(t, err)
}

func TestStartFailure(t *testing.T) {
cfg := NewCounterOutputConfig("test")
cfg.Path = "/a/path/to/a/nonexistent/file/hopefully"

ctx := testutil.NewBuildContext(t)
ops, err := cfg.Build(ctx)
require.NoError(t, err)

counterOutput := ops[0].(*CountOutput)

err = counterOutput.Start()
defer func() {
err := counterOutput.Stop()
require.NoError(t, err)
}()
require.Error(t, err)
require.Contains(t, err.Error(), "unable to write counter info to file")
}

0 comments on commit dbd3ef0

Please sign in to comment.