Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beats input support #551

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

multicatch
Copy link

I had been trying to configure logstash-logback-encoder to send logs to Graylog, while reusing existing Beats input that is also used by Filebeat. Unfortunately I couldn't get it to work, as appenders and encoders output plain JSON. Graylog, on the other hand, refused to accept such input, throwing an exception that said "Unknown beats protocol version".

I have analyzed the problem and it seems that Graylog only accepts input in Lumberjack protocol format. I have decided to modify CompositeJsonEncoder so it can be extended with custom payload converters. Those converters can be used to convert whole encoder output to a given format (eg. Beats-compatible format or gzip if someone is willing to implement it).

Those converters can be chosen in the configuration and are optional - if none are specified, everything will work as before.

@philsttr
Copy link
Collaborator

philsttr commented Jul 6, 2021

Thanks for the contribution! Love the idea.

Allow me some time to investigate it. I want to understand the lumberjack protocol a bit, and do some testing myself before merging it.

@multicatch
Copy link
Author

multicatch commented Jul 6, 2021

Ok, no problem, thanks for replying! Just keep in mind that I kept my implementation simple and it does not account windows and other advanced Lumberjack stuff, but is sufficient to send logs as JSON to Graylog. Tested on Graylog 3.3 to be working. I also compared with the raw data from Filebeat’s output and it seems that this implementation is sufficient

@brenuart
Copy link
Collaborator

brenuart commented Jul 7, 2021

A few comments already...

Lumberjack is not only about the data format, it is also a protocol with window and ACK (like TCP). The ultimate objective is to provide a reliable transport mechanism on top of TCP by which the sender is notified when the receiver has effectively secured/processed the data.

Changes are therefore also required to the LogstashTcpSocketAppender to fully support the Lumberjack protocol. The appender will have to keep track of sequences and ACK, be able to retry when the receiver asks and keeping a window of "in flight" events that should, for instance, be replayed when switching to another destination. The Disruptor seems to have a notion of "batches" that may come in handy to support this feature...

@multicatch
Copy link
Author

multicatch commented Jul 7, 2021

Ok, thanks for the feedback, I will fix it in my free time as soon I find some.

@brenuart, Do you have any other comments or suggestions about the rest of the code or the code style?

@multicatch multicatch marked this pull request as draft July 8, 2021 10:11
@multicatch
Copy link
Author

Converted to draft as it will need some rework to fully support Lumberjack

@multicatch multicatch marked this pull request as ready for review July 15, 2021 12:58
@multicatch
Copy link
Author

@philsttr, @brenuart, I have made improvements to fully support Lumberjack v2 with JSON payload, please review my changes

@multicatch multicatch changed the title Encoded payload converter Beats input support & payload converters Jul 15, 2021
@philsttr
Copy link
Collaborator

philsttr commented Jul 18, 2021

Is this for lumberjack protocol v1 or v2 ? Whichever version is used needs to be explicitly stated in documentation, and potentially in the classnames (e.g. LumberjackV2...)

(edit: I now see this is for v2 by looking at the constants. Can you make this more clear in the documentation and class naming?)

Is there a specification for the protocol anywhere? If not, what is the best reference implementation for reverse engineering? What did you use as a guide when implementing this?

Regarding payload converters... I think the current design is "inside-out". Meaning... I conceptually think of the lumberjack encoding wrapping the JSON inside of it. However, the current design has the lumberjack converter inside the json encoder, which seems "inside-out" to me. I think the encoding required by the lumberjack protocol would be better modeled as a separate encoder that delegates to the json encoder. For example, something like this:

<encoder class="net.logstash.logback.encoder.LumberjackEncoder">
    <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"/>
</encoder>

Additionally, I don't think the payload converter pattern is something that is needed in general, since having a wrapping encoder is the preferred pattern for any use case that requires converting the json output.

Another alternative would be to perform the encoding required by the lumberjack protocol inside of the appender itself, without exposing/requiring a separate encoder. Is there ever a need to do lumberjack encoding in something other than TCP? Also, the beats appender and the current lumberjack converter seem very tightly coupled in the current implementation (as evidenced by the installConverter method and the BeatsEncoderWrapper). This is a good argument for performing the lumberjack encoding within the appender itself, since it is required when using the beats appender.

@multicatch multicatch changed the title Beats input support & payload converters Beats input support Jul 18, 2021
@multicatch
Copy link
Author

Thanks for your input! I've updated comments and README to indicate how I implemented this protocol and what is the version of this protocol. I also removed the PayloadConverter interface - yes, you are right, it is rendundant and I after reading the code again I felt like it was unnatural reconfiguring the encoder in the appender like that.

I've tested the changes on Graylog 3.3 with Beats input and it seems to be working properly (sent about 100 messages with window size of 10).

@philsttr
Copy link
Collaborator

Nice! The removal of the PayloadConverter looks much cleaner and self-contained. Thanks for that update.

I have one more major concern to discuss. This implementation claims to implement the full lumberjack v2 protocol, which "aims to provide reliable, application-level, message transport.". However, I don't believe the current implementation fully implements the "reliable" portion of it. Specifically, the sequence numbers of the acks are currently read from the stream, but then ignored. This means that if the appender needs to reconnect for whatever reason, and acks have not been received, those un-acked events could be lost. I believe the appender should resend un-acked events on reconnect to meet the reliability requirement of the lumberjack protocol.

I haven't fully thought through how that would be implemented. Can you take a deeper look at what it would take to properly honor the sequence numbers in the acks, and provide full reliability?

@brenuart
Copy link
Collaborator

brenuart commented Jul 18, 2021

This means that if the appender needs to reconnect for whatever reason, and acks have not been received, those un-acked events could be lost. I believe the appender should resend un-acked events on reconnect to meet the reliability requirement of the lumberjack protocol.

The reliable part of the Lumberjack protocol is indeed currently not implemented. Events can be lost when the connection is unexpectedly lost with the server but can also happen when the appender decides to cleanly shutdown the connection and switch back to another destination (see the various destination strategies).

I already had a look at how this could be implemented a while ago and it seems that the RingBuffer can help us a lot...
A Disruptor processes events by means of a EventProcessor. The appender however uses an EventHandler which is actually wrapped inside a BatchEventProcessor. This kind of processor has access to the SequenceBarrier and is notified whenever the sequence is moved forward. It calls back the EventHandler for every new event received then "acknowledge" all the events to release the corresponding slot from the RingBuffer.

The idea here is to use a custom EventProcessor that would clear the events from the RingBuffer only after the corresponding ACKs are received. The processor should somehow remember the sequence (index in the RingBuffer) of the last sent event and the sequence of the last ACK event. Knowing these sequences, the processor can re-send un-acked events when appropriate by fetching them from the DataProvider (the RingBuffer is a DataProvider).

Just throwing ideas here and sketching a possible approach...

... PayloadConverter ...

The PayloadConverter seems to be tight to the transmission protocol, isn't it? As far as I can tell this component seems to represent the frames of the protocol...
With this in mind, I would probably not build a special Encoder like a LumberJackEncoder either... The role of an Encoder is to convert a log event into another representation - whatever how the resulting payload is transmitted over the wire. We currently have an implicit protocol that write the payload "asis" then add a new line before starting with the next payload. Each frames are basically made of:

<payload bytes>\n

This protocol is implemented in the TcpSocketAppender.

The framing imposed by LumberJack should therefore be kept "private" to the "LumberjackTcpAppender" - and not exposed as an Encoder or anything alike. Users should not even be aware of it...
(Note: this is apparently what is currently be done in the latest commits ;-)

Another option is to introduce the concept of "wire protocol" (njson, lumberjack, etc) and keep a single TcpSocketAppender that would be configured with the desired protocol - this could make the design more flexible and even open the door for additional protocols in the future...

Memory allocation and copy
There is an ongoing effort at reducing the memory allocations and copies required to process events. Whenever possible, we should try to write directly into the output stream rather than going through intermediate byte arrays.

/**
* A queue of currently accepted ACKs from remote
*/
private final BlockingQueue<AckEvent> ackEvents = new ArrayBlockingQueue<>(10);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to keep a queue of the last 10 received ACKs or is it enough to keep the latest only?
AFAIK an ACK received for sequence x acknowledge all events up to x included.

public byte[] encode(E event) {
if (counter.get() == windowSize) {
try {
ackEvents.take();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The WriteTimeoutRunnable may decide to close the socket whenever it detects a dead peer (that is a write operation taking too long). At this time the EventHandler thread may be waiting for the next ACKs before writing the current event. If the socket is closed, the ACK will never come and the thread will not exit from ackEvents.take() (the AckReaderCallable will "die" because of the IOException received when the socket is closed).

@multicatch
Copy link
Author

Thanks for the suggestions, I would normally just use something like CircularFifoBuffer or something like this and implement it like this:

  1. When socket is (re)opened and buffer is not empty, send the content of the buffer to remote reader.
  2. After receiving ACK (ackEvents.take()), clear it.
    And add any event to the buffer before sending it to the stream.

The disruptor's RingBuffer seems like a really interesting and promising idea, but I need to learn more about its usage to use it properly. I will also try to fix the other issues pointed out, but I have already removed PayloadConverters and kept the Lumberjack wrapper private in the appender.

Thanks for all feedback and suggestions btw.

@brenuart brenuart force-pushed the main branch 3 times, most recently from 8e7c02d to 926c65a Compare October 28, 2021 23:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants