Skip to content

Commit

Permalink
Merge pull request #54 from calyptia/cosmo0920-enable-execution-for-m…
Browse files Browse the repository at this point in the history
…ulti-instances

Enable execution for multi instances
  • Loading branch information
niedbalski authored Oct 25, 2023
2 parents aa19413 + cbc15ad commit 8dbb935
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 21 deletions.
65 changes: 51 additions & 14 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ var (
//export FLBPluginPreRegister
func FLBPluginPreRegister(hotReloading C.int) int {
if hotReloading == C.int(1) {
initWG.Add(1)
registerWG.Add(1)
}

Expand Down Expand Up @@ -93,6 +92,11 @@ func cleanup() int {
runCancel = nil
}

if !theInputLock.TryLock() {
return input.FLB_OK
}
defer theInputLock.Unlock()

if theChannel != nil {
defer close(theChannel)
}
Expand All @@ -106,6 +110,7 @@ func cleanup() int {
//
//export FLBPluginInit
func FLBPluginInit(ptr unsafe.Pointer) int {
initWG.Add(1)
defer initWG.Done()

if theInput == nil && theOutput == nil {
Expand Down Expand Up @@ -186,21 +191,25 @@ func testFLBPluginInputCallback() ([]byte, error) {
// Lock used to synchronize access to theInput variable.
var theInputLock sync.Mutex

// FLBPluginInputCallback this method gets invoked by the fluent-bit
// runtime, once the plugin has been initialized, the plugin
// implementation is responsible for handling the incoming data and the
// context that gets past
//
// This function will invoke Collect only once to preserve backward
// compatible behavior. There are unit tests to enforce this behavior.
func prepareInputCollector() (err error) {
// prepareInputCollector is meant to prepare resources for input collectors
func prepareInputCollector(multiInstance bool) (err error) {
runCtx, runCancel = context.WithCancel(context.Background())
theChannel = make(chan Message, maxBufferedMessages)
if !multiInstance {
theChannel = make(chan Message, maxBufferedMessages)
}

theInputLock.Lock()
if multiInstance {
if theChannel == nil {
theChannel = make(chan Message, maxBufferedMessages)
}
defer theInputLock.Unlock()
}

go func(theChannel chan<- Message) {
defer theInputLock.Unlock()
if !multiInstance {
defer theInputLock.Unlock()
}

go func(theChannel chan<- Message) {
err = theInput.Collect(runCtx, theChannel)
Expand Down Expand Up @@ -231,7 +240,7 @@ func FLBPluginInputPreRun(useHotReload C.int) int {
registerWG.Wait()

var err error
err = prepareInputCollector()
err = prepareInputCollector(true)

if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
Expand All @@ -241,7 +250,6 @@ func FLBPluginInputPreRun(useHotReload C.int) int {
return input.FLB_OK
}


// FLBPluginInputPause this method gets invoked by the fluent-bit runtime, once the plugin has been
// paused, the plugin invoked this method and entering paused state.
//
Expand All @@ -252,6 +260,11 @@ func FLBPluginInputPause() {
runCancel = nil
}

if !theInputLock.TryLock() {
return
}
defer theInputLock.Unlock()

if theChannel != nil {
close(theChannel)
theChannel = nil
Expand All @@ -264,13 +277,34 @@ func FLBPluginInputPause() {
//export FLBPluginInputResume
func FLBPluginInputResume() {
var err error
err = prepareInputCollector()
err = prepareInputCollector(true)

if err != nil {
fmt.Fprintf(os.Stderr, "run: %s\n", err)
}
}

// FLBPluginOutputPreExit this method gets invoked by the fluent-bit runtime, once the plugin has been
// exited, the plugin invoked this method and entering exiting state.
//
//export FLBPluginOutputPreExit
func FLBPluginOutputPreExit() {
if runCancel != nil {
runCancel()
runCancel = nil
}

if !theInputLock.TryLock() {
return
}
defer theInputLock.Unlock()

if theChannel != nil {
close(theChannel)
theChannel = nil
}
}

//export FLBPluginOutputPreRun
func FLBPluginOutputPreRun(useHotReload C.int) int {
registerWG.Wait()
Expand Down Expand Up @@ -306,6 +340,9 @@ func FLBPluginOutputPreRun(useHotReload C.int) int {
// that gets past, for long-living collectors the plugin itself should keep a running thread and fluent-bit
// will not execute further callbacks.
//
// This function will invoke Collect only once to preserve backward
// compatible behavior. There are unit tests to enforce this behavior.
//
//export FLBPluginInputCallback
func FLBPluginInputCallback(data *unsafe.Pointer, csize *C.size_t) int {
initWG.Wait()
Expand Down
11 changes: 5 additions & 6 deletions cshared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func (t testPluginInputCallbackCtrlC) Collect(ctx context.Context, ch chan<- Mes
}

func init() {
initWG.Done()
registerWG.Done()
}

Expand All @@ -41,7 +40,7 @@ func TestInputCallbackCtrlC(t *testing.T) {
ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
err := prepareInputCollector(false)
if err != nil {
t.Fail()
return
Expand Down Expand Up @@ -94,7 +93,7 @@ func TestInputCallbackDangle(t *testing.T) {
ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
err := prepareInputCollector(false)
if err != nil {
t.Fail()
}
Expand Down Expand Up @@ -170,7 +169,7 @@ func TestInputCallbackInfinite(t *testing.T) {
ptr := unsafe.Pointer(nil)

// prepare channel for input explicitly.
err := prepareInputCollector()
err := prepareInputCollector(false)
if err != nil {
t.Fail()
return
Expand Down Expand Up @@ -258,7 +257,7 @@ func TestInputCallbackLatency(t *testing.T) {
cmsg := make(chan []byte)

// prepare channel for input explicitly.
err := prepareInputCollector()
err := prepareInputCollector(false)
if err != nil {
t.Fail()
return
Expand Down Expand Up @@ -391,7 +390,7 @@ func TestInputCallbackInfiniteConcurrent(t *testing.T) {
concurrentWait.Add(64)

// prepare channel for input explicitly.
err := prepareInputCollector()
err := prepareInputCollector(false)
if err != nil {
t.Fail()
}
Expand Down
1 change: 0 additions & 1 deletion plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ var (

func init() {
registerWG.Add(1)
initWG.Add(1)
theChannel = nil
}

Expand Down

0 comments on commit 8dbb935

Please sign in to comment.