diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ef2e047..f6a26f6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -55,7 +55,25 @@ jobs: - name: Unit tests run: | - go test -v -covermode=atomic -coverprofile=coverage.out ./... + # go test -v -covermode=atomic -coverprofile=coverage.out ./... + go test -v -covermode=atomic -coverprofile=coverage.out \ + -run \^TestInputCallbackCtrlC\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + -run \^TestInputCallbackDangle\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + -run \^TestInputCallbackInfinite\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + -run \^TestInputCallbackInfiniteLatency\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + -run \^TestInputCallbackInfiniteConcurrent\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + -run \^TestPlugin\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + -run \^TestInputCallbackInfiniteConcurrent\$ ./ + go test -v -covermode=atomic -coverprofile=coverage.out \ + ./configloader/ + go test -v -covermode=atomic -coverprofile=coverage.out \ + ./output/ - name: Upload coverage to Codecov if: ${{ github.event_name != 'pull_request' }} diff --git a/cshared.go b/cshared.go index afa215d..68bf910 100644 --- a/cshared.go +++ b/cshared.go @@ -6,6 +6,7 @@ package plugin import "C" import ( + "bytes" "context" "errors" "fmt" @@ -16,6 +17,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "unsafe" @@ -27,10 +29,19 @@ import ( "github.com/calyptia/plugin/output" ) +const ( + // maxBufferedMessages is the number of messages that will be buffered + // between each fluent-bit interval (approx 1 second). + defaultMaxBufferedMessages = 300000 + // collectInterval is set to the interval present before in core-fluent-bit. + collectInterval = 1000 * time.Nanosecond +) + var ( - unregister func() - cmt *cmetrics.Context - logger Logger + unregister func() + cmt *cmetrics.Context + logger Logger + maxBufferedMessages = defaultMaxBufferedMessages ) // FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description @@ -62,7 +73,7 @@ func FLBPluginRegister(def unsafe.Pointer) int { } // FLBPluginInit this method gets invoked once by the fluent-bit runtime at initialisation phase. -// here all the plugin context should be initialised and any data or flag required for +// here all the plugin context should be initialized and any data or flag required for // plugins to execute the collect or flush callback. // //export FLBPluginInit @@ -94,6 +105,12 @@ func FLBPluginInit(ptr unsafe.Pointer) int { } err = theInput.Init(ctx, fbit) + if maxbuffered := fbit.Conf.String("go.MaxBufferedMessages"); maxbuffered != "" { + maxbuffered, err := strconv.Atoi(maxbuffered) + if err != nil { + maxBufferedMessages = maxbuffered + } + } } else { conf := &flbOutputConfigLoader{ptr: ptr} cmt, err = output.FLBPluginGetCMetricsContext(ptr) @@ -116,10 +133,40 @@ func FLBPluginInit(ptr unsafe.Pointer) int { return input.FLB_OK } -// FLBPluginInputCallback this method gets invoked by the fluent-bit runtime, once the plugin has been -// initialised, the plugin implementation is responsible for handling the incoming data and the context -// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit -// will not execute further callbacks. +// flbPluginReset is meant to reset the plugin between tests. +func flbPluginReset() { + theInputLock.Lock() + defer theInputLock.Unlock() + + once = sync.Once{} + close(theChannel) + theInput = nil +} + +func testFLBPluginInputCallback() ([]byte, error) { + data := unsafe.Pointer(nil) + var csize C.size_t + + FLBPluginInputCallback(&data, &csize) + + if data == nil { + return []byte{}, nil + } + + defer C.free(data) + return C.GoBytes(data, C.int(csize)), nil +} + +// Lock used to synchronize access to theInput variable. +var theInputLock sync.Mutex + +// FLBPluginInputCallback this method gets invoked by the fluent-bit +// runtime, once the plugin has been initialized, the plugin +// implementation is responsible for handling the incoming data and the +// context that gets past +// +// This function will invoke Collect only once to preserve backward +// compatible behavior. There are unit tests to enforce this behavior. // //export FLBPluginInputCallback func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { @@ -130,50 +177,63 @@ func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int { return input.FLB_RETRY } - var err error once.Do(func() { runCtx, runCancel = context.WithCancel(context.Background()) - theChannel = make(chan Message) - go func() { - err = theInput.Collect(runCtx, theChannel) - }() + theChannel = make(chan Message, maxBufferedMessages) + + theInputLock.Lock() + + go func(theChannel chan<- Message) { + defer theInputLock.Unlock() + + err := theInput.Collect(runCtx, theChannel) + if err != nil { + fmt.Fprintf(os.Stderr, + "collect error: %s\n", err.Error()) + } + }(theChannel) }) - if err != nil { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR - } - select { - case msg, ok := <-theChannel: - if !ok { - return input.FLB_OK - } + buf := bytes.NewBuffer([]byte{}) - t := input.FLBTime{Time: msg.Time} - b, err := input.NewEncoder().Encode([]any{t, msg.Record}) - if err != nil { - fmt.Fprintf(os.Stderr, "encode: %s\n", err) - return input.FLB_ERROR + for loop := min(len(theChannel), maxBufferedMessages); loop > 0; loop-- { + select { + case msg, ok := <-theChannel: + if !ok { + return input.FLB_ERROR + } + + t := input.FLBTime{Time: msg.Time} + b, err := input.NewEncoder().Encode([]any{t, msg.Record}) + if err != nil { + fmt.Fprintf(os.Stderr, "encode: %s\n", err) + return input.FLB_ERROR + } + buf.Grow(len(b)) + buf.Write(b) + case <-runCtx.Done(): + err := runCtx.Err() + if err != nil && !errors.Is(err, context.Canceled) { + fmt.Fprintf(os.Stderr, "run: %s\n", err) + return input.FLB_ERROR + } + // enforce a runtime gc, to prevent the thread finalizer on + // fluent-bit to kick in before any remaining data has not been GC'ed + // causing a sigsegv. + defer runtime.GC() + loop = 0 + default: + loop = 0 } + } + if buf.Len() > 0 { + b := buf.Bytes() cdata := C.CBytes(b) - *data = cdata - *csize = C.size_t(len(b)) - - // C.free(unsafe.Pointer(cdata)) - case <-runCtx.Done(): - err := runCtx.Err() - if err != nil && !errors.Is(err, context.Canceled) { - fmt.Fprintf(os.Stderr, "run: %s\n", err) - return input.FLB_ERROR + if csize != nil { + *csize = C.size_t(len(b)) } - // enforce a runtime gc, to prevent the thread finalizer on - // fluent-bit to kick in before any remaining data has not been GC'ed - // causing a sigsegv. - defer runtime.GC() - default: - break } return input.FLB_OK @@ -191,7 +251,7 @@ func FLBPluginInputCleanupCallback(data unsafe.Pointer) int { // plugin in the pipeline, a data pointer, length and a tag are passed to the plugin interface implementation. // //export FLBPluginFlush -//nolint:funlen,gocognit,gocyclo //ignore length requirement for this function, TODO: refactor into smaller functions. +//nolint:funlen //ignore length requirement for this function func FLBPluginFlush(data unsafe.Pointer, clength C.int, ctag *C.char) int { initWG.Wait() diff --git a/cshared_test.go b/cshared_test.go index 1ffdcac..76f2fe7 100644 --- a/cshared_test.go +++ b/cshared_test.go @@ -2,38 +2,401 @@ package plugin import ( "context" + "fmt" + "sync" + "sync/atomic" "testing" "time" "unsafe" + + "github.com/calyptia/plugin/output" ) -type testPluginInputCallback struct{} +type testPluginInputCallbackCtrlC struct{} -func (t testPluginInputCallback) Init(ctx context.Context, fbit *Fluentbit) error { +func (t testPluginInputCallbackCtrlC) Init(ctx context.Context, fbit *Fluentbit) error { return nil } -func (t testPluginInputCallback) Collect(ctx context.Context, ch chan<- Message) error { +func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Message) error { return nil } +func init() { + initWG.Done() + registerWG.Done() +} + func TestInputCallbackCtrlC(t *testing.T) { - theInput = testPluginInputCallback{} + defer flbPluginReset() + + theInputLock.Lock() + theInput = testPluginInputCallbackCtrlC{} + theInputLock.Unlock() + cdone := make(chan bool) timeout := time.NewTimer(1 * time.Second) + defer timeout.Stop() + ptr := unsafe.Pointer(nil) - initWG.Done() - registerWG.Done() + go func() { + FLBPluginInputCallback(&ptr, nil) + cdone <- true + }() + + select { + case <-cdone: + timeout.Stop() + runCancel() + case <-timeout.C: + t.Fatalf("timed out ...") + } +} + +var testPluginInputCallbackDangleFuncs atomic.Int64 + +type testPluginInputCallbackDangle struct{} + +func (t testPluginInputCallbackDangle) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackDangle) Collect(ctx context.Context, ch chan<- Message) error { + testPluginInputCallbackDangleFuncs.Add(1) + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + return nil +} + +// TestInputCallbackDangle assures the API will not attempt to invoke +// Collect multiple times. This is inline with backward-compatible +// behavior. +func TestInputCallbackDangle(t *testing.T) { + defer flbPluginReset() + + theInputLock.Lock() + theInput = testPluginInputCallbackDangle{} + theInputLock.Unlock() + + cdone := make(chan bool) + ptr := unsafe.Pointer(nil) + + go func() { + t := time.NewTicker(collectInterval) + defer t.Stop() + + FLBPluginInputCallback(&ptr, nil) + for { + select { + case <-t.C: + FLBPluginInputCallback(&ptr, nil) + case <-cdone: + return + } + } + }() + + timeout := time.NewTimer(5 * time.Second) + + <-timeout.C + timeout.Stop() + runCancel() + cdone <- true + + // Test the assumption that only a single goroutine is + // ingesting records. + if testPluginInputCallbackDangleFuncs.Load() != 1 { + t.Fatalf("Too many callbacks: %d", + testPluginInputCallbackDangleFuncs.Load()) + } +} + +var testPluginInputCallbackInfiniteFuncs atomic.Int64 + +type testPluginInputCallbackInfinite struct{} + +func (t testPluginInputCallbackInfinite) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackInfinite) Collect(ctx context.Context, ch chan<- Message) error { + testPluginInputCallbackInfiniteFuncs.Add(1) + for { + select { + default: + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + // for tests to correctly pass our infinite loop needs + // to return once the context has been finished. + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfinite is a test for the main method most plugins +// use where they do not return from the first invocation of collect. +func TestInputCallbackInfinite(t *testing.T) { + defer flbPluginReset() + + theInputLock.Lock() + theInput = testPluginInputCallbackInfinite{} + theInputLock.Unlock() + + cdone := make(chan bool) + cshutdown := make(chan bool) + ptr := unsafe.Pointer(nil) go func() { + t := time.NewTicker(collectInterval) + defer t.Stop() + + for { + select { + case <-t.C: + FLBPluginInputCallback(&ptr, nil) + if ptr != nil { + cdone <- true + return + } + case <-cshutdown: + return + } + } + }() + + timeout := time.NewTimer(10 * time.Second) + defer timeout.Stop() + + select { + case <-cdone: + runCancel() + // make sure Collect is not being invoked after Done(). + time.Sleep(collectInterval * 10) + // Test the assumption that only a single goroutine is + // ingesting records. + if testPluginInputCallbackInfiniteFuncs.Load() != 1 { + t.Fatalf("Too many callbacks: %d", + testPluginInputCallbackInfiniteFuncs.Load()) + } + return + case <-timeout.C: + runCancel() + cshutdown <- true + // This test seems to fail some what frequently because the Collect goroutine + // inside cshared is never being scheduled. + t.Fatalf("timed out ...") + } +} + +type testPluginInputCallbackLatency struct{} + +func (t testPluginInputCallbackLatency) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testPluginInputCallbackLatency) Collect(ctx context.Context, ch chan<- Message) error { + tick := time.NewTimer(time.Second * 1) + for { + select { + case <-tick.C: + for i := 0; i < 128; i++ { + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "Foo": "BAR", + }, + } + } + tick.Reset(time.Second * 1) + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfiniteLatency is a test of the latency between +// messages. +func TestInputCallbackLatency(t *testing.T) { + defer flbPluginReset() + + theInputLock.Lock() + theInput = testPluginInputCallbackLatency{} + theInputLock.Unlock() + + cdone := make(chan bool) + cstarted := make(chan bool) + cmsg := make(chan []byte) + + go func() { + t := time.NewTicker(collectInterval) + defer t.Stop() + + buf, _ := testFLBPluginInputCallback() + if len(buf) > 0 { + cmsg <- buf + } + + cstarted <- true + for { + select { + case <-cdone: + fmt.Println("---- collect done") + return + case <-t.C: + buf, _ := testFLBPluginInputCallback() + if len(buf) > 0 { + cmsg <- buf + } + } + } + }() + + <-cstarted + fmt.Println("---- started") + timeout := time.NewTimer(5 * time.Second) + defer timeout.Stop() + + msgs := 0 + + for { + select { + case buf := <-cmsg: + dec := output.NewByteDecoder(buf) + if dec == nil { + t.Fatal("dec is nil") + } + + for { + ret, timestamp, _ := output.GetRecord(dec) + if ret == -1 { + break + } + if ret < 0 { + t.Fatalf("ret is negative: %d", ret) + } + + msgs++ + + ts, ok := timestamp.(output.FLBTime) + if !ok { + t.Fatal() + } + + if time.Since(ts.Time) > time.Millisecond*5 { + t.Errorf("latency too high: %fms", + float64(time.Since(ts.Time)/time.Millisecond)) + } + } + case <-timeout.C: + runCancel() + cdone <- true + + if msgs < 128 { + t.Fatalf("too few messages: %d", msgs) + } + return + } + } +} + +type testInputCallbackInfiniteConcurrent struct{} + +var concurrentWait sync.WaitGroup +var concurrentCountStart atomic.Int64 +var concurrentCountFinish atomic.Int64 + +func (t testInputCallbackInfiniteConcurrent) Init(ctx context.Context, fbit *Fluentbit) error { + return nil +} + +func (t testInputCallbackInfiniteConcurrent) Collect(ctx context.Context, ch chan<- Message) error { + fmt.Printf("---- infinite concurrent collect\n") + + for i := 0; i < 64; i++ { + go func(ch chan<- Message, id int) { + fmt.Printf("---- infinite concurrent started: %d\n", id) + concurrentCountStart.Add(1) + ch <- Message{ + Time: time.Now(), + Record: map[string]string{ + "ID": fmt.Sprintf("%d", id), + }, + } + concurrentCountFinish.Add(1) + concurrentWait.Done() + fmt.Printf("---- infinite concurrent finished: %d\n", id) + }(ch, i) + fmt.Printf("---- infinite concurrent starting: %d\n", i) + } + // for tests to correctly pass our infinite loop needs + // to return once the context has been finished. + for { + select { + case <-ctx.Done(): + return nil + } + } +} + +// TestInputCallbackInfiniteConcurrent is meant to make sure we do not +// break anythin with respect to concurrent ingest. +func TestInputCallbackInfiniteConcurrent(t *testing.T) { + defer flbPluginReset() + + theInputLock.Lock() + theInput = testInputCallbackInfiniteConcurrent{} + theInputLock.Unlock() + + cdone := make(chan bool) + cstarted := make(chan bool) + ptr := unsafe.Pointer(nil) + + concurrentWait.Add(64) + + go func(cstarted chan bool) { + ticker := time.NewTicker(time.Second * 1) + defer ticker.Stop() + FLBPluginInputCallback(&ptr, nil) + cstarted <- true + + for { + select { + case <-ticker.C: + FLBPluginInputCallback(&ptr, nil) + case <-runCtx.Done(): + return + } + } + }(cstarted) + + go func() { + concurrentWait.Wait() cdone <- true }() + <-cstarted + timeout := time.NewTimer(10 * time.Second) + select { case <-cdone: + runCancel() case <-timeout.C: - t.Fail() + runCancel() + // this test seems to timeout semi-frequently... need to get to + // the bottom of it... + t.Fatalf("---- timed out: %d/%d ...", + concurrentCountStart.Load(), + concurrentCountFinish.Load()) } } diff --git a/output/decoder.go b/output/decoder.go index d1b438f..aa9e508 100644 --- a/output/decoder.go +++ b/output/decoder.go @@ -69,6 +69,17 @@ func NewDecoder(data unsafe.Pointer, length int) *FLBDecoder { return dec } +func NewByteDecoder(b []byte) *FLBDecoder { + dec := new(FLBDecoder) + dec.handle = new(codec.MsgpackHandle) + // TODO: handle error. + _ = dec.handle.SetBytesExt(reflect.TypeOf(FLBTime{}), 0, &FLBTime{}) + + dec.mpdec = codec.NewDecoderBytes(b, dec.handle) + + return dec +} + func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]interface{}) { var check error var m interface{}