Skip to content

A Golang Package to build fluent-bit input plugins

License

Notifications You must be signed in to change notification settings

chronosphereio/calyptia-plugin

Repository files navigation

Go Reference codecov

Fluent-bit plugin

This repository contains the required Golang interfaces and data types required to create Fluent bit Golang plugins.

Architectural overview

Plugins are implemented via proxy callbacks, fluent-bit implements a generic proxy approach that can be used to load and invoke DSO object callbacks.

The following is the state machine of callbacks.

stateDiagram-v2
    state "fluent-bit engine starts" as start
    start --> FLBPluginRegister
    FLBPluginRegister --> FLBPluginInit : Locate configured plugins
    FLBPluginInit --> FLB_ERR
    FLB_ERR --> FLBPluginUnregister
    FLBPluginInit --> FLB_OK
    FLB_OK --> instance

    state "plugin instance" as instance
    instance --> FLBPluginInputCallback: collect() data
    FLBPluginInputCallback --> WriteMsgPack

    instance --> FLBPluginOutputCallback : Flush(data)
    FLBPluginOutputCallback --> WriteMsgPack
    WriteMsgPack --> [*]
Loading

Writing a plugin

Plugins can be written for output and input processing.

As an example, the following are the minimum steps for writing an input plugin. Declaring a plugin requires to implement the InputPlugin interface. Explicitly defining the 2 methods Init and Collect:

package main

import (
	"context"
	"errors"
	"time"

	"github.com/calyptia/plugin"
)

// Plugin needs to be registered as an input type plugin in the initialisation phase
func init() {
	plugin.RegisterInput("go-test-input-plugin", "Golang input plugin for testing", &dummyPlugin{})
}

type dummyPlugin struct {
	foo string
}

// Init An instance of the configuration loader will be passed to the Init method so all the required
// configuration entries can be retrieved within the plugin context.
func (plug *dummyPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
	plug.foo = fbit.Conf.String("foo")
	return nil
}

// Collect this method will be invoked by the fluent-bit engine after the initialisation is successful
// this method can lock as each plugin its implemented in its own thread. Be aware that the main context
// can be cancelled at any given time, so handle the context properly within this method.
// The *ch* channel parameter, is a channel handled by the runtime that will receive messages from the plugin
// collection, make sure to validate channel closure and to follow the `plugin.Message` struct for messages
// generated by plugins.
func (plug dummyPlugin) Collect(ctx context.Context, ch chan<- plugin.Message) error {
	tick := time.NewTicker(time.Second)

	for {
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err != nil && !errors.Is(err, context.Canceled) {
				return err
			}

			return nil
		case <-tick.C:
			ch <- plugin.Message{
				Time: time.Now(),
				Record: map[string]string{
					"message": "hello from go-test-input-plugin",
					"foo":     plug.foo,
				},
			}
		}
	}
}

func main() {}

Adding metrics

Plugin can share their metrics over fluent-bit proxy interface. As an example, the following are the minimum steps for sharing plugin's metrics, Using a metric interface to implement the plugin's metrics.

package main

import (
	"context"
	"errors"
	"time"

	"github.com/calyptia/plugin"
	"github.com/calyptia/plugin/metric"
)

type dummyPlugin struct {
	counterExample metric.Counter
}

func (plug *dummyPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
	plug.counterExample = fbit.Metrics.NewCounter("example_metric_total", "Total number of example metrics", "go-test-input-plugin")
	return nil
}

func (plug dummyPlugin) Collect(ctx context.Context, ch chan<- plugin.Message) error {
	tick := time.NewTicker(time.Second)

	for {
		select {
		case <-ctx.Done():
			err := ctx.Err()
			if err != nil && !errors.Is(err, context.Canceled) {
				return err
			}

			return nil
		case <-tick.C:
			plug.counterExample.Add(1)

			ch <- plugin.Message{
				Time: time.Now(),
				Record: map[string]string{
					"message": "hello from go-test-input-plugin",
					"foo":     plug.foo,
				},
			}
		}
	}
}

func main() {}

Building a plugin

A plugin can be built locally using go build as:

go build -trimpath -buildmode c-shared -o ./bin/go-test-input-plugin.so .

Or compiled to linux/amd64 from another machine using zig (Example working on darwin/arm64).

CGO_ENABLED=1 \
GOOS=linux \
GOARCH=amd64 \
CC="zig cc -target x86_64-linux-gnu -isystem /usr/include -L/usr/lib/x86_64-linux-gnu" \
CXX="zig c++ -target x86_64-linux-gnu -isystem /usr/include -L/usr/lib/x86_64-linux-gnu" \
go build -buildmode=c-shared -trimpath -o ./my-plugin-linux-amd64.so ./...

Or using a Dockerfile as follows:

FROM golang:latest AS builder

WORKDIR /fluent-bit

COPY go.mod .
COPY go.sum .

RUN go mod download
RUN go mod verify

COPY . .

RUN go build -trimpath -buildmode c-shared -o ./bin/go-test-input-plugin.so .

FROM ghcr.io/calyptia/enterprise/advanced:main

COPY --from=builder /fluent-bit/bin/go-test-input-plugin.so /fluent-bit/etc/

ENTRYPOINT [ "/fluent-bit/bin/fluent-bit" ]
CMD [ "/fluent-bit/bin/fluent-bit", "-c", "/fluent-bit/etc/fluent-bit.conf" ]

Then create a fluent-bit.conf as follows:

[SERVICE]
    flush           1
    log_level       info
    plugins_file    /fluent-bit/etc/plugins.conf

[INPUT]
    Name go-test-input-plugin
    Tag  test-input
    foo  bar

Also a plugins.conf definition has to be provided, as follows:

[PLUGINS]
    Path /fluent-bit/etc/go-test-input-plugin.so

Run the docker container as follows:

docker build -t my-fluent-bit-plugin:main .
docker run --platform=linux/amd64 -v $(pwd)/fluent-bit.conf:/fluent-bit/etc/fluent-bit.conf:ro -v $(pwd)/plugins.conf:/fluent-bit/etc/plugins.conf:ro my-fluent-bit-plugin:main

For further examples, please check the examples or testdata folders.

Running tests

Running the local tests must be doable with:

go test -v ./...

Contributing

Please feel free to open PR(s) on this repository and to report any bugs of feature requests on the GitHub issues page.