Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolasparada committed Sep 29, 2023
1 parent 3be8ca6 commit 22c68f7
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 15 deletions.
18 changes: 12 additions & 6 deletions plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func testPlugin(t *testing.T, pool *dockertest.Pool) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
t.Cleanup(cancel)

go func() {
go func(t *testing.T) {
for {
if ctx.Err() != nil {
return
Expand All @@ -139,10 +139,11 @@ func testPlugin(t *testing.T, pool *dockertest.Pool) {
}

var got struct {
Foo string `json:"foo"`
Message string `json:"message"`
TmplOut string `json:"tmpl_out"`
MultilineSplit []string `json:"multiline_split"`
Foo string `json:"foo"`
Message string `json:"message"`
TmplOut string `json:"tmpl_out"`
MultilineSplit []string `json:"multiline_split"`
Took time.Duration `json:"took"`
}

err := json.Unmarshal([]byte(line), &got)
Expand All @@ -152,13 +153,18 @@ func testPlugin(t *testing.T, pool *dockertest.Pool) {
assert.Equal(t, "inside double quotes\nnew line", got.TmplOut)
assert.Equal(t, []string{"foo", "bar"}, got.MultilineSplit)

if got.Took > time.Second {
t.Errorf("send many took %s, expected less than 1s", got.Took)
return
}

t.Logf("took %s", time.Since(start))

cancel()
return
}
}
}()
}(t)

<-ctx.Done()

Expand Down
24 changes: 15 additions & 9 deletions testdata/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (plug *inputPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error
return nil
}

func (plug inputPlugin) Collect(ctx context.Context, ch chan<- plugin.Message) error {
func (plug inputPlugin) Collect(ctx context.Context, send func(t time.Time, record map[string]any)) error {
tick := time.NewTicker(time.Second)

for {
Expand All @@ -61,15 +61,21 @@ func (plug inputPlugin) Collect(ctx context.Context, ch chan<- plugin.Message) e
plug.collectCounter.Add(1)
plug.log.Info("[go-test-input-plugin] operation succeeded")

ch <- plugin.Message{
Time: time.Now(),
Record: map[string]any{
"message": "hello from go-test-input-plugin",
"foo": plug.foo,
"tmpl_out": buff.String(),
"multiline_split": plug.multilineSplit,
},
start := time.Now()
for i := 0; i < 100; i++ {
send(time.Now().UTC(), map[string]any{
"skipMe": true,
})
}
took := time.Since(start)

send(time.Now().UTC(), map[string]any{
"message": "hello from go-test-input-plugin",
"foo": plug.foo,
"tmpl_out": buff.String(),
"multiline_split": plug.multilineSplit,
"took": took,
})
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions testdata/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ func (plug outputPlugin) Flush(ctx context.Context, ch <-chan plugin.Message) er
defer f.Close()

for msg := range ch {
var skip struct {
SkipMe bool `json:"skipMe"`
}
if err := json.Unmarshal(msg.Record, &skip); err != nil && skip.SkipMe {
continue
}

err := json.NewEncoder(f).Encode(msg.Record)
if err != nil {
plug.log.Error("[go-test-output-plugin] operation failed. reason %w", err)
Expand Down

0 comments on commit 22c68f7

Please sign in to comment.