Initialize Journalbeat #8277

Merged

kvch
Contributor

@kvch kvch commented Sep 10, 2018

This is the first PR to initialize Journalbeat with minimal functionality.

The architecture mimics Filebeat so that it can be merged into FB in the future. This means it has multiple inputs which can share configuration (backoff, backoff_factor, etc.). Inputs can have multiple readers; each reader reads from a journal specified in the list of paths. The readers are not going to implement the Harvester interface until Journalbeat is merged into Filebeat, because doing so would overcomplicate event publishing unnecessarily and would require duplicating too much Filebeat code.

Checkpointing is copied from Winlogbeat. Once the new registry file is merged, it will be migrated.

Example configuration to read from the beginning of the local journal

journalbeat.inputs:
- paths: []
  seek: head

Features

  • read from local journal, journal file and directory
  • position tracking by using checkpointing as it's done in Winlogbeat
  • seek to "tail", "head", "cursor"
  • minimal E2E tests
  • fields.yml and documentation
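
A minimal sketch of how the three seek modes listed above could be validated when the configuration is read. The `SeekMode` type and `ParseSeekMode` function are hypothetical names used for illustration; they are not taken from this PR.

```go
package main

import (
	"fmt"
)

// SeekMode is a hypothetical type for the "seek" option.
type SeekMode string

const (
	SeekHead   SeekMode = "head"   // start at the beginning of the journal
	SeekTail   SeekMode = "tail"   // start at the end of the journal
	SeekCursor SeekMode = "cursor" // continue from the saved cursor
)

// ParseSeekMode accepts only the three documented values.
func ParseSeekMode(s string) (SeekMode, error) {
	switch m := SeekMode(s); m {
	case SeekHead, SeekTail, SeekCursor:
		return m, nil
	default:
		return "", fmt.Errorf("invalid seek mode %q: expected head, tail or cursor", s)
	}
}

func main() {
	for _, s := range []string{"head", "cursor", "middle"} {
		m, err := ParseSeekMode(s)
		fmt.Println(m, err)
	}
}
```

Rejecting unknown values at start-up keeps a typo in the configuration from silently falling back to some default position.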

Vendored:

  • github.com/coreos/go-systemd/sdjournal

Questions to answer in the future by me

  • Do readers need to be stopped manually when journals are deleted/rotated?
  • Is the saved cursor valid after rotation? Yes.

}
}

func (r *Reader) Close() {

exported method Reader.Close should have comment or be unexported

"github.com/elastic/beats/libbeat/logp"
)

type Config struct {

exported type Config should have comment or be unexported

func (i *Input) Stop() {
}

func (i *Input) Wait() {

exported method Input.Wait should have comment or be unexported


}

func (i *Input) Stop() {

exported method Input.Stop should have comment or be unexported

}
}

func (i *Input) Run() {

exported method Input.Run should have comment or be unexported

"github.com/elastic/beats/libbeat/common"
)

type Config struct {

exported type Config should have comment or be unexported

)

const (
_FILE_FLAG_WRITE_THROUGH = 0x80000000

don't use ALL_CAPS in Go names; use CamelCase

i.Run()
}

func (bt *Journalbeat) Stop() {

exported method Journalbeat.Stop should have comment or be unexported

return bt, nil
}

func (bt *Journalbeat) Run(b *beat.Beat) error {

exported method Journalbeat.Run should have comment or be unexported

checkpoint *checkpoint.Checkpoint // Persists event log state to disk.
}

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {

exported function New should have comment or be unexported

@kvch
Contributor Author

kvch commented Sep 10, 2018

jenkins test this

"github.com/elastic/beats/journalbeat/config"
)

type Journalbeat struct {

exported type Journalbeat should have comment or be unexported

@kvch kvch added the in progress Pull request is currently in progress. label Sep 11, 2018
.travis.yml Outdated
@@ -99,6 +99,12 @@ jobs:
go: $GO_VERSION
stage: test

# Journalbeat
- os: linux
env: TARGETS="-C filebeat testsuite"
Member

s/filebeat/journalbeat/ ? 🙂

)

const (
LocalSystemJournalID = "LOCAL_SYSTEM_JOURNAL"

exported const LocalSystemJournalID should have comment (or a comment on this block) or be unexported

@ph ph self-assigned this Sep 11, 2018
@kvch kvch removed the in progress Pull request is currently in progress. label Sep 13, 2018
@kvch kvch force-pushed the feature-journalbeat-init-multiple-inputs branch 3 times, most recently from 92c014a to acf063e Compare September 14, 2018 14:36
@kvch kvch mentioned this pull request Sep 17, 2018
23 tasks
@kvch
Contributor Author

kvch commented Sep 17, 2018

I have created a meta issue to track matters related to Journalbeat: #8323

@@ -0,0 +1,10 @@
/.idea
/build

Contributor

we can remove this line?

required: false
example: 123
description: >
The line number of the code which generated the log message.
Contributor

Should it be also in log.source?

Contributor Author

What do you mean?

Contributor

Looking at ECS, shouldn't this field be named log.source instead? Similar to what we do with Filebeat.

Contributor Author

This is different. log.source is path to the log file the line is read from. code.line is the number of the code line which generated the message.

}

wg.Wait()
bt.checkpoint.Shutdown()
Contributor

bt.checkpoint.Shutdown() looks like a good case for a defer that we could set up after the ack handler?
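
A rough sketch of the defer suggestion, assuming the shutdown call only needs to run once all inputs have finished. The `checkpoint` type and the `run` function here are simplified, hypothetical stand-ins, not the code from this PR.

```go
package main

import (
	"fmt"
	"sync"
)

// checkpoint is a hypothetical stand-in for the checkpoint type copied from Winlogbeat.
type checkpoint struct {
	closed bool
}

// Shutdown marks the checkpoint as closed; the real one would flush state to disk.
func (c *checkpoint) Shutdown() {
	c.closed = true
}

// run registers Shutdown with defer right after set-up, so it is executed on
// every return path, including early returns added later.
func run(cp *checkpoint, inputs []func()) {
	defer cp.Shutdown()

	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(f func()) {
			defer wg.Done()
			f()
		}(in)
	}
	wg.Wait()
}

func main() {
	cp := &checkpoint{}
	run(cp, []func(){func() { fmt.Println("input finished") }})
	fmt.Println("checkpoint closed:", cp.closed)
}
```

Because deferred calls run when the function returns, the shutdown still happens after `wg.Wait()` has returned, the same order as in the snippet under review.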

dir := filepath.Dir(c.file)
logp.Info("Creating %s if it does not exist.", dir)
return os.MkdirAll(dir, os.FileMode(0750))
}
Contributor

NOTE: I only did a quick sanity check on these, because these libraries come from Winlogbeat.


func create(path string) (*os.File, error) {
return os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0600)
}
Contributor

NOTE: I only did a quick sanity check on these, because these libraries come from Winlogbeat.

if err != nil {
logp.Err("Error connecting: %v", err)
return
}
Contributor

Previous comment on returning an error.

Contributor Author

What do you mean? It's not a constructor. Also, I tried mimicking the Input of Filebeat. There is logging on every level in each non-constructor, so I consider this acceptable.

)

func init() {
mage.BeatDescription = "Winlogbeat ships Windows event logs to Elasticsearch or Logstash."
Contributor

Winlogbeat?

}
}
}
}
Contributor

I am not sure the above is doing what you expect. I am adding the Follow implementation to this comment. Note that I need to read more of the Reader type, but at first look we could have an issue.

func (r *Reader) Follow() <-chan *beat.Event {
	out := make(chan *beat.Event)
	go r.readEntriesFromJournal(out)
	return out
}

The above returns a channel that we consume with range, which means the loop keeps blocking on that one reader until its channel is closed. Looking at the reader implementation, we only close the channel when we receive a done signal.

Since publishing to the queue is done in a single goroutine, is it possible that one reader in the list starves all the other readers of the chance to publish their events?

Contributor

I think we could fix it in two different ways:

  • Provide a call back on new events and that callback will do the actual push to the queue.
  • Merge N goroutines into a single goroutine and read from that. Similar to this link
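
A minimal sketch of the second option (fan-in). It uses plain strings in place of `*beat.Event`, and `merge` and `source` are illustrative names, so treat it as an outline of the pattern rather than the code that ended up in the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// merge forwards values from every input channel to one output channel.
// It stops early when done is closed and closes out once all forwarders exit.
func merge(done <-chan struct{}, ins ...<-chan string) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup

	for _, in := range ins {
		wg.Add(1)
		go func(in <-chan string) {
			defer wg.Done()
			for v := range in {
				select {
				case out <- v:
				case <-done:
					return
				}
			}
		}(in)
	}

	// Close out only after every forwarder has returned, so that no
	// goroutine can send on a closed channel.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// source emits the given values and then closes its channel.
func source(vals ...string) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for _, v := range vals {
			c <- v
		}
	}()
	return c
}

func main() {
	done := make(chan struct{})
	defer close(done)

	n := 0
	for v := range merge(done, source("a1", "a2"), source("b1")) {
		fmt.Println(v)
		n++
	}
	fmt.Println("events:", n)
}
```

With one forwarding goroutine per reader, a busy reader can no longer keep the publisher from seeing events of the other readers; the consumer only ranges over the merged channel.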

Contributor Author

Now I tested it again for multiple runs. It seems like sometimes harvesters are starved. I haven't noticed it until you pointed it out. Thanks!

done chan struct{}
}

// New creates a new journal reader and moves the FP to the configured position.
Contributor

I think this type would benefit from having a scoped logger, it will be useful to know which input created that logger when we are debugging users errors.

@kvch
Contributor Author

kvch commented Sep 24, 2018

Failing tests are unrelated.

}
}

}
Contributor

Instead of doing a for loop here and check for i.done and calling i.publishAll we should just call i.publishAll. This would simplify the code logic.

for e := range out {
client.Publish(*e)
}
}
Contributor

The way the code is structured, publishAll should be a blocking call, and we should check here whether i.done has been signaled.

Maybe something like the following will work.

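func (i *Input) publishAll(client beat.Client) {
	out := make(chan *beat.Event)

	var wg sync.WaitGroup
	// merge forwards events from one reader channel to out until the
	// reader closes its channel or the input is stopped.
	merge := func(in <-chan *beat.Event) {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-i.done:
					return
				case e, ok := <-in:
					if !ok {
						return
					}
					select {
					case <-i.done:
						return
					case out <- e:
					}
				}
			}
		}()
	}

	for _, r := range i.readers {
		merge(r.Follow())
	}

	// Block until the input is stopped (i.done is assumed to be closed on
	// stop), publishing merged events as they arrive.
	for {
		select {
		case <-i.done:
			wg.Wait()
			return
		case e := <-out:
			client.Publish(*e)
		}
	}
}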

@@ -99,6 +99,12 @@ jobs:
go: $GO_VERSION
stage: test

# Journalbeat
Contributor

Can we have a follow up issue or meta issue to make sure we add a Job in Jenkins for it in the near future?

Contributor Author

I added one more element to the TODO list in the meta issue: #8323

description: >
Contains common fields available in all event types.
fields:
- name: coredump
Contributor

Do you expect all these fields like coredump, kernel to be on the top level?

Contributor Author

Yes, because I specify the field names here: https://github.com/elastic/beats/pull/8277/files#diff-e6feda99a2ce30c6c943d4a15664abd4R24

I am OK with moving them to lower levels to fit better into ECS. I haven't found any namespace which could contain these fields. Do you have any suggestions?

Contributor

Are they all related to journald? If yes, I wonder if we should prefix them with journald?

Contributor

It would be nice to have some example JSON events somewhere in the future, which we can use for the docs and which should also make it much clearer what the events will look like.

Contributor Author

Yes, these are all related to journald. Here is the full list of fields provided by systemd journal: https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html
But some of the fields are not coming from the journal, but an application appends some fields. For example coredump is provided by systemd-coredump kernel helper. So prefixing with journald might be confusing.

Contributor

Are all the fields part of systemd? In general I would suggest to first prefix them with "something" and make them more global over time. At the same time not a blocker.

Contributor Author

Ok. I will prefix fields which do not belong anywhere in ECS with journald in a follow-up.

Contributor

Ok, lets have a quick chat before you put all the work into prefixing it.

Contributor Author

It's too late. :D 80aa2e4

Contributor

haha, great. Going from prefix to no prefix is always easy (as we have aliases); the other way around is hard.

func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
Contributor

nit: Error should be lower case.

break loop
case s := <-c.save:
c.lock.Lock()
c.states[s.Path] = s
Contributor

Instead of using locks around c.states could we use some methods like setState that have the lock inside. Like this we make sure we never forget.
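
Roughly what such accessor methods could look like. `JournalState` and `Checkpoint` below are simplified, hypothetical versions of the types in the checkpoint package; only the locking pattern is the point.

```go
package main

import (
	"fmt"
	"sync"
)

// JournalState is a simplified stand-in for the persisted per-journal state.
type JournalState struct {
	Path   string
	Cursor string
}

// Checkpoint keeps the states map private behind its accessor methods.
type Checkpoint struct {
	lock   sync.RWMutex
	states map[string]JournalState
}

// NewCheckpoint returns a Checkpoint with an initialized map.
func NewCheckpoint() *Checkpoint {
	return &Checkpoint{states: make(map[string]JournalState)}
}

// setState is the only place that writes to c.states, so the lock
// cannot be forgotten at a call site.
func (c *Checkpoint) setState(s JournalState) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.states[s.Path] = s
}

// state returns the stored state for path, if any.
func (c *Checkpoint) state(path string) (JournalState, bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	s, ok := c.states[path]
	return s, ok
}

func main() {
	c := NewCheckpoint()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.setState(JournalState{Path: "test.journal", Cursor: fmt.Sprintf("cursor-%d", n)})
		}(i)
	}
	wg.Wait()
	s, ok := c.state("test.journal")
	fmt.Println(s.Path, ok)
}
```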

Contributor Author

I just simply copied checkpointing from Winlogbeat and haven't touched it apart from adjusting the structs which are stored. I would rather not make further changes, because we are going to port everything to the new registry soon.

Contributor

Got it. @andrewkroh FYI

file, err := create(tempFile)
if os.IsNotExist(err) {
// Try to create directory if it does not exist.
if createDirErr := c.createDir(); createDirErr == nil {
Contributor

Is this something we should already try on startup? I "think" we do it in Filebeat to warn users that they might lack the permissions to update / create the registry file, as in the past we had the issue that FB was running but states could not be written.

Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
Contributor

Should the default be the same as in the config file?

Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 60 * time.Second,
Seek: "tail",
Contributor

The global and local default should be the same?

Contributor Author

Yes, I missed changing the global. Thanks for catching it.


// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
Contributor

Should run return an error?

Contributor Author

I tried following the interface defined in https://github.com/elastic/beats/blob/master/filebeat/input/input.go#L43, so when we are moving it to Filebeat, less refactoring is required. Thus, I would not return an error from this function.

Contributor

We should probably introduce an error at one stage in Filebeat :-)


// Reader reads entries from journal(s).
type Reader struct {
j *sdjournal.Journal
Contributor

can we call it journal or something more descriptive? I prefer to have single char variables only for the struct inside methods or for loops in most cases.

@@ -0,0 +1,6 @@
update_time: 2018-09-11T10:06:50.895829905Z
journal_entries:
- path: /home/n/go/src/github.com/elastic/beats/journalbeat/tests/system/input/test.journal
Contributor

Great to have such examples in.

}(in)
}

// close output channel after all input channels are merged or beats is stopped
go func() {
wg.Wait()
close(out)
}()
Contributor

@kvch There is no need to create a goroutine to wait on the WaitGroup, instead it could be done in a defer.

Contributor Author

If I close using defer, journalbeat panics with "send on a closed channel". I am not sure how exactly your solution is supposed to work. Could you give more context?
I mean, OFC I understand defer is called at the end of the function. But how would you rewrite the code so that it works?

Contributor Author

Using defer sometimes works correctly. But most of the time journalbeat panics, either because of a null pointer dereference or sending on a closed channel. So closing the channel after wait is necessary to provide stable behaviour.

@ph
Contributor

ph commented Sep 26, 2018

Because I am weird, I tried to build it directly under macOS. Since this Beat requires Linux and some journald header files, could we report something better when you are building it on an inferior OS? :)

go test -i github.com/elastic/beats/journalbeat github.com/elastic/beats/journalbeat/beater github.com/elastic/beats/journalbeat/checkpoint github.com/elastic/beats/journalbeat/cmd github.com/elastic/beats/journalbeat/config github.com/elastic/beats/journalbeat/include github.com/elastic/beats/journalbeat/input github.com/elastic/beats/journalbeat/reader
# github.com/elastic/beats/vendor/github.com/coreos/go-systemd/sdjournal
../vendor/github.com/coreos/go-systemd/sdjournal/journal.go:27:11: fatal error: 'systemd/sd-journal.h' file not found
 #include <systemd/sd-journal.h>
          ^~~~~~~~~~~~~~~~~~~~~~
1 error generated.
make: *** [unit] Error 2

@ph
Contributor

ph commented Sep 26, 2018

After bootstrapping a linux vm and installing the libsystemd-dev package I was able to build it.

Testing:

Problems when stopping it with CTRL-C

I haven't looked at all the code, but I was not able to exit journalbeat, even with multiple CTRL-C

2018-09-26T17:42:00.460Z        INFO    [monitoring]    log/log.go:144  Non-zero metrics in the last 30s        {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":160,"time":{"ms":5}},"total":{"ticks":170,"time":{"ms":9},"value":170},"user":{"ticks":10,"time":{"ms":4}}},"fd":{"limit":{"hard":1048576,"soft":1024},"open":7},"info":{"ephemeral_id":"9ed707ac-09db-4165-b7a8-2269c6749d4f","uptime":{"ms":150175}},"memstats":{"gc_next":4194304,"memory_alloc":1606832,"memory_total":4686992}},"libbeat":{"config":{"module":{"running":0}},"pipeline":{"clients":1,"events":{"active":0}}},"system":{"load":{"1":0,"15":0.03,"5":0.01,"norm":{"1":0,"15":0.015,"5":0.005}}}}}}
^C2018-09-26T17:42:05.669Z      DEBUG   [service]       service/service.go:51   Received sigterm/sigint, stopping
C2018-09-26T17:44:34.897Z      INFO    [monitoring]    log/log.go:144  Non-zero metrics in the last 30s        {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":130,"time":{"ms":135}},"total":{"ticks":140,"time":{"ms":152},"value":140},"user":{"ticks":10,"time":{"ms":17}}},"fd":{"limit":{"hard":1048576,"soft":1024},"open":5},"info":{"ephemeral_id":"3e49e629-e01c-4d2b-8825-d69edf4389c4","uptime":{"ms":30150}},"memstats":{"gc_next":4194304,"memory_alloc":1818584,"memory_total":3577192,"rss":14954496}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":1,"batches":1,"total":1},"type":"console","write":{"bytes":937}},"pipeline":{"clients":1,"events":{"active":0,"published":1,"total":1},"queue":{"acked":1}}},"system":{"cpu":{"cores":2},"load":{"1":0,"15":0.01,"5":0,"norm":{"1":0,"15":0.005,"5":0}}}}}}
2018-09-26T17:45:04.888Z        INFO    [monitoring]    log/log.go:144  Non-zero metrics in the last 30s        {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":140,"time":{"ms":5}},"total":{"ticks":150,"time":{"ms":6},"value":150},"user":{"ticks":10,"time":{"ms":1}}},"fd":{"limit":{"hard":1048576,"soft":1024},"open":5},"info":{"ephemeral_id":"3e49e629-e01c-4d2b-8825-d69edf4389c4","uptime":{"ms":60144}},"memstats":{"gc_next":4194304,"memory_alloc":2181176,"memory_total":3939784}},"libbeat":{"config":{"module":{"running":0}},"pipeline":{"clients":1,"events":{"active":0}}},"system":{"load":{"1":0,"15":0.01,"5":0,"norm":{"1":0,"15":0.005,"5":0}}}}}}

Contributor

@ph ph left a comment

@kvch Good work! I've tested it with the knowledge I have about journald, and it appears to work correctly with events. We still have a few things that we can do in this PR and others in follow-up PRs.

What we should address in this PR.

  • I was never able to quit journalbeat other than kill -9 the process.
  • The path/globbing needs some work; I was confused by the behavior.

What can be addressed in another PR.

  • We need to have a better way to build it, maybe a special docker image with all the correct dependencies similar to packetbeat.
  • Adding Fields support
  • Adding processors support for each input.

journalbeat.inputs:
# Paths that should be crawled and fetched. Glob based paths.
# When empty starts to read from local journal.
- paths: []
Contributor

Can we make it clear that a path must be a directory, maybe by adding validation to make sure it's a directory and not a file?

Coming from Filebeat, I thought it was a wildcard for files, and I used something like this:

/var/log/journal/xxxxxxxxx/*.journal
===


Contributor

The YAML file says globs are supported, so I tried the following:

/var/log/journal/*/

but I've received an error:

 stat /var/log/journal/*/: no such file or directory

Processor: nil,
ACKCount: func(n int) {
i.logger.Infof("journalbeat successfully published %d events", n)
},
Contributor

I think we are missing two behaviors that Filebeat has.

  • Adding custom fields to the events with EventMetadata
  • Support for processors per input.

The current code only supports processors defined globally.

Contributor

The above can be done in another PR.

case v := <-c:
default:
}
for v := range c {
Contributor

@ph ph Sep 27, 2018

Our discussion this morning made me think and look at the code.
Instead of using the default case, you can do the following, which replaces two for loops with one.

select {
case <-i.done:
	return
case v, ok := <-c: // ok is true if the receive is successful.
	if !ok {
		return
	}
	out <- v
}

@ph
Contributor

ph commented Sep 27, 2018

I've tested the new PR and we can now correctly interrupt the execution of Journalbeat.

@ruflin I think we can merge that in the feature branch and iterate WDYT?

@kvch kvch force-pushed the feature-journalbeat-init-multiple-inputs branch from 6056350 to 88f039f Compare September 28, 2018 08:58
@kvch
Contributor Author

kvch commented Sep 28, 2018

Rebased the branch on top of master, so tests should be green.

@ph
Contributor

ph commented Sep 28, 2018

Great work @kvch!

@kvch kvch merged commit 09e8e88 into elastic:feature-journalbeat Sep 28, 2018
@@ -126,6 +126,24 @@
"revision": "af9db2027c98c61ecd8e17caa5bd265792b9b9a2",
"revisionTime": "2018-03-18T00:15:26Z"
},
{
"checksumSHA1": "cEszpxh1szqTb440ze4hm/Vfm40=",
"path": "github.com/coreos/go-systemd/sdjournal",
Contributor

Seeing that there are releases for go-systemd, I wonder if we should tie it to one?

kvch added a commit that referenced this pull request Oct 16, 2018
kvch added a commit that referenced this pull request Oct 18, 2018
kvch added a commit that referenced this pull request Oct 24, 2018
kvch added a commit that referenced this pull request Oct 24, 2018
* Initialize Journalbeat (#8277)

* Journalbeat matches support && minor additions (#8324)

### Matching support
From now on, it is possible to match on journal entry fields in Journalbeat using the new option `include_matches`. It takes a list of key-value pairs separated by "=". The key has to be a Journalbeat event key (e.g. `systemd.unit`) and the value is the exact value the journal reader needs to find in the entries.

Example configuration which returns NGINX and dhclient entries from the journal:
```yml
include_matches:
  - "systemd.unit=nginx"
  - "process.name=dhclient"
```
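
As a rough sketch of how one of the `include_matches` entries above could be split and translated before being handed to the journal reader: the `journaldFields` table and `parseMatch` helper below are hypothetical and only cover the two keys from the example. `_SYSTEMD_UNIT` and `_COMM` are real journald field names, but their pairing with these event keys is an assumption here.

```go
package main

import (
	"fmt"
	"strings"
)

// journaldFields is an illustrative, incomplete mapping from event keys
// to journald field names. The actual translation table is an assumption.
var journaldFields = map[string]string{
	"systemd.unit": "_SYSTEMD_UNIT",
	"process.name": "_COMM",
}

// parseMatch splits "key=value" at the first "=" and translates the
// event key to its journald field name, returning "FIELD=value".
func parseMatch(expr string) (string, error) {
	parts := strings.SplitN(expr, "=", 2)
	if len(parts) != 2 {
		return "", fmt.Errorf("invalid match %q: expected key=value", expr)
	}
	field, ok := journaldFields[parts[0]]
	if !ok {
		return "", fmt.Errorf("unknown event key %q", parts[0])
	}
	return field + "=" + parts[1], nil
}

func main() {
	for _, m := range []string{"systemd.unit=nginx", "process.name=dhclient", "nginx"} {
		out, err := parseMatch(m)
		fmt.Println(out, err)
	}
}
```

Splitting only at the first "=" keeps values that themselves contain "=" intact.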
### Docker fields
Added Docker fields from: https://docs.docker.com/config/containers/logging/journald/
- `container.id`
- `container.id_truncated`
- `container.name`
- `container.image.tag`
- `container.partial`

### Parse timestamp of entries
Journalbeat parses the timestamp of the entry and adds it to the event as `@timestamp`. The time of reading by Journalbeat is saved in `read_timestamp`.

### Save custom fields
Custom fields from various sources are stored under `custom`. Field names are normalized: the `"_"` prefix is removed and every letter is lowercased.
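
The normalization can be sketched as follows. `normalizeFieldName` is a hypothetical helper, and stripping every leading underscore (rather than only one) is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeFieldName removes leading underscores and lowercases the rest.
// Assumption: all leading "_" characters are stripped, not just the first.
func normalizeFieldName(name string) string {
	return strings.ToLower(strings.TrimLeft(name, "_"))
}

func main() {
	for _, name := range []string{"_MY_FIELD", "CONTAINER_TAG", "__CURSOR"} {
		fmt.Printf("%s -> custom.%s\n", name, normalizeFieldName(name))
	}
}
```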

### Fields && processors
From now on, `processors`, `fields`, etc. can be configured at the `input` level.

### Metrics

The size of each open reader is reported in bytes:
```
{
  "journalbeat": {
    "journals": {
      "journal_1": {
        "path": "system.journal",
        "size_in_bytes": 123124214
      }
    }
  }
}
```
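
For illustration, a document of this shape could be produced with `encoding/json` as below. The `journalMetrics` type and `renderMetrics` function are hypothetical; the PR's actual metrics reporting may be wired differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// journalMetrics is a hypothetical per-journal metrics record.
type journalMetrics struct {
	Path        string `json:"path"`
	SizeInBytes uint64 `json:"size_in_bytes"`
}

// renderMetrics nests the per-journal records under
// "journalbeat" -> "journals" and returns compact JSON.
func renderMetrics(journals map[string]journalMetrics) (string, error) {
	doc := map[string]interface{}{
		"journalbeat": map[string]interface{}{
			"journals": journals,
		},
	}
	out, err := json.Marshal(doc)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	out, err := renderMetrics(map[string]journalMetrics{
		"journal_1": {Path: "system.journal", SizeInBytes: 123124214},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```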

* Minor improvements to Journalbeat (#8618)

* Packaging of journalbeat (#8702)

Journalbeat is going to be built using the new Debian 8 container, because the systemd version in Debian 7 is too old (v44 instead of the required v187).

Minor changes:
* add missing X-Pack folder to journalbeat
* do not cross-compile journalbeat due to missing dependencies locally

* Add journalbeat docs (#8735)

* Add journalbeat docs
kvch added a commit to kvch/beats that referenced this pull request Oct 24, 2018
* Initialize Journalbeat (elastic#8277)

(cherry picked from commit 24d0e08)
kvch added a commit that referenced this pull request Oct 24, 2018
* Add Journalbeat (#8703)

(cherry picked from commit 24d0e08)

joelika commented Dec 12, 2018

@kvch

Are readers need to be stopped manually when journals deleted/rotated?

I've been evaluating Journalbeat on a RHEL 7 setup, and I noticed that Journalbeat stops sending logs to Logstash daily. I've been debugging, and it appears to happen when journald rotates its journal.

Is there a way to detect this in journalbeat and restart it? Right now, I have to go in and manually restart journalbeat each day.

I'm using the systemd configuration included in the rpm of journalbeat to manage the service.

Contributor

ph commented Dec 12, 2018

@joelika This is indeed a bug, and it should be handled transparently by Journalbeat. Would you mind creating an issue for it?

joelika commented Dec 13, 2018

@ph thanks! I opened #9533

DStape pushed a commit to DStape/beats that referenced this pull request Aug 20, 2019
* Initialize Journalbeat (elastic#8277)

DStape pushed a commit to DStape/beats that referenced this pull request Aug 20, 2019
* Add Journalbeat (elastic#8703)

(cherry picked from commit 24d0e08)
leweafan pushed a commit to leweafan/beats that referenced this pull request Apr 28, 2023