diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index fb03ab9bf1a3..30aa45584961 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -109,3 +109,6 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di *Winlogbeat* *Functionbeat* +*Journalbeat* + +- Journalbeat requires at least systemd v233 in order to follow entries after journal changes (rotation, vacuum). diff --git a/NOTICE.txt b/NOTICE.txt index 07145abc5741..56fa8db63e20 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -241,8 +241,8 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -------------------------------------------------------------------- Dependency: github.com/coreos/go-systemd -Version: v17 -Revision: 39ca1b05acc7ad1220e09f133283b8859a8b71ab +Version: v18 +Revision: 9002847aa1425fb6ac49077c0a630b3b67e0fbfd License type (autodetected): Apache-2.0 ./vendor/github.com/coreos/go-systemd/LICENSE: -------------------------------------------------------------------- diff --git a/journalbeat/include/seccomp_linux_386.go b/journalbeat/include/seccomp_linux_386.go deleted file mode 100644 index a22b1e2e4fd4..000000000000 --- a/journalbeat/include/seccomp_linux_386.go +++ /dev/null @@ -1,122 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Code generated by seccomp-profiler - DO NOT EDIT. - -// +build linux,386 - -package include - -import ( - "github.com/elastic/go-seccomp-bpf" - - beat "github.com/elastic/beats/libbeat/common/seccomp" -) - -func init() { - beat.MustRegisterPolicy(&seccomp.Policy{ - DefaultAction: seccomp.ActionErrno, - Syscalls: []seccomp.SyscallGroup{ - { - Action: seccomp.ActionAllow, - Names: []string{ - "_llseek", - "access", - "brk", - "clock_gettime", - "clone", - "close", - "dup", - "dup2", - "epoll_create", - "epoll_create1", - "epoll_ctl", - "epoll_wait", - "exit", - "exit_group", - "fchdir", - "fchmod", - "fchown32", - "fcntl", - "fcntl64", - "fdatasync", - "flock", - "fstat64", - "fsync", - "ftruncate64", - "futex", - "getcwd", - "getdents", - "getdents64", - "getpid", - "getppid", - "getrandom", - "getrlimit", - "getrusage", - "gettid", - "gettimeofday", - "ioctl", - "kill", - "lstat64", - "madvise", - "mincore", - "mkdirat", - "mmap2", - "mprotect", - "munmap", - "nanosleep", - "open", - "openat", - "pipe", - "pipe2", - "poll", - "pread64", - "prlimit64", - "pwrite64", - "read", - "readlink", - "readlinkat", - "rename", - "renameat", - "rt_sigaction", - "rt_sigprocmask", - "rt_sigreturn", - "sched_getaffinity", - "sched_yield", - "sendfile64", - "set_robust_list", - "set_thread_area", - "setitimer", - "sigaltstack", - "socketcall", - "splice", - "stat", - "stat64", - "tgkill", - "time", - "tkill", - "uname", - "unlink", - "unlinkat", - "wait4", - "write", - "writev", - }, - }, - }, - }) -} diff --git a/journalbeat/include/seccomp_linux_amd64.go b/journalbeat/include/seccomp_linux_amd64.go deleted file mode 100644 index 683d3d576c13..000000000000 --- a/journalbeat/include/seccomp_linux_amd64.go +++ /dev/null @@ -1,135 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -// Code generated by seccomp-profiler - DO NOT EDIT. - -// +build linux,amd64 - -package include - -import ( - "github.com/elastic/go-seccomp-bpf" - - beat "github.com/elastic/beats/libbeat/common/seccomp" -) - -func init() { - beat.MustRegisterPolicy(&seccomp.Policy{ - DefaultAction: seccomp.ActionErrno, - Syscalls: []seccomp.SyscallGroup{ - { - Action: seccomp.ActionAllow, - Names: []string{ - "accept", - "accept4", - "access", - "arch_prctl", - "bind", - "clock_gettime", - "clone", - "close", - "connect", - "dup", - "dup2", - "epoll_create", - "epoll_create1", - "epoll_ctl", - "epoll_pwait", - "exit", - "exit_group", - "fchdir", - "fchmod", - "fchown", - "fcntl", - "fdatasync", - "flock", - "fstat", - "fsync", - "ftruncate", - "futex", - "getcwd", - "getdents", - "getdents64", - "getpeername", - "getpid", - "getppid", - "getrandom", - "getrlimit", - "getrusage", - "getsockname", - "getsockopt", - "gettid", - "gettimeofday", - "ioctl", - "kill", - "listen", - "lseek", - "lstat", - "madvise", - "mincore", - "mkdirat", - "mmap", - "mprotect", - "munmap", - "nanosleep", - "newfstatat", - "open", - "openat", - "pipe", - "pipe2", - "poll", - "pread64", - "pwrite64", - "read", - "readlink", - "readlinkat", - "recvfrom", - "recvmmsg", - "recvmsg", - "rename", - "renameat", - "rt_sigaction", - "rt_sigprocmask", - "rt_sigreturn", - "sched_getaffinity", - "sched_yield", - "sendfile", - "sendmmsg", - "sendmsg", - "sendto", - "set_robust_list", - "setitimer", - "setsockopt", - "shutdown", - "sigaltstack", - "socket", - "splice", - "stat", - "tgkill", - "time", - "tkill", - "uname", - "unlink", - "unlinkat", - "wait4", - "write", - "writev", - }, - }, - }, - }) -} diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index b59d475316a6..70d9bcb6874d 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -189,10 +189,10 @@ func (i *Input) publishAll() { // Stop stops all readers of the input. func (i *Input) Stop() { - i.client.Close() for _, r := range i.readers { r.Close() } + i.client.Close() } // Wait waits until all readers are done. diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 482aaa83efba..fac946110200 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -42,12 +42,11 @@ import ( // Reader reads entries from journal(s). type Reader struct { - journal *sdjournal.Journal - config Config - done chan struct{} - logger *logp.Logger - backoff backoff.Backoff - changesChan chan int + journal *sdjournal.Journal + config Config + done chan struct{} + logger *logp.Logger + backoff backoff.Backoff } // New creates a new journal reader and moves the FP to the configured position. @@ -97,21 +96,16 @@ func newReader(logger *logp.Logger, done chan struct{}, c Config, journal *sdjou } r := &Reader{ - journal: journal, - config: c, - done: done, - logger: logger, - backoff: backoff.NewExpBackoff(done, c.Backoff, c.MaxBackoff), - changesChan: make(chan int), + journal: journal, + config: c, + done: done, + logger: logger, + backoff: backoff.NewExpBackoff(done, c.Backoff, c.MaxBackoff), } r.seek(state.Cursor) instance.AddJournalToMonitor(c.Path, journal) - // waiting for journal changes are done in a separate gorountine - // when the reader is closed, it is stopped - go r.waitForChange() - return r, nil } @@ -188,84 +182,57 @@ func (r *Reader) seek(cursor string) { // Next waits until a new event shows up and returns it. // It blocks until an event is returned or an error occurs. func (r *Reader) Next() (*beat.Event, error) { - event, err := r.readEvent() - if err != nil { - return nil, err - } - if event != nil { - return event, nil - } - - return r.waitUntilNewEventOrError() -} - -func (r *Reader) waitUntilNewEventOrError() (*beat.Event, error) { for { select { case <-r.done: return nil, nil - case c := <-r.changesChan: - switch c { - // no changes - case sdjournal.SD_JOURNAL_NOP: - // new entries are added or the journal has changed (e.g. vacuum, rotate) - case sdjournal.SD_JOURNAL_APPEND, sdjournal.SD_JOURNAL_INVALIDATE: - event, err := r.readEvent() - if err != nil { - return nil, err - } - - if event == nil { - r.backoff.Wait() - continue - } - - r.backoff.Reset() - return event, nil - default: - if c < 0 { - return nil, fmt.Errorf("error while waiting for event: %+v", syscall.Errno(-c)) - } - - r.logger.Errorf("Unknown return code from Wait: %d\n", c) - } + default: } - } -} -func (r *Reader) waitForChange() { - for { - // try to return before waiting for event - select { - case <-r.done: - return - default: + c, err := r.journal.Next() + if err != nil && err != io.EOF { + return nil, err } - c := r.journal.Wait(100 * time.Millisecond) - select { - case <-r.done: - return - case r.changesChan <- c: + // error while reading next entry + if c < 0 { + return nil, fmt.Errorf("error while reading next entry %+v", syscall.Errno(-c)) } - } -} -func (r *Reader) readEvent() (*beat.Event, error) { - n, err := r.journal.Next() - if err != nil && err != io.EOF { - return nil, err - } + // no new entry, so wait + if c == 0 { + hasNewEntry, err := r.checkForNewEvents() + if err != nil { + return nil, err + } + if !hasNewEntry { + continue + } + } - for n == 1 { entry, err := r.journal.GetEntry() if err != nil { return nil, err } event := r.toEvent(entry) + return event, nil } - return nil, nil +} + +func (r *Reader) checkForNewEvents() (bool, error) { + c := r.journal.Wait(100 * time.Millisecond) + switch c { + case sdjournal.SD_JOURNAL_NOP: + return false, nil + // new entries are added or the journal has changed (e.g. vacuum, rotate) + case sdjournal.SD_JOURNAL_APPEND, sdjournal.SD_JOURNAL_INVALIDATE: + return true, nil + default: + } + + r.logger.Errorf("Unknown return code from Wait: %d\n", c) + return false, nil } // toEvent creates a beat.Event from journal entries. @@ -321,5 +288,4 @@ func (r *Reader) convertNamedField(fc fieldConversion, value string) interface{} func (r *Reader) Close() { instance.StopMonitoringJournal(r.config.Path) r.journal.Close() - close(r.changesChan) } diff --git a/libbeat/common/seccomp/policy_linux_amd64.go b/libbeat/common/seccomp/policy_linux_amd64.go index 00e5353b4f9c..a131e7f3c344 100644 --- a/libbeat/common/seccomp/policy_linux_amd64.go +++ b/libbeat/common/seccomp/policy_linux_amd64.go @@ -54,6 +54,7 @@ func init() { "fdatasync", "flock", "fstat", + "fstatfs", "fsync", "ftruncate", "futex", @@ -66,6 +67,7 @@ func init() { "getpid", "getppid", "getrandom", + "getrlimit", "getrusage", "getsockname", "getsockopt", @@ -93,6 +95,7 @@ func init() { "pipe", "pipe2", "poll", + "ppoll", "pread64", "pselect6", "pwrite64", @@ -119,6 +122,7 @@ func init() { "shutdown", "sigaltstack", "socket", + "splice", "stat", "statfs", "sysinfo", diff --git a/vendor/vendor.json b/vendor/vendor.json index 221a06217dda..d858864327a1 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -498,10 +498,10 @@ { "checksumSHA1": "cEszpxh1szqTb440ze4hm/Vfm40=", "path": "github.com/coreos/go-systemd/sdjournal", - "revision": "39ca1b05acc7ad1220e09f133283b8859a8b71ab", - "revisionTime": "2018-05-11T13:34:05Z", - "version": "v17", - "versionExact": "v17" + "revision": "9002847aa1425fb6ac49077c0a630b3b67e0fbfd", + "revisionTime": "2018-10-31T08:50:51Z", + "version": "v18", + "versionExact": "v18" }, { "checksumSHA1": "nwll5KjH9SKrWxB45OmN09wdipI=",