Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for polling watcher #126

Merged
merged 2 commits into from
Aug 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/fluentbitoperator/v1alpha2/fluentbit_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
type FluentBitSpec struct {
// Fluent Bit image.
Image string `json:"image,omitempty"`
// Fluent Bit Watcher command line arguments.
Args []string `json:"args,omitempty"`
// Fluent Bit image pull policy.
ImagePullPolicy corev1.PullPolicy `json:"imagePullPolicy,omitempty"`
// Fluent Bit image pull secret
Expand Down
15 changes: 6 additions & 9 deletions cmd/fluent-bit-watcher/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
FROM golang:1.13.6-alpine3.11 as buildergo
RUN mkdir -p /fluent-bit
COPY cmd/fluent-bit-watcher/main.go go.mod /fluent-bit/
WORKDIR /fluent-bit
RUN CGO_ENABLED=0 go build -i -ldflags '-w -s' -o fluent-bit main.go
RUN mkdir -p /code
COPY . /code/
WORKDIR /code
RUN echo $(ls -al /code)
RUN CGO_ENABLED=0 go build -i -ldflags '-w -s' -o /fluent-bit/fluent-bit /code/cmd/fluent-bit-watcher/main.go

# FROM gcr.io/distroless/cc-debian10
FROM fluent/fluent-bit:1.8.3
MAINTAINER KubeSphere <[email protected]>
LABEL Description="Fluent Bit docker image" Vendor="KubeSphere" Version="1.0"

COPY conf/fluent-bit.conf conf/parsers.conf /fluent-bit/etc/
COPY --from=buildergo /fluent-bit/fluent-bit /fluent-bit/bin/fluent-bit-watcher

#
EXPOSE 2020

# Entry point
CMD ["/fluent-bit/bin/fluent-bit-watcher", "-c", "/fluent-bit/etc/fluent-bit.conf"]
ENTRYPOINT ["/fluent-bit/bin/fluent-bit-watcher"]
58 changes: 47 additions & 11 deletions cmd/fluent-bit-watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"flag"
"math"
"os"
"os/exec"
Expand All @@ -14,14 +15,17 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"kubesphere.io/fluentbit-operator/pkg/filenotify"
)

const (
binPath = "/fluent-bit/bin/fluent-bit"
cfgPath = "/fluent-bit/etc/fluent-bit.conf"
watchDir = "/fluent-bit/config"
MaxDelayTime = time.Minute * 5
ResetTime = time.Minute * 10
defaultBinPath = "/fluent-bit/bin/fluent-bit"
defaultCfgPath = "/fluent-bit/etc/fluent-bit.conf"
defaultWatchDir = "/fluent-bit/config"
defaultPollInterval = 1 * time.Second

MaxDelayTime = 5 * time.Minute
ResetTime = 10 * time.Minute
)

var (
Expand All @@ -33,7 +37,22 @@ var (
timerCancel context.CancelFunc
)

var configPath string
var binPath string
var watchPath string
var poll bool
var pollInterval time.Duration

func main() {

flag.StringVar(&binPath, "b", defaultBinPath, "The fluent bit binary path.")
flag.StringVar(&configPath, "c", defaultCfgPath, "The config file path.")
flag.StringVar(&watchPath, "watch-path", defaultWatchDir, "The path to watch.")
flag.BoolVar(&poll, "poll", false, "Use poll watcher instead of ionotify.")
flag.DurationVar(&pollInterval, "poll-interval", defaultPollInterval, "Poll interval if using poll watcher.")

flag.Parse()

logger = log.NewLogfmtLogger(os.Stdout)

timerCtx, timerCancel = context.WithCancel(context.Background())
Expand Down Expand Up @@ -77,14 +96,14 @@ func main() {
}
{
// Watch the config file, if the config file changed, stop Fluent bit.
watcher, err := fsnotify.NewWatcher()
watcher, err := newWatcher(poll, pollInterval)
if err != nil {
_ = level.Error(logger).Log("err", err)
return
}

// Start watcher.
err = watcher.Add(watchDir)
err = watcher.Add(watchPath)
if err != nil {
_ = level.Error(logger).Log("err", err)
return
Expand All @@ -98,7 +117,7 @@ func main() {
select {
case <-cancel:
return nil
case event := <-watcher.Events:
case event := <-watcher.Events():
if !isValidEvent(event) {
continue
}
Expand All @@ -110,7 +129,7 @@ func main() {
stop()
resetTimer()
_ = level.Info(logger).Log("msg", "Config file changed, stopped Fluent Bit")
case <-watcher.Errors:
case <-watcher.Errors():
_ = level.Error(logger).Log("msg", "Watcher stopped")
return nil
}
Expand All @@ -130,9 +149,26 @@ func main() {
_ = level.Info(logger).Log("msg", "See you next time!")
}

func newWatcher(poll bool, interval time.Duration) (filenotify.FileWatcher, error) {
var err error
var watcher filenotify.FileWatcher

if poll {
watcher = filenotify.NewPollingWatcher(interval)
} else {
watcher, err = filenotify.New(interval)
}

if err != nil {
return nil, err
}

return watcher, nil
}

// Inspired by https://github.com/jimmidyson/configmap-reload
func isValidEvent(event fsnotify.Event) bool {
return event.Op&fsnotify.Create == fsnotify.Create
return event.Op == fsnotify.Create || event.Op == fsnotify.Write
}

func start() {
Expand All @@ -144,7 +180,7 @@ func start() {
return
}

cmd = exec.Command(binPath, "-c", cfgPath)
cmd = exec.Command(binPath, "-c", configPath)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Start(); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions config/crd/bases/logging.kubesphere.io_fluentbits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,11 @@ spec:
type: array
type: object
type: object
args:
description: Fluent Bit Watcher command line arguments.
items:
type: string
type: array
containerLogRealPath:
description: Container log path
type: string
Expand Down
5 changes: 5 additions & 0 deletions manifests/setup/fluentbit-operator-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1335,6 +1335,11 @@ spec:
type: array
type: object
type: object
args:
description: Fluent Bit Watcher command line arguments.
items:
type: string
type: array
containerLogRealPath:
description: Container log path
type: string
Expand Down
8 changes: 7 additions & 1 deletion manifests/setup/setup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1340,6 +1340,11 @@ spec:
type: array
type: object
type: object
args:
description: Fluent Bit Watcher command line arguments.
items:
type: string
type: array
containerLogRealPath:
description: Container log path
type: string
Expand Down Expand Up @@ -4351,7 +4356,8 @@ spec:
- command:
- /bin/sh
- -c
- set -ex; echo CONTAINER_ROOT_DIR=$(docker info -f '{{.DockerRootDir}}') > /fluentbit-operator/fluent-bit.env
- set -ex; echo CONTAINER_ROOT_DIR=$(docker info -f '{{.DockerRootDir}}')
> /fluentbit-operator/fluent-bit.env
image: docker:19.03
name: setenv
volumeMounts:
Expand Down
53 changes: 53 additions & 0 deletions pkg/filenotify/filenotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Adapted from https://github.com/gohugoio/hugo
// Apache License 2.0
// Copyright Hugo Authors
//
// Package filenotify provides a mechanism for watching file(s) for changes.
// Generally leans on fsnotify, but provides a poll-based notifier which fsnotify does not support.
// These are wrapped up in a common interface so that either can be used interchangeably in your code.
//
// This package is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License.
// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9
package filenotify

import (
"time"

"github.com/fsnotify/fsnotify"
)

// FileWatcher is an interface for implementing file notification watchers
type FileWatcher interface {
Events() <-chan fsnotify.Event
Errors() <-chan error
Add(name string) error
Remove(name string) error
Close() error
}

// New tries to use an fs-event watcher, and falls back to the poller if there is an error
func New(interval time.Duration) (FileWatcher, error) {
if watcher, err := NewEventWatcher(); err == nil {
return watcher, nil
}
return NewPollingWatcher(interval), nil
}

// NewPollingWatcher returns a poll-based file watcher
func NewPollingWatcher(interval time.Duration) FileWatcher {
return &filePoller{
interval: interval,
done: make(chan struct{}),
events: make(chan fsnotify.Event),
errors: make(chan error),
}
}

// NewEventWatcher returns an fs-event based file watcher
func NewEventWatcher() (FileWatcher, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
return &fsNotifyWatcher{watcher}, nil
}
24 changes: 24 additions & 0 deletions pkg/filenotify/fsnotify.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Adapted from https://github.com/gohugoio/hugo
// Apache License 2.0
// Copyright Hugo Authors
//
// Package filenotify is adapted from https://github.com/moby/moby/tree/master/pkg/filenotify, Apache-2.0 License.
// Hopefully this can be replaced with an external package sometime in the future, see https://github.com/fsnotify/fsnotify/issues/9
package filenotify

import "github.com/fsnotify/fsnotify"

// fsNotifyWatcher wraps the fsnotify package to satisfy the FileNotifier interface
type fsNotifyWatcher struct {
*fsnotify.Watcher
}

// Events returns the fsnotify event channel receiver
func (w *fsNotifyWatcher) Events() <-chan fsnotify.Event {
return w.Watcher.Events
}

// Errors returns the fsnotify error channel receiver
func (w *fsNotifyWatcher) Errors() <-chan error {
return w.Watcher.Errors
}
Loading