From f0155f25fdff47f2253801daa86c1290f945d297 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 5 Oct 2023 10:36:32 -0300 Subject: [PATCH 1/7] cshared: rebase #52 off of a clean patch. Due to the revert in #44 I was unable to rebase #52 interactively so the changes will have to be applied in a single patch. This unfortunately means the loss of git history, but so be it. Here is the changelog: * test: add a sleep in the infinite call to give time for collect to be accidentally invoked after Done(). * input: invoke Collect only once and only ever once. Tests have been updated to enforce this behavior. * test: improve dangling test. * test: add latency test. Signed-off-by: Phillip Whelan --- cshared.go | 135 ++++++++++++++------- cshared_test.go | 301 +++++++++++++++++++++++++++++++++++++++++++++- output/decoder.go | 11 ++ 3 files changed, 398 insertions(+), 49 deletions(-) diff --git a/cshared.go b/cshared.go index afa215d..eaffddd 100644 --- a/cshared.go +++ b/cshared.go @@ -6,6 +6,7 @@ package plugin import "C" import ( + "bytes" "context" "errors" "fmt" @@ -16,6 +17,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "unsafe" @@ -27,10 +29,19 @@ import ( "github.com/calyptia/plugin/output" ) +const ( + // maxBufferedMessages is the number of messages that will be buffered + // between each fluent-bit interval (approx 1 second). + defaultMaxBufferedMessages = 300000 + // collectInterval is set to the interval present before in core-fluent-bit. + collectInterval = 1000 * time.Nanosecond +) + var ( - unregister func() - cmt *cmetrics.Context - logger Logger + unregister func() + cmt *cmetrics.Context + logger Logger + maxBufferedMessages = defaultMaxBufferedMessages ) // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description @@ -62,7 +73,7 @@ func FLBPluginRegister(def unsafe.Pointer) int { } // FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase. -// here all the plugin context should be initialised and any data or flag required for +// here all the plugin context should be initialized and any data or flag required for // plugins to execute the collect or flush callback. // //export FLBPluginInit @@ -94,6 +105,12 @@ func FLBPluginInit(ptr unsafe.Pointer) int { } err = theInput.Init(ctx, fbit) + if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" { + maxbuffered, err := strconv.Atoi(maxbuffered) + if err != nil { + maxBufferedMessages = maxbuffered + } + } } else { conf := &flbOutputConfigLoader{ptr: ptr} cmt, err = output.FLBPluginGetCMetricsContext(ptr) @@ -116,10 +133,33 @@ func FLBPluginInit(ptr unsafe.Pointer) int { return input.FLB_OK } -// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been -// initialised, the plugin implementation is responsible for handling the incoming data and the context -// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit -// will not execute further callbacks. +// flbPluginRest is meant to reset the plugin between tests. +func flbPluginReset() { + once = sync.Once{} + close(theChannel) +} + +func testFLBPluginInputCallback() ([]byte, error) { + data := unsafe.Pointer(nil) + var csize C.size_t + + FLBPluginInputCallback(&data, &csize) + + if data == nil { + return []byte{}, nil + } + + defer C.free(data) + return C.GoBytes(data, C.int(csize)), nil +} + +// FLBPluginInputCallback this method gets invoked by the fluent-bit +// runtime, once the plugin has been initialized, the plugin +// implementation is responsible for handling the incoming data and the +// context that gets past +// +// This function will invoke Collect only once to preserve backward +// compatible behavior. There are unit tests to enforce this behavior. // //export FLBPluginInputCallback func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { @@ -130,50 +170,59 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_RETRY } - var err error once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message) - go func() { - err = theInput.Collect(runCtx, theChannel) - }() + theChannel = make(chan Message, maxBufferedMessages) + + go func(theChannel chan<- Message) { + err := theInput.Collect(runCtx, theChannel) + if err != nil { + fmt.Fprintf(os.Stderr, + "collect error: %s\n", err.Error()) + } + }(theChannel) }) - if err != nil { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR - } - select { - case msg, ok := <-theChannel: - if !ok { - return input.FLB_OK - } + buf := bytes.NewBuffer([]byte{}) - t := input.FLBTime{Time: msg.Time} - b, err := input.NewEncoder().Encode([]any{t, msg.Record}) - if err != nil { - fmt.Fprintf(os.Stderr, "encode: %s\n", err) - return input.FLB_ERROR + for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- { + select { + case msg, ok := <-theChannel: + if !ok { + return input.FLB_ERROR + } + + t := input.FLBTime{Time: msg.Time} + b, err := input.NewEncoder().Encode([]any{t, msg.Record}) + if err != nil { + fmt.Fprintf(os.Stderr, "encode: %s\n", err) + return input.FLB_ERROR + } + buf.Grow(len(b)) + buf.Write(b) + case <-runCtx.Done(): + err := runCtx.Err() + if err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + return input.FLB_ERROR + } + // enforce a runtime gc, to prevent the thread finalizer on + // fluent-bit to kick in before any remaining data has not been GC'ed + // causing a sigsegv. + defer runtime.GC() + loop = 0 + default: + loop = 0 } + } + if buf.Len() > 0 { + b := buf.Bytes() cdata := C.CBytes(b) - *data = cdata - *csize = C.size_t(len(b)) - - // C.free(unsafe.Pointer(cdata)) - case <-runCtx.Done(): - err := runCtx.Err() - if err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR + if csize != nil { + *csize = C.size_t(len(b)) } - // enforce a runtime gc, to prevent the thread finalizer on - // fluent-bit to kick in before any remaining data has not been GC'ed - // causing a sigsegv. - defer runtime.GC() - default: - break } return input.FLB_OK @@ -191,7 +240,7 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int { // plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation. // //export FLBPluginFlush -//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions. +//nolint:funlen //ignore length requirement for this function func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { initWG.Wait() diff --git a/cshared_test.go b/cshared_test.go index 1ffdcac..74c4354 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -2,38 +2,327 @@ package plugin import ( "context" + "fmt" + "sync" + "sync/atomic" "testing" "time" "unsafe" + + "github.com/calyptia/plugin/output" ) -type testPluginInputCallback struct{} +type testPluginInputCallbackCtrlC struct{} -func (t testPluginInputCallback) Init(ctx context.Context, fbit *Fluentbit) error { +func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error { return nil } -func (t testPluginInputCallback) Collect(ctx context.Context, ch chan<- Message) error { +func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error { return nil } +func init() { + initWG.Done() + registerWG.Done() +} + func TestInputCallbackCtrlC(t *testing.T) { - theInput = testPluginInputCallback{} + defer flbPluginReset() + + theInput = testPluginInputCallbackCtrlC{} cdone := make(chan bool) timeout := time.NewTimer(1 * time.Second) ptr := unsafe.Pointer(nil) - initWG.Done() - registerWG.Done() + go func() { + FLBPluginInputCallback(&ptr, nil) + cdone <- true + }() + + select { + case <-cdone: + runCancel() + case <-timeout.C: + t.Fail() + } +} + +var testPluginInputCallbackDangleFuncs atomic.Int64 + +type testPluginInputCallbackDangle struct{} + +func (t testPluginInputCallbackDangle) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackDangle) Collect(ctx context.Context, ch chan<- Message) error { + testPluginInputCallbackDangleFuncs.Add(1) + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + return nil +} + +// TestInputCallbackDangle assures the API will not attempt to invoke +// Collect multiple times. This is inline with backward-compatible +// behavior. +func TestInputCallbackDangle(t *testing.T) { + defer flbPluginReset() + + theInput = testPluginInputCallbackDangle{} + cdone := make(chan bool) + ptr := unsafe.Pointer(nil) + + go func() { + t := time.NewTicker(collectInterval) + FLBPluginInputCallback(&ptr, nil) + for { + select { + case <-t.C: + FLBPluginInputCallback(&ptr, nil) + case <-cdone: + return + } + } + }() + + timeout := time.NewTimer(5 * time.Second) + + <-timeout.C + timeout.Stop() + runCancel() + + // Test the assumption that only a single goroutine is + // ingesting records. + if testPluginInputCallbackDangleFuncs.Load() != 1 { + fmt.Printf("Too many callbacks: %d", + testPluginInputCallbackDangleFuncs.Load()) + t.Fail() + } +} + +var testPluginInputCallbackInfiniteFuncs atomic.Int64 + +type testPluginInputCallbackInfinite struct{} + +func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error { + testPluginInputCallbackInfiniteFuncs.Add(1) + for { + select { + default: + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + // for tests to correctly pass our infinite loop needs + // to return once the context has been finished. + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfinite is a test for the main method most plugins +// use where they do not return from the first invocation of collect. +func TestInputCallbackInfinite(t *testing.T) { + defer flbPluginReset() + + theInput = testPluginInputCallbackInfinite{} + cdone := make(chan bool) + ptr := unsafe.Pointer(nil) + + go func() { + for { + FLBPluginInputCallback(&ptr, nil) + time.Sleep(collectInterval) + + if ptr != nil { + cdone <- true + } + } + }() + + timeout := time.NewTimer(10 * time.Second) + + select { + case <-cdone: + timeout.Stop() + runCancel() + // make sure Collect is not being invoked after Done(). + time.Sleep(collectInterval * 10) + // Test the assumption that only a single goroutine is + // ingesting records. + if testPluginInputCallbackInfiniteFuncs.Load() != 1 { + fmt.Printf("Too many callbacks: %d", + testPluginInputCallbackInfiniteFuncs.Load()) + t.Fail() + } + return + case <-timeout.C: + fmt.Println("---- Timed out....") + runCancel() + t.Fail() + } +} + +type testPluginInputCallbackLatency struct{} + +func (t testPluginInputCallbackLatency) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackLatency) Collect(ctx context.Context, ch chan<- Message) error { + tick := time.NewTimer(time.Second * 1) + for { + select { + case <-tick.C: + for i := 0; i < 128; i++ { + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + } + tick.Reset(time.Second * 1) + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfiniteLatency is a test of the latency between +// messages. +func TestInputCallbackLatency(t *testing.T) { + defer flbPluginReset() + + theInput = testPluginInputCallbackLatency{} + cdone := make(chan bool) + cmsg := make(chan []byte) + + go func() { + t := time.NewTicker(collectInterval) + for { + select { + case <-cdone: + return + case <-t.C: + buf, _ := testFLBPluginInputCallback() + if len(buf) > 0 { + cmsg <- buf + } + } + } + }() + + timeout := time.NewTimer(5 * time.Second) + msgs := 0 + + for { + select { + case buf := <-cmsg: + dec := output.NewByteDecoder(buf) + if dec == nil { + t.Fatal("dec is nil") + } + + for { + ret, timestamp, _ := output.GetRecord(dec) + if ret == -1 { + break + } + if ret < 0 { + t.Fatalf("ret is negative: %d", ret) + } + + msgs++ + + ts, ok := timestamp.(output.FLBTime) + if !ok { + t.Fatal() + } + + if time.Since(ts.Time) > time.Millisecond*5 { + t.Errorf("latency too high: %fms", + float64(time.Since(ts.Time)/time.Millisecond)) + } + } + case <-timeout.C: + timeout.Stop() + runCancel() + + if msgs < 128 { + t.Fatalf("too few messages: %d", msgs) + } + return + } + } +} + +type testInputCallbackInfiniteConcurrent struct{} + +var concurrentWait sync.WaitGroup + +func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error { + defer flbPluginReset() + + for i := 0; i < 64; i++ { + go func(ch chan<- Message, id int) { + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "ID": fmt.Sprintf("%d", id), + }, + } + concurrentWait.Done() + }(ch, i) + } + // for tests to correctly pass our infinite loop needs + // to return once the context has been finished. + for { + select { + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfiniteConcurrent is meant to make sure we do not +// break anythin with respect to concurrent ingest. +func TestInputCallbackInfiniteConcurrent(t *testing.T) { + defer flbPluginReset() + + theInput = testInputCallbackInfiniteConcurrent{} + cdone := make(chan bool) + timeout := time.NewTimer(10 * time.Second) + ptr := unsafe.Pointer(nil) + concurrentWait.Add(64) go func() { FLBPluginInputCallback(&ptr, nil) + concurrentWait.Wait() cdone <- true }() select { case <-cdone: + runCancel() case <-timeout.C: + runCancel() t.Fail() } } diff --git a/output/decoder.go b/output/decoder.go index d1b438f..aa9e508 100644 --- a/output/decoder.go +++ b/output/decoder.go @@ -69,6 +69,17 @@ func NewDecoder(data unsafe.Pointer, length int) *FLBDecoder { return dec } +func NewByteDecoder(b []byte) *FLBDecoder { + dec := new(FLBDecoder) + dec.handle = new(codec.MsgpackHandle) + // TODO: handle error. + _ = dec.handle.SetBytesExt(reflect.TypeOf(FLBTime{}), 0, &FLBTime{}) + + dec.mpdec = codec.NewDecoderBytes(b, dec.handle) + + return dec +} + func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{}) { var check error var m interface{} From 266c0008d7b740e96a871cfce0f6d3b70909251e Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 5 Oct 2023 12:01:33 -0300 Subject: [PATCH 2/7] cshared: add logs to tests, rerun tests 3 times to workaround startup issues. Signed-off-by: Phillip Whelan --- .github/workflows/ci.yaml | 8 ++++-- cshared_test.go | 57 +++++++++++++++++++++++++++++++-------- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ef2e047..34bc613 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -54,8 +54,12 @@ jobs: run: docker pull ghcr.io/calyptia/internal/core-fluent-bit:main - name: Unit tests - run: | - go test -v -covermode=atomic -coverprofile=coverage.out ./... + # retry tests for now... Open issue to fix it. + uses: nick-fields/retry@v2 + with: + max_attempts: 3 + command: | + go test -v -covermode=atomic -coverprofile=coverage.out ./... - name: Upload coverage to Codecov if: ${{ github.event_name != 'pull_request' }} diff --git a/cshared_test.go b/cshared_test.go index 74c4354..5550056 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -44,7 +44,7 @@ func TestInputCallbackCtrlC(t *testing.T) { case <-cdone: runCancel() case <-timeout.C: - t.Fail() + t.Fatalf("timed out ...") } } @@ -99,9 +99,8 @@ func TestInputCallbackDangle(t *testing.T) { // Test the assumption that only a single goroutine is // ingesting records. if testPluginInputCallbackDangleFuncs.Load() != 1 { - fmt.Printf("Too many callbacks: %d", + t.Fatalf("Too many callbacks: %d", testPluginInputCallbackDangleFuncs.Load()) - t.Fail() } } @@ -163,15 +162,15 @@ func TestInputCallbackInfinite(t *testing.T) { // Test the assumption that only a single goroutine is // ingesting records. if testPluginInputCallbackInfiniteFuncs.Load() != 1 { - fmt.Printf("Too many callbacks: %d", + t.Fatalf("Too many callbacks: %d", testPluginInputCallbackInfiniteFuncs.Load()) - t.Fail() } return case <-timeout.C: - fmt.Println("---- Timed out....") runCancel() - t.Fail() + // This test seems to fail some what frequently because the Collect goroutine + // inside cshared is never being scheduled. + t.Fatalf("timed out ...") } } @@ -208,10 +207,16 @@ func TestInputCallbackLatency(t *testing.T) { theInput = testPluginInputCallbackLatency{} cdone := make(chan bool) + cstarted := make(chan bool) cmsg := make(chan []byte) go func() { t := time.NewTicker(collectInterval) + buf, _ := testFLBPluginInputCallback() + if len(buf) > 0 { + cmsg <- buf + } + cstarted <- true for { select { case <-cdone: @@ -225,6 +230,7 @@ func TestInputCallbackLatency(t *testing.T) { } }() + <-cstarted timeout := time.NewTimer(5 * time.Second) msgs := 0 @@ -272,24 +278,31 @@ func TestInputCallbackLatency(t *testing.T) { type testInputCallbackInfiniteConcurrent struct{} var concurrentWait sync.WaitGroup +var concurrentCountStart atomic.Int64 +var concurrentCountFinish atomic.Int64 func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error { return nil } func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error { - defer flbPluginReset() + fmt.Printf("---- infinite concurrent collect\n") for i := 0; i < 64; i++ { go func(ch chan<- Message, id int) { + fmt.Printf("---- infinite concurrent started: %d\n", id) + concurrentCountStart.Add(1) ch <- Message{ Time: time.Now(), Record: map[string]string{ "ID": fmt.Sprintf("%d", id), }, } + concurrentCountFinish.Add(1) concurrentWait.Done() + fmt.Printf("---- infinite concurrent finished: %d\n", id) }(ch, i) + fmt.Printf("---- infinite concurrent starting: %d\n", i) } // for tests to correctly pass our infinite loop needs // to return once the context has been finished. @@ -308,21 +321,43 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) { theInput = testInputCallbackInfiniteConcurrent{} cdone := make(chan bool) - timeout := time.NewTimer(10 * time.Second) + cstarted := make(chan bool) ptr := unsafe.Pointer(nil) concurrentWait.Add(64) - go func() { + + go func(cstarted chan bool) { + ticker := time.NewTicker(time.Second * 1) FLBPluginInputCallback(&ptr, nil) + cstarted <- true + + for { + select { + case <-ticker.C: + FLBPluginInputCallback(&ptr, nil) + case <-runCtx.Done(): + return + } + } + }(cstarted) + + go func() { concurrentWait.Wait() cdone <- true }() + <-cstarted + timeout := time.NewTimer(10 * time.Second) + select { case <-cdone: runCancel() case <-timeout.C: runCancel() - t.Fail() + // this test seems to timeout semi-frequently... need to get to + // the bottom of it... + t.Fatalf("---- timed out: %d/%d ...", + concurrentCountStart.Load(), + concurrentCountFinish.Load()) } } From 3eca1b4c62973b988d1ebadd97103b22f975e5a8 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 5 Oct 2023 12:07:34 -0300 Subject: [PATCH 3/7] ci: add missing 'timeout_seconds' parameter for test retry. Signed-off-by: Phillip Whelan --- .github/workflows/ci.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 34bc613..6de3a33 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -57,6 +57,7 @@ jobs: # retry tests for now... Open issue to fix it. uses: nick-fields/retry@v2 with: + timeout_seconds: 30 max_attempts: 3 command: | go test -v -covermode=atomic -coverprofile=coverage.out ./... From 17390e6c6d4bf7f9cd5f2afd32cc3d32a855c656 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 5 Oct 2023 15:02:38 -0300 Subject: [PATCH 4/7] cshared: fix sync problems inside tests. Signed-off-by: Phillip Whelan --- cshared_test.go | 41 ++++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/cshared_test.go b/cshared_test.go index 5550056..8bbb7cc 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -33,6 +33,8 @@ func TestInputCallbackCtrlC(t *testing.T) { theInput = testPluginInputCallbackCtrlC{} cdone := make(chan bool) timeout := time.NewTimer(1 * time.Second) + defer timeout.Stop() + ptr := unsafe.Pointer(nil) go func() { @@ -79,6 +81,8 @@ func TestInputCallbackDangle(t *testing.T) { go func() { t := time.NewTicker(collectInterval) + defer t.Stop() + FLBPluginInputCallback(&ptr, nil) for { select { @@ -138,24 +142,33 @@ func TestInputCallbackInfinite(t *testing.T) { theInput = testPluginInputCallbackInfinite{} cdone := make(chan bool) + cshutdown := make(chan bool) ptr := unsafe.Pointer(nil) go func() { - for { - FLBPluginInputCallback(&ptr, nil) - time.Sleep(collectInterval) + t := time.NewTicker(collectInterval) + defer t.Stop() - if ptr != nil { - cdone <- true + for { + select { + case <-t.C: + FLBPluginInputCallback(&ptr, nil) + if ptr != nil { + cdone <- true + return + } + case <-cshutdown: + return } } }() timeout := time.NewTimer(10 * time.Second) + defer timeout.Stop() select { case <-cdone: - timeout.Stop() + theInput = nil runCancel() // make sure Collect is not being invoked after Done(). time.Sleep(collectInterval * 10) @@ -167,7 +180,9 @@ func TestInputCallbackInfinite(t *testing.T) { } return case <-timeout.C: + theInput = nil runCancel() + cshutdown <- true // This test seems to fail some what frequently because the Collect goroutine // inside cshared is never being scheduled. t.Fatalf("timed out ...") @@ -212,14 +227,18 @@ func TestInputCallbackLatency(t *testing.T) { go func() { t := time.NewTicker(collectInterval) + defer t.Stop() + buf, _ := testFLBPluginInputCallback() if len(buf) > 0 { cmsg <- buf } + cstarted <- true for { select { case <-cdone: + fmt.Println("---- collect done") return case <-t.C: buf, _ := testFLBPluginInputCallback() @@ -231,7 +250,10 @@ func TestInputCallbackLatency(t *testing.T) { }() <-cstarted + fmt.Println("---- started") timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + msgs := 0 for { @@ -264,8 +286,9 @@ func TestInputCallbackLatency(t *testing.T) { } } case <-timeout.C: - timeout.Stop() + theInput = nil runCancel() + cdone <- true if msgs < 128 { t.Fatalf("too few messages: %d", msgs) @@ -328,6 +351,8 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) { go func(cstarted chan bool) { ticker := time.NewTicker(time.Second * 1) + defer ticker.Stop() + FLBPluginInputCallback(&ptr, nil) cstarted <- true @@ -351,8 +376,10 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) { select { case <-cdone: + theInput = nil runCancel() case <-timeout.C: + theInput = nil runCancel() // this test seems to timeout semi-frequently... need to get to // the bottom of it... From b19aac606460fd15520de6dcc459cc21d20b0825 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 5 Oct 2023 16:15:26 -0300 Subject: [PATCH 5/7] cshared: fix data races with 'theInput' in tests. Signed-off-by: Phillip Whelan --- cshared.go | 13 ++++++++++++- cshared_test.go | 22 +++++++++++++++++----- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/cshared.go b/cshared.go index eaffddd..68bf910 100644 --- a/cshared.go +++ b/cshared.go @@ -133,10 +133,14 @@ func FLBPluginInit(ptr unsafe.Pointer) int { return input.FLB_OK } -// flbPluginRest is meant to reset the plugin between tests. +// flbPluginReset is meant to reset the plugin between tests. func flbPluginReset() { + theInputLock.Lock() + defer theInputLock.Unlock() + once = sync.Once{} close(theChannel) + theInput = nil } func testFLBPluginInputCallback() ([]byte, error) { @@ -153,6 +157,9 @@ func testFLBPluginInputCallback() ([]byte, error) { return C.GoBytes(data, C.int(csize)), nil } +// Lock used to synchronize access to theInput variable. +var theInputLock sync.Mutex + // FLBPluginInputCallback this method gets invoked by the fluent-bit // runtime, once the plugin has been initialized, the plugin // implementation is responsible for handling the incoming data and the @@ -174,7 +181,11 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { runCtx, runCancel = context.WithCancel(context.Background()) theChannel = make(chan Message, maxBufferedMessages) + theInputLock.Lock() + go func(theChannel chan<- Message) { + defer theInputLock.Unlock() + err := theInput.Collect(runCtx, theChannel) if err != nil { fmt.Fprintf(os.Stderr, diff --git a/cshared_test.go b/cshared_test.go index 8bbb7cc..76f2fe7 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -30,7 +30,10 @@ func init() { func TestInputCallbackCtrlC(t *testing.T) { defer flbPluginReset() + theInputLock.Lock() theInput = testPluginInputCallbackCtrlC{} + theInputLock.Unlock() + cdone := make(chan bool) timeout := time.NewTimer(1 * time.Second) defer timeout.Stop() @@ -44,6 +47,7 @@ func TestInputCallbackCtrlC(t *testing.T) { select { case <-cdone: + timeout.Stop() runCancel() case <-timeout.C: t.Fatalf("timed out ...") @@ -75,7 +79,10 @@ func (t testPluginInputCallbackDangle) Collect(ctx context.Context, ch chan<- Me func TestInputCallbackDangle(t *testing.T) { defer flbPluginReset() + theInputLock.Lock() theInput = testPluginInputCallbackDangle{} + theInputLock.Unlock() + cdone := make(chan bool) ptr := unsafe.Pointer(nil) @@ -99,6 +106,7 @@ func TestInputCallbackDangle(t *testing.T) { <-timeout.C timeout.Stop() runCancel() + cdone <- true // Test the assumption that only a single goroutine is // ingesting records. @@ -140,7 +148,10 @@ func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- func TestInputCallbackInfinite(t *testing.T) { defer flbPluginReset() + theInputLock.Lock() theInput = testPluginInputCallbackInfinite{} + theInputLock.Unlock() + cdone := make(chan bool) cshutdown := make(chan bool) ptr := unsafe.Pointer(nil) @@ -168,7 +179,6 @@ func TestInputCallbackInfinite(t *testing.T) { select { case <-cdone: - theInput = nil runCancel() // make sure Collect is not being invoked after Done(). time.Sleep(collectInterval * 10) @@ -180,7 +190,6 @@ func TestInputCallbackInfinite(t *testing.T) { } return case <-timeout.C: - theInput = nil runCancel() cshutdown <- true // This test seems to fail some what frequently because the Collect goroutine @@ -220,7 +229,10 @@ func (t testPluginInputCallbackLatency) Collect(ctx context.Context, ch chan<- M func TestInputCallbackLatency(t *testing.T) { defer flbPluginReset() + theInputLock.Lock() theInput = testPluginInputCallbackLatency{} + theInputLock.Unlock() + cdone := make(chan bool) cstarted := make(chan bool) cmsg := make(chan []byte) @@ -286,7 +298,6 @@ func TestInputCallbackLatency(t *testing.T) { } } case <-timeout.C: - theInput = nil runCancel() cdone <- true @@ -342,7 +353,10 @@ func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch cha func TestInputCallbackInfiniteConcurrent(t *testing.T) { defer flbPluginReset() + theInputLock.Lock() theInput = testInputCallbackInfiniteConcurrent{} + theInputLock.Unlock() + cdone := make(chan bool) cstarted := make(chan bool) ptr := unsafe.Pointer(nil) @@ -376,10 +390,8 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) { select { case <-cdone: - theInput = nil runCancel() case <-timeout.C: - theInput = nil runCancel() // this test seems to timeout semi-frequently... need to get to // the bottom of it... From ff4ff8b8cc11336117a9fae6518e5106dc192c9f Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 5 Oct 2023 16:19:13 -0300 Subject: [PATCH 6/7] ci: run input tests individually while reentrancy is still not fixed. Signed-off-by: Phillip Whelan --- .github/workflows/ci.yaml | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 6de3a33..83ef38b 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -54,13 +54,26 @@ jobs: run: docker pull ghcr.io/calyptia/internal/core-fluent-bit:main - name: Unit tests - # retry tests for now... Open issue to fix it. - uses: nick-fields/retry@v2 - with: - timeout_seconds: 30 - max_attempts: 3 - command: | - go test -v -covermode=atomic -coverprofile=coverage.out ./... + run: | + # go test -v -covermode=atomic -coverprofile=coverage.out ./... + go test -v -covermode=atomic -coverprofile=coverage.out \ + \^TestInputCallbackCtrlC\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + \^TestInputCallbackDangle\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + \^TestInputCallbackInfinite\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + \^TestInputCallbackInfiniteLatency\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + \^TestInputCallbackInfiniteConcurrent\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + \^TestPlugin\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + \^TestInputCallbackInfiniteConcurrent\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + ./configloader/ + go test -v -covermode=atomic -coverprofile=coverage.out \ + ./output/ - name: Upload coverage to Codecov if: ${{ github.event_name != 'pull_request' }} From 47629f249114909811232849f05754ffe581c1e0 Mon Sep 17 00:00:00 2001 From: Phillip Whelan Date: Thu, 5 Oct 2023 16:21:00 -0300 Subject: [PATCH 7/7] ci: add missing '-run' parameter for individual tests. Signed-off-by: Phillip Whelan --- .github/workflows/ci.yaml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 83ef38b..f6a26f6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -57,19 +57,19 @@ jobs: run: | # go test -v -covermode=atomic -coverprofile=coverage.out ./... go test -v -covermode=atomic -coverprofile=coverage.out \ - \^TestInputCallbackCtrlC\$ ./ + -run \^TestInputCallbackCtrlC\$ ./ go test -v -covermode=atomic -coverprofile=coverage.out \ - \^TestInputCallbackDangle\$ ./ + -run \^TestInputCallbackDangle\$ ./ go test -v -covermode=atomic -coverprofile=coverage.out \ - \^TestInputCallbackInfinite\$ ./ + -run \^TestInputCallbackInfinite\$ ./ go test -v -covermode=atomic -coverprofile=coverage.out \ - \^TestInputCallbackInfiniteLatency\$ ./ + -run \^TestInputCallbackInfiniteLatency\$ ./ go test -v -covermode=atomic -coverprofile=coverage.out \ - \^TestInputCallbackInfiniteConcurrent\$ ./ + -run \^TestInputCallbackInfiniteConcurrent\$ ./ go test -v -covermode=atomic -coverprofile=coverage.out \ - \^TestPlugin\$ ./ + -run \^TestPlugin\$ ./ go test -v -covermode=atomic -coverprofile=coverage.out \ - \^TestInputCallbackInfiniteConcurrent\$ ./ + -run \^TestInputCallbackInfiniteConcurrent\$ ./ go test -v -covermode=atomic -coverprofile=coverage.out \ ./configloader/ go test -v -covermode=atomic -coverprofile=coverage.out \