Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logstash output configurable compression level #630

Merged
merged 2 commits into from
Jan 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ https://github.com/elastic/beats/compare/1.0.0...master[Check the HEAD diff]
==== Added

*Affecting all Beats*
- Make logstash output compression level configurable. {pull}630[630]

*Packetbeat*

*Topbeat*
- group all cpu usage per core statistics and export them optionally if cpu_per_core is configured {pull}496[496]
- Group all cpu usage per core statistics and export them optionally if cpu_per_core is configured {pull}496[496]

*Filebeat*
- Add multiline support for combining multiple related lines into one event. {issue}461[461]
Expand Down
3 changes: 3 additions & 0 deletions filebeat/etc/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,9 @@ output:
# Number of workers per Logstash host.
#worker: 1

# Set gzip compression level.
#compression_level: 3

# Optional load balance the events between the Logstash hosts
#loadbalance: true

Expand Down
6 changes: 6 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,12 @@ The list of known Logstash servers to connect to. All entries in this list can
contain a port number. If no port number is given, the value specified for <<port>>
is used as the default port number.

===== compression_level

Set gzip compression level. Setting this value to values <=0 disables compression.
The compression level must be in the range of 1 (best-speed) to 9 (best compression).
The default value is 3.

===== worker

The number of workers per configured host publishing events to Logstash. This
Expand Down
3 changes: 3 additions & 0 deletions libbeat/etc/libbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ output:
# Number of workers per Logstash host.
#worker: 1

# Set gzip compression level.
#compression_level: 3

# Optional load balance the events between the Logstash hosts
#loadbalance: true

Expand Down
77 changes: 60 additions & 17 deletions libbeat/outputs/logstash/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type lumberjackClient struct {
maxWindowSize int
timeout time.Duration
countTimeoutErr int
compressLevel int
}

const (
Expand All @@ -60,15 +61,38 @@ var (

func newLumberjackClient(
conn TransportClient,
compressLevel int,
maxWindowSize int,
timeout time.Duration,
) *lumberjackClient {
) (*lumberjackClient, error) {

// validate by creating and discarding zlib writer with configured level
if compressLevel > 0 {
tmp := bytes.NewBuffer(nil)
w, err := zlib.NewWriterLevel(tmp, compressLevel)
if err != nil {
return nil, err
}
w.Close()
}

return &lumberjackClient{
TransportClient: conn,
windowSize: defaultStartMaxWindowSize,
timeout: timeout,
maxWindowSize: maxWindowSize,
}
compressLevel: compressLevel,
}, nil
}

func (l *lumberjackClient) Connect(timeout time.Duration) error {
logp.Debug("logstash", "connect")
return l.TransportClient.Connect(timeout)
}

func (l *lumberjackClient) Close() error {
logp.Debug("logstash", "close connection")
return l.TransportClient.Close()
}

func (l *lumberjackClient) PublishEvent(event common.MapStr) error {
Expand Down Expand Up @@ -106,13 +130,17 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error)
return 0, nil
}

logp.Debug("logstash", "Try to publish %v events to logstash with window size %v", len(events), l.windowSize)
batchSize := len(events)
debug("Try to publish %v events to logstash with window size %v",
batchSize, l.windowSize)

// prepare message payload
if len(events) > l.windowSize {
events = events[:l.windowSize]
}
count, payload, err := l.compressEvents(events)

// serialize all raw events into output buffer, removing all events encoding failed for
count, payload, err := l.serializeEvents(events)
if err != nil {
return 0, err
}
Expand All @@ -130,8 +158,12 @@ func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error)
return l.onFail(0, err)
}

// send payload
if err = l.sendCompressed(payload); err != nil {
if l.compressLevel > 0 {
err = l.sendCompressed(payload)
} else {
_, err = l.Write(payload)
}
if err != nil {
return l.onFail(0, err)
}

Expand Down Expand Up @@ -194,29 +226,39 @@ func (l *lumberjackClient) onFail(n int, err error) (int, error) {
return n, nil
}

func (l *lumberjackClient) compressEvents(
func (l *lumberjackClient) serializeEvents(
events []common.MapStr,
) (uint32, []byte, error) {
buf := bytes.NewBuffer(nil)

// compress events
compressor, _ := zlib.NewWriterLevel(buf, 3) // todo make compression level configurable?
if l.compressLevel > 0 {
w, _ := zlib.NewWriterLevel(buf, l.compressLevel)
count, err := l.doSerializeEvents(w, events)
if err != nil {
return 0, nil, err
}
if err := w.Close(); err != nil {
debug("Finalizing zlib compression failed with: %s", err)
return 0, nil, err
}
return count, buf.Bytes(), nil
}

count, err := l.doSerializeEvents(buf, events)
return count, buf.Bytes(), err
}

func (l *lumberjackClient) doSerializeEvents(out io.Writer, events []common.MapStr) (uint32, error) {
var sequence uint32
for _, event := range events {
sequence++
err := l.writeDataFrame(event, sequence, compressor)
err := l.writeDataFrame(event, sequence, out)
if err != nil {
logp.Critical("failed to encode event: %v", err)
sequence-- //forget this last broken event and continue
}
}
if err := compressor.Close(); err != nil {
debug("Finalizing zlib compression failed with: %s", err)
return 0, nil, err
}
payload := buf.Bytes()

return sequence, payload, nil
return sequence, nil
}

func (l *lumberjackClient) readACK() (uint32, error) {
Expand Down Expand Up @@ -256,6 +298,7 @@ func (l *lumberjackClient) sendCompressed(payload []byte) error {
if err := l.SetDeadline(time.Now().Add(l.timeout)); err != nil {
return err
}

if _, err := l.Write(codeCompressed); err != nil {
return err
}
Expand Down
17 changes: 11 additions & 6 deletions libbeat/outputs/logstash/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ type mockTransport struct {
control chan mockTransportCommand
}

func newLumberjackTestClient(conn TransportClient) *lumberjackClient {
c, err := newLumberjackClient(conn, 3, testMaxWindowSize, 5*time.Second)
if err != nil {
panic(err)
}
return c
}

func newClientTestDriver(client mode.ProtocolClient) *testClientDriver {
driver := &testClientDriver{
client: client,
Expand Down Expand Up @@ -300,8 +308,7 @@ const testMaxWindowSize = 64

func TestSendZero(t *testing.T) {
transp := newMockTransport()
client := newClientTestDriver(
newLumberjackClient(transp, testMaxWindowSize, 5*time.Second))
client := newClientTestDriver(newLumberjackTestClient(transp))

client.Publish(make([]common.MapStr, 0))

Expand All @@ -315,8 +322,7 @@ func TestSendZero(t *testing.T) {

func TestSimpleEvent(t *testing.T) {
transp := newMockTransport()
client := newClientTestDriver(
newLumberjackClient(transp, testMaxWindowSize, 5*time.Second))
client := newClientTestDriver(newLumberjackTestClient(transp))

event := common.MapStr{"name": "me", "line": 10}
client.Publish([]common.MapStr{event})
Expand Down Expand Up @@ -348,8 +354,7 @@ func TestSimpleEvent(t *testing.T) {

func TestStructuredEvent(t *testing.T) {
transp := newMockTransport()
client := newClientTestDriver(
newLumberjackClient(transp, testMaxWindowSize, 5*time.Second))
client := newClientTestDriver(newLumberjackTestClient(transp))
event := common.MapStr{
"name": "test",
"struct": common.MapStr{
Expand Down
33 changes: 24 additions & 9 deletions libbeat/outputs/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
logstasDefaultMaxTimeout = 90 * time.Second
defaultSendRetries = 3
defaultMaxWindowSize = 1024
defaultCompressionLevel = 3
)

var waitRetry = time.Duration(1) * time.Second
Expand Down Expand Up @@ -72,6 +73,11 @@ func (lj *logstash) init(
maxWindowSize = *config.BulkMaxSize
}

compressLevel := defaultCompressionLevel
if config.CompressionLevel != nil {
compressLevel = *config.CompressionLevel
}

var clients []mode.ProtocolClient
var err error
if useTLS {
Expand All @@ -82,16 +88,12 @@ func (lj *logstash) init(
}

clients, err = mode.MakeClients(config,
makeClientFactory(maxWindowSize, timeout,
func(host string) (TransportClient, error) {
return newTLSClient(host, defaultPort, tlsConfig)
}))
makeClientFactory(maxWindowSize, compressLevel, timeout,
makeTLSClient(defaultPort, tlsConfig)))
} else {
clients, err = mode.MakeClients(config,
makeClientFactory(maxWindowSize, timeout,
func(host string) (TransportClient, error) {
return newTCPClient(host, defaultPort)
}))
makeClientFactory(maxWindowSize, compressLevel, timeout,
makeTCPClient(defaultPort)))
}
if err != nil {
return err
Expand Down Expand Up @@ -133,6 +135,7 @@ func (lj *logstash) init(

func makeClientFactory(
maxWindowSize int,
compressLevel int,
timeout time.Duration,
makeTransp func(string) (TransportClient, error),
) func(string) (mode.ProtocolClient, error) {
Expand All @@ -141,7 +144,19 @@ func makeClientFactory(
if err != nil {
return nil, err
}
return newLumberjackClient(transp, maxWindowSize, timeout), nil
return newLumberjackClient(transp, compressLevel, maxWindowSize, timeout)
}
}

func makeTCPClient(port int) func(string) (TransportClient, error) {
return func(host string) (TransportClient, error) {
return newTCPClient(host, port)
}
}

func makeTLSClient(port int, tls *tls.Config) func(string) (TransportClient, error) {
return func(host string) (TransportClient, error) {
return newTLSClient(host, port, tls)
}
}

Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type MothershipConfig struct {
Pretty *bool `yaml:"pretty"`
TLS *TLSConfig
Worker int
CompressionLevel *int `yaml:"compression_level"`
}

type Outputer interface {
Expand Down
3 changes: 3 additions & 0 deletions packetbeat/etc/packetbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ output:
# Number of workers per Logstash host.
#worker: 1

# Set gzip compression level.
#compression_level: 3

# Optional load balance the events between the Logstash hosts
#loadbalance: true

Expand Down
3 changes: 3 additions & 0 deletions topbeat/etc/topbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ output:
# Number of workers per Logstash host.
#worker: 1

# Set gzip compression level.
#compression_level: 3

# Optional load balance the events between the Logstash hosts
#loadbalance: true

Expand Down
3 changes: 3 additions & 0 deletions winlogbeat/etc/winlogbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ output:
# Number of workers per Logstash host.
#worker: 1

# Set gzip compression level.
#compression_level: 3

# Optional load balance the events between the Logstash hosts
#loadbalance: true

Expand Down