Skip to content

Commit

Permalink
Merge pull request #20 from calyptia/handle-exposed-log-api
Browse files Browse the repository at this point in the history
input: output: Bind log api from fluent-bit
  • Loading branch information
niedbalski authored Jul 5, 2022
2 parents dce564b + 6edd02e commit 79d417f
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 16 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ type dummyPlugin struct {

// Init An instance of the configuration loader will be passed to the Init method so all the required
// configuration entries can be retrieved within the plugin context.
func (plug *dummyPlugin) Init(ctx context.Context, conf plugin.ConfigLoader, metrics plugin.Metrics) error {
plug.foo = conf.String("foo")
func (plug *dummyPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
plug.foo = fbit.Conf.String("foo")
return nil
}

Expand Down Expand Up @@ -124,8 +124,8 @@ type dummyPlugin struct {
counterExample metric.Counter
}

func (plug *dummyPlugin) Init(ctx context.Context, conf plugin.ConfigLoader, metrics plugin.Metrics) error {
plug.counterExample = metrics.NewCounter("example_metric_total", "Total number of example metrics", "go-test-input-plugin")
func (plug *dummyPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
plug.counterExample = fbit.Metrics.NewCounter("example_metric_total", "Total number of example metrics", "go-test-input-plugin")
return nil
}

Expand Down
65 changes: 63 additions & 2 deletions cshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

var unregister func()
var cmt *cmetrics.Context
var logger Logger

// FLBPluginRegister registers a plugin in the context of the fluent-bit runtime, a name and description
// can be provided.
Expand Down Expand Up @@ -79,15 +80,27 @@ func FLBPluginInit(ptr unsafe.Pointer) int {
if err != nil {
return input.FLB_ERROR
}
logger = &flbInputLogger{ptr: ptr}
fbit := &Fluentbit{
Conf: conf,
Metrics: makeMetrics(cmt),
Logger: logger,
}

err = theInput.Init(ctx, conf, makeMetrics(cmt))
err = theInput.Init(ctx, fbit)
} else {
conf := &flbOutputConfigLoader{ptr: ptr}
cmt, err = output.FLBPluginGetCMetricsContext(ptr)
if err != nil {
return output.FLB_ERROR
}
err = theOutput.Init(ctx, conf, makeMetrics(cmt))
logger = &flbOutputLogger{ptr: ptr}
fbit := &Fluentbit{
Conf: conf,
Metrics: makeMetrics(cmt),
Logger: logger,
}
err = theOutput.Init(ctx, fbit)
}
if err != nil {
fmt.Fprintf(os.Stderr, "init: %v\n", err)
Expand Down Expand Up @@ -315,6 +328,54 @@ func (f *flbOutputConfigLoader) String(key string) string {
return output.FLBPluginConfigKey(f.ptr, key)
}

type flbInputLogger struct {
ptr unsafe.Pointer
}

func (f *flbInputLogger) Error(format string, a ...any) {
message := fmt.Sprintf(format, a...)
input.FLBPluginLogPrint(f.ptr, input.FLB_LOG_ERROR, message)
}

func (f *flbInputLogger) Warn(format string, a ...any) {
message := fmt.Sprintf(format, a...)
input.FLBPluginLogPrint(f.ptr, input.FLB_LOG_WARN, message)
}

func (f *flbInputLogger) Info(format string, a ...any) {
message := fmt.Sprintf(format, a...)
input.FLBPluginLogPrint(f.ptr, input.FLB_LOG_INFO, message)
}

func (f *flbInputLogger) Debug(format string, a ...any) {
message := fmt.Sprintf(format, a...)
input.FLBPluginLogPrint(f.ptr, input.FLB_LOG_DEBUG, message)
}

type flbOutputLogger struct {
ptr unsafe.Pointer
}

func (f *flbOutputLogger) Error(format string, a ...any) {
message := fmt.Sprintf(format, a...)
output.FLBPluginLogPrint(f.ptr, output.FLB_LOG_ERROR, message)
}

func (f *flbOutputLogger) Warn(format string, a ...any) {
message := fmt.Sprintf(format, a...)
output.FLBPluginLogPrint(f.ptr, output.FLB_LOG_WARN, message)
}

func (f *flbOutputLogger) Info(format string, a ...any) {
message := fmt.Sprintf(format, a...)
output.FLBPluginLogPrint(f.ptr, output.FLB_LOG_INFO, message)
}

func (f *flbOutputLogger) Debug(format string, a ...any) {
message := fmt.Sprintf(format, a...)
output.FLBPluginLogPrint(f.ptr, output.FLB_LOG_DEBUG, message)
}

func makeMetrics(cmp *cmetrics.Context) Metrics {
return &metricbuilder.Builder{
Namespace: "fluentbit",
Expand Down
11 changes: 8 additions & 3 deletions examples/in_gdummy/in_gdummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@ func init() {
type gdummyPlugin struct {
counterSuccess metric.Counter
counterFailure metric.Counter
log plugin.Logger
}

func (plug *gdummyPlugin) Init(ctx context.Context, conf plugin.ConfigLoader, metrics plugin.Metrics) error {
plug.counterSuccess = metrics.NewCounter("operation_succeeded_total", "Total number of succeeded operations", "gdummy")
plug.counterFailure = metrics.NewCounter("operation_failed_total", "Total number of failed operations", "gdummy")
func (plug *gdummyPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
plug.counterSuccess = fbit.Metrics.NewCounter("operation_succeeded_total", "Total number of succeeded operations", "gdummy")
plug.counterFailure = fbit.Metrics.NewCounter("operation_failed_total", "Total number of failed operations", "gdummy")
plug.log = fbit.Logger

return nil
}

Expand All @@ -33,13 +36,15 @@ func (plug gdummyPlugin) Collect(ctx context.Context, ch chan<- plugin.Message)
err := ctx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
plug.counterFailure.Add(1)
plug.log.Error("[gdummy] operation failed")

return err
}

return nil
case <-tick.C:
plug.counterSuccess.Add(1)
plug.log.Debug("[gdummy] operation succeeded")

ch <- plugin.Message{
Time: time.Now(),
Expand Down
11 changes: 11 additions & 0 deletions input/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ struct flb_api {
char *(*input_get_property) (char *, void *);
void *__;
void *(*input_get_cmt_instance) (void *);
void (*log_print) (int, const char*, int, const char*, ...);
int (*input_log_check) (void *, int);
int ___;
};

struct flb_plugin_proxy_context {
Expand Down Expand Up @@ -55,4 +58,12 @@ void *input_get_cmt_instance(void *plugin)
return p->api->input_get_cmt_instance(p->i_ins);
}

void input_log_print_novar(void *plugin, int log_level, const char* message)
{
struct flbgo_input_plugin *p = plugin;
if (p->api->input_log_check(p->i_ins, log_level)) {
p->api->log_print(log_level, NULL, 0, message);
}
}

#endif
6 changes: 6 additions & 0 deletions input/flb_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
#define FLB_PROXY_INPUT_PLUGIN 1
#define FLB_PROXY_GOLANG 11

/* Message Types */
#define FLB_LOG_ERROR 1
#define FLB_LOG_WARN 2
#define FLB_LOG_INFO 3 /* default */
#define FLB_LOG_DEBUG 4

/* This structure is used for registration.
* It matches the one in flb_plugin_proxy.h in fluent-bit source code.
*/
Expand Down
11 changes: 11 additions & 0 deletions input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ const (

FLB_PROXY_INPUT_PLUGIN = C.FLB_PROXY_INPUT_PLUGIN
FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG

FLB_LOG_ERROR = C.FLB_LOG_ERROR
FLB_LOG_WARN = C.FLB_LOG_WARN
FLB_LOG_INFO = C.FLB_LOG_INFO
FLB_LOG_DEBUG = C.FLB_LOG_DEBUG
)

// Local type to define a plugin definition
Expand Down Expand Up @@ -77,3 +82,9 @@ func FLBPluginGetCMetricsContext(plugin unsafe.Pointer) (*cmetrics.Context, erro
cmt := unsafe.Pointer(ctx)
return cmetrics.NewContextFromCMTPointer(cmt)
}

func FLBPluginLogPrint(plugin unsafe.Pointer, log_level C.int, message string) {
_message := C.CString(message)
C.input_log_print_novar(plugin, log_level, _message)
C.free(unsafe.Pointer(_message))
}
11 changes: 11 additions & 0 deletions output/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ struct flb_api {
char *_;
void *(*output_get_cmt_instance) (void *);
void *__;
void (*log_print) (int, const char*, int, const char*, ...);
int ___;
int (*output_log_check) (void *, int);
};

struct flb_plugin_proxy_context {
Expand Down Expand Up @@ -53,4 +56,12 @@ void *output_get_cmt_instance(void *plugin)
return p->api->output_get_cmt_instance(p->o_ins);
}

void output_log_print_novar(void *plugin, int log_level, const char* message)
{
struct flbgo_output_plugin *p = plugin;
if (p->api->output_log_check(p->o_ins, log_level)) {
p->api->log_print(log_level, NULL, 0, message);
}
}

#endif
6 changes: 6 additions & 0 deletions output/flb_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
#define FLB_PROXY_OUTPUT_PLUGIN 2
#define FLB_PROXY_GOLANG 11

/* Message Types */
#define FLB_LOG_ERROR 1
#define FLB_LOG_WARN 2
#define FLB_LOG_INFO 3 /* default */
#define FLB_LOG_DEBUG 4

/* This structure is used for registration.
* It matches the one in flb_plugin_proxy.h in fluent-bit source code.
*/
Expand Down
11 changes: 11 additions & 0 deletions output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (

FLB_PROXY_OUTPUT_PLUGIN = C.FLB_PROXY_OUTPUT_PLUGIN
FLB_PROXY_GOLANG = C.FLB_PROXY_GOLANG

FLB_LOG_ERROR = C.FLB_LOG_ERROR
FLB_LOG_WARN = C.FLB_LOG_WARN
FLB_LOG_INFO = C.FLB_LOG_INFO
FLB_LOG_DEBUG = C.FLB_LOG_DEBUG
)

// Local type to define a plugin definition
Expand Down Expand Up @@ -100,3 +105,9 @@ func FLBPluginGetCMetricsContext(plugin unsafe.Pointer) (*cmetrics.Context, erro
cmt := unsafe.Pointer(ctx)
return cmetrics.NewContextFromCMTPointer(cmt)
}

func FLBPluginLogPrint(plugin unsafe.Pointer, log_level C.int, message string) {
_message := C.CString(message)
C.output_log_print_novar(plugin, log_level, _message)
C.free(unsafe.Pointer(_message))
}
18 changes: 16 additions & 2 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,21 @@ func init() {
initWG.Add(1)
}

type Fluentbit struct {
Conf ConfigLoader
Metrics Metrics
Logger Logger
}

// InputPlugin interface to represent an input fluent-bit plugin.
type InputPlugin interface {
Init(ctx context.Context, conf ConfigLoader, metrics Metrics) error
Init(ctx context.Context, fbit *Fluentbit) error
Collect(ctx context.Context, ch chan<- Message) error
}

// OutputPlugin interface to represent an output fluent-bit plugin.
type OutputPlugin interface {
Init(ctx context.Context, conf ConfigLoader, metrics Metrics) error
Init(ctx context.Context, fbit *Fluentbit) error
Flush(ctx context.Context, ch <-chan Message) error
}

Expand All @@ -52,6 +58,14 @@ type ConfigLoader interface {
String(key string) string
}

// Logger interface to represent a fluent-bit logging mechanism.
type Logger interface {
Error(format string, a ...any)
Warn(format string, a ...any)
Info(format string, a ...any)
Debug(format string, a ...any)
}

// Metrics builder.
type Metrics interface {
NewCounter(name, desc string, labelValues ...string) metric.Counter
Expand Down
10 changes: 7 additions & 3 deletions testdata/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ func init() {
type inputPlugin struct {
foo string
collectCounter metric.Counter
log plugin.Logger
}

func (plug *inputPlugin) Init(ctx context.Context, conf plugin.ConfigLoader, metrics plugin.Metrics) error {
plug.foo = conf.String("foo")
plug.collectCounter = metrics.NewCounter("collect_total", "Total number of collects", "go-test-input-plugin")
func (plug *inputPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
plug.foo = fbit.Conf.String("foo")
plug.collectCounter = fbit.Metrics.NewCounter("collect_total", "Total number of collects", "go-test-input-plugin")
plug.log = fbit.Logger
return nil
}

Expand All @@ -32,12 +34,14 @@ func (plug inputPlugin) Collect(ctx context.Context, ch chan<- plugin.Message) e
case <-ctx.Done():
err := ctx.Err()
if err != nil && !errors.Is(err, context.Canceled) {
plug.log.Error("[go-test-input-plugin] operation failed")
return err
}

return nil
case <-tick.C:
plug.collectCounter.Add(1)
plug.log.Info("[go-test-input-plugin] operation succeeded")

ch <- plugin.Message{
Time: time.Now(),
Expand Down
9 changes: 7 additions & 2 deletions testdata/output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,31 @@ func init() {

type outputPlugin struct {
flushCounter metric.Counter
log plugin.Logger
}

func (plug *outputPlugin) Init(ctx context.Context, conf plugin.ConfigLoader, metrics plugin.Metrics) error {
plug.flushCounter = metrics.NewCounter("flush_total", "Total number of flushes", "go-test-output-plugin")
func (plug *outputPlugin) Init(ctx context.Context, fbit *plugin.Fluentbit) error {
plug.flushCounter = fbit.Metrics.NewCounter("flush_total", "Total number of flushes", "go-test-output-plugin")
plug.log = fbit.Logger
return nil
}

func (plug outputPlugin) Flush(ctx context.Context, ch <-chan plugin.Message) error {
f, err := os.Create("/fluent-bit/etc/output.txt")
if err != nil {
plug.log.Error("[go-test-output-plugin] operation failed. reason %w", err)
return fmt.Errorf("could not open output.txt: %w", err)
}

defer f.Close()

for msg := range ch {
plug.flushCounter.Add(1)
plug.log.Info("[go-test-output-plugin] operation proceeded")

_, err := fmt.Fprintf(f, "message=\"got record\" tag=%s time=%s record=%+v\n", msg.Tag(), msg.Time.Format(time.RFC3339), msg.Record)
if err != nil {
plug.log.Error("[go-test-output-plugin] operation failed. reason %w", err)
return fmt.Errorf("could not write to output.txt: %w", err)
}
}
Expand Down

0 comments on commit 79d417f

Please sign in to comment.