Skip to content

Commit

Permalink
Implement zlib compression option.
Browse files Browse the repository at this point in the history
  • Loading branch information
zackwine authored and PettitWesley committed Sep 14, 2020
1 parent dd7ce3b commit a28de66
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 12 deletions.
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ If you think you’ve found a potential security issue, please do not post it in
* `experimental_concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default `experimental_concurrency` is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `experimental_concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `experimental_concurrency` option is `10`. WARNING: Enabling `experimental_concurrency` can lead to data loss if the retry count is reached. Enabling concurrency will increase resource usage (memory and CPU).
* `experimental_concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted before records are dropped.
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature isn't compatible with the `partition_key` feature. See the KPL aggregation section below for more details.
* `compression`: Setting `compression` to `zlib` will enable zlib compression of each record. By default this feature is disabled and records are not compressed.

### Permissions

Expand Down Expand Up @@ -129,3 +130,38 @@ More specifically, increasing the flush value will ensure the most records are a
aggregation true
append_newline true
```

### ZLIB Compression

Enabling `zlib` compression will compress each record individually reducing the network bandwidth required to send logs. Using this feature in conjunction with `aggregation` can greatly reduce the number of Kinesis shards required.

Compression Advantages:

- Reduces network bandwidth required
- Reduces Kinesis shard count in some scenarios

Compression Disadvantages:

- Fluentbit will require more CPU and memory to send records
- A consumer must decompress the records


Example config:

```
[SERVICE]
Flush 20
[INPUT]
Name forward
Listen 0.0.0.0
Port 24224
[OUTPUT]
Name kinesis
Match *
region us-west-2
stream my-kinesis-stream-name
compression zlib
append_newline true
```
15 changes: 13 additions & 2 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
logKey := output.FLBPluginConfigKey(ctx, "log_key")
logrus.Infof("[kinesis %d] plugin parameter log_key = '%s'", pluginID, logKey)
aggregation := output.FLBPluginConfigKey(ctx, "aggregation")
logrus.Infof("[kinesis %d] plugin parameter aggregation = %s", pluginID, aggregation)
logrus.Infof("[kinesis %d] plugin parameter aggregation = '%s'", pluginID, aggregation)
compression := output.FLBPluginConfigKey(ctx, "compression")
logrus.Infof("[kinesis %d] plugin parameter compression = '%s'", pluginID, compression)

if stream == "" || region == "" {
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
Expand Down Expand Up @@ -145,7 +147,16 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
concurrencyRetriesInt = defaultConcurrentRetries
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, pluginID)
var comp kinesis.CompressionType
if strings.ToLower(compression) == string(kinesis.CompressionZlib) {
comp = kinesis.CompressionZlib
} else if strings.ToLower(compression) == string(kinesis.CompressionNone) || compression == "" {
comp = kinesis.CompressionNone
} else {
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'none', or undefined", pluginID, compression)
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)
}

// The "export" comments have syntactic meaning
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ require (
github.com/aws/aws-sdk-go v1.34.13
github.com/fluent/fluent-bit-go v0.0.0-20200707230002-2a28684e2382
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.1
github.com/json-iterator/go v1.1.10
github.com/lestrrat-go/strftime v1.0.3
github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.5.1
google.golang.org/protobuf v1.25.0
)
68 changes: 68 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/aws/amazon-kinesis-firehose-for-fluent-bit v1.4.1 h1:Vq+RfIAuyzw5t1J4sts7rZdBLRWm2SB0r79OQsvdk40=
github.com/aws/amazon-kinesis-firehose-for-fluent-bit v1.4.1/go.mod h1:2hixxBh6Xygvxe6x/PSUTphh8dZ0WptVty98ftZIhAU=
github.com/aws/aws-sdk-go v1.33.14 h1:ucjyVEvtIdtn4acf+RKsgk6ybAYeMLXpGZeqoVvi7Kk=
Expand All @@ -12,17 +14,41 @@ github.com/aws/aws-sdk-go v1.34.9 h1:cUGBW9CVdi0mS7K1hDzxIqTpfeWhpoQiguq81M1tjK0
github.com/aws/aws-sdk-go v1.34.9/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go v1.34.13 h1:wwNWSUh4FGJxXVOVVNj2lWI8wTe5hK8sGWlK7ziEcgg=
github.com/aws/aws-sdk-go v1.34.13/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fluent/fluent-bit-go v0.0.0-20200707230002-2a28684e2382 h1:gV8vs5nuIWEbCn/EnHa8/2VEbFcGpchUhqftnxHSOrI=
github.com/fluent/fluent-bit-go v0.0.0-20200707230002-2a28684e2382/go.mod h1:L92h+dgwElEyUuShEwjbiHjseW410WIcNz+Bjutc8YQ=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/strftime v1.0.3 h1:qqOPU7y+TM8Y803I8fG9c/DyKG3xH/xkng6keC1015Q=
github.com/lestrrat-go/strftime v1.0.3/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR76fd03sz+Qz4g=
Expand All @@ -33,24 +59,66 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/sirupsen/logrus v1.6.0 h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
42 changes: 41 additions & 1 deletion kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kinesis

import (
"bytes"
"compress/zlib"
"fmt"
"math/rand"
"os"
Expand Down Expand Up @@ -67,6 +68,16 @@ type random struct {
buffer []byte
}

// CompressionType indicates the type of compression to apply to each record
type CompressionType string

const (
// CompressionNone disables compression
CompressionNone CompressionType = "none"
// CompressionZlib enables zlib compression
CompressionZlib = "zlib"
)

// OutputPlugin sends log records to kinesis
type OutputPlugin struct {
// The name of the stream that you want log records sent to
Expand Down Expand Up @@ -95,10 +106,11 @@ type OutputPlugin struct {
isAggregate bool
aggregator *aggregate.Aggregator
aggregatePartitionKey string
compression CompressionType
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey string, concurrency, retryLimit int, isAggregate, appendNewline bool, pluginID int) (*OutputPlugin, error) {
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey string, concurrency, retryLimit int, isAggregate, appendNewline bool, compression CompressionType, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint)
if err != nil {
return nil, err
Expand Down Expand Up @@ -153,6 +165,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd
concurrencyRetryLimit: retryLimit,
isAggregate: isAggregate,
aggregator: aggregator,
compression: compression,
}, nil
}

Expand Down Expand Up @@ -397,6 +410,13 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
data = append(data, []byte("\n")...)
}

if outputPlugin.compression == CompressionZlib {
data, err = zlibCompress(data)
if err != nil {
return nil, err
}
}

if len(data)+len(partitionKey) > maximumRecordSize {
return nil, fmt.Errorf("Log record greater than max size allowed by Kinesis")
}
Expand Down Expand Up @@ -512,6 +532,26 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa
return outputPlugin.randomString()
}

func zlibCompress(data []byte) ([]byte, error) {
var b bytes.Buffer

if data == nil {
return nil, fmt.Errorf("No data to compress. 'nil' value passed as data")
}

zw := zlib.NewWriter(&b)
_, err := zw.Write(data)
if err != nil {
return data, err
}
err = zw.Close()
if err != nil {
return data, err
}

return b.Bytes(), nil
}

// stringOrByteArray returns the string value if the input is a string or byte array otherwise an empty string
func stringOrByteArray(v interface{}) string {
switch t := v.(type) {
Expand Down
33 changes: 24 additions & 9 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient, isAggregate
}

return &OutputPlugin{
stream: "stream",
client: client,
dataKeys: "",
partitionKey: "",
timer: timer,
PluginID: 0,
random: random,
stream: "stream",
client: client,
dataKeys: "",
partitionKey: "",
timer: timer,
PluginID: 0,
random: random,
concurrencyRetryLimit: concurrencyRetryLimit,
isAggregate: isAggregate,
aggregator: aggregator,
isAggregate: isAggregate,
aggregator: aggregator,
}, nil
}

Expand Down Expand Up @@ -168,3 +168,18 @@ func TestAddRecordWithConcurrency(t *testing.T) {
retCode = outputPlugin.FlushConcurrent(len(records), records)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
}

func TestZlibCompression(t *testing.T) {

testData := []byte("Test Data: This is test data for compression. This data is needs to have with some repetitive values, so compression is effective.")

compressedBuf, err := zlibCompress(testData)
assert.Equal(t, err, nil, "Expected successful compression of data")
assert.Lessf(t, len(compressedBuf), len(testData), "Compressed data buffer should contain fewer bytes")
}

func TestZlibCompressionEmpty(t *testing.T) {

_, err := zlibCompress(nil)
assert.NotEqual(t, err, nil, "'nil' data should return an error")
}

0 comments on commit a28de66

Please sign in to comment.