Skip to content

Commit

Permalink
Make selector string casing configurable (elastic#18854)
Browse files Browse the repository at this point in the history
Add support for configuring the string casing in the index/pipeline/key/topic 'Selector'. 

## Why is it important?

Elasticsearch pipeline and index names are required to be lower case only. When used with fields from events this was not always guaranteed, leading us to enforce lower case always (elastic#16081. elastic#6342).
As the code is reused for Kafka topic selection, this unfortunately did lead to a Regression as some users expect strings to allow mixed case (elastic#18640).
With this PR Elasticsearch related resources (e.g. index or pipeline names) are set to lowercase only, while not touching the strings in other outputs.
  • Loading branch information
Steffen Siering authored and melchiormoulin committed Oct 14, 2020
1 parent b76bc60 commit a176fba
Show file tree
Hide file tree
Showing 16 changed files with 526 additions and 137 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix the `translate_sid` processor's handling of unconfigured target fields. {issue}18990[18990] {pull}18991[18991]
- Fixed a service restart failure under Windows. {issue}18914[18914] {pull}18916[18916]
- The `monitoring.elasticsearch.api_key` value is correctly base64-encoded before being sent to the monitoring Elasticsearch cluster. {issue}18939[18939] {pull}18945[18945]
- Fix kafka topic setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]
- Fix redis key setting not allowing upper case characters. {pull}18854[18854] {issue}18640[18640]

*Auditbeat*

Expand Down
8 changes: 5 additions & 3 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package idxmgmt
import (
"errors"
"fmt"
"strings"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
Expand Down Expand Up @@ -198,6 +199,7 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector,
MultiKey: "indices",
EnableSingleOnly: true,
FailEmpty: mode != ilm.ModeEnabled,
Case: outil.SelectorLowerCase,
}

indexSel, err := outil.BuildSelectorFromConfig(selCfg, buildSettings)
Expand Down Expand Up @@ -354,21 +356,21 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
}

if alias, err := events.GetMetaStringValue(*evt, events.FieldMetaAlias); err == nil {
return alias
return strings.ToLower(alias)
}

if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaIndex); err == nil {
ts := evt.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
strings.ToLower(idx), ts.Year(), ts.Month(), ts.Day())
}

// This is functionally identical to Meta["alias"], returning the overriding
// metadata as the index name if present. It is currently used by Filebeat
// to send the index for particular inputs to formatted string templates,
// which are then expanded by a processor to the "raw_index" field.
if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaRawIndex); err == nil {
return idx
return strings.ToLower(idx)
}

return ""
Expand Down
44 changes: 44 additions & 0 deletions libbeat/idxmgmt/std_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"without ilm must be lowercase": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "TeSt-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"event alias without ilm": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -147,6 +152,14 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"alias": "test",
},
},
"event alias without ilm must be lowercae": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("test"),
meta: common.MapStr{
"alias": "Test",
},
},
"event index without ilm": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -155,11 +168,24 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"index": "test",
},
},
"event index without ilm must be lowercase": {
ilmCalls: noILM,
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: dateIdx("test"),
meta: common.MapStr{
"index": "Test",
},
},
"with ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"with ilm must be lowercase": {
ilmCalls: ilmTemplateSettings("Test-9.9.9", "Test-9.9.9"),
cfg: map[string]interface{}{"index": "wrong-%{[agent.version]}"},
want: stable("test-9.9.9"),
},
"event alias wit ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -168,6 +194,14 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
"alias": "event-alias",
},
},
"event alias wit ilm must be lowercase": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
want: stable("event-alias"),
meta: common.MapStr{
"alias": "Event-alias",
},
},
"event index with ilm": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{"index": "test-%{[agent.version]}"},
Expand All @@ -186,6 +220,16 @@ func TestDefaultSupport_BuildSelector(t *testing.T) {
},
want: stable("myindex"),
},
"use indices settings must be lowercase": {
ilmCalls: ilmTemplateSettings("test-9.9.9", "test-9.9.9"),
cfg: map[string]interface{}{
"index": "test-%{[agent.version]}",
"indices": []map[string]interface{}{
{"index": "MyIndex"},
},
},
want: stable("myindex"),
},
}
for name, test := range cases {
t.Run(name, func(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"go.elastic.co/apm"
Expand Down Expand Up @@ -356,7 +357,7 @@ func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error)
return "", errors.New("pipeline metadata is no string")
}

return pipeline, nil
return strings.ToLower(pipeline), nil
}

if pipelineSel != nil {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func doClientPing(t *testing.T) {
Headers: map[string]string{headerTestField: headerTestValue},
ProxyDisable: proxyDisable != "",
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test")),
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}
if proxy != "" {
proxyURL, err := url.Parse(proxy)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestClientWithHeaders(t *testing.T) {
"X-Test": "testing value",
},
},
Index: outil.MakeSelector(outil.ConstSelectorExpr("test")),
Index: outil.MakeSelector(outil.ConstSelectorExpr("test", outil.SelectorLowerCase)),
}, nil)
assert.NoError(t, err)

Expand Down
17 changes: 11 additions & 6 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,7 @@ func buildSelectors(
return index, pipeline, err
}

pipelineSel, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "pipeline",
MultiKey: "pipelines",
EnableSingleOnly: true,
FailEmpty: false,
})
pipelineSel, err := buildPipelineSelector(cfg)
if err != nil {
return index, pipeline, err
}
Expand All @@ -149,3 +144,13 @@ func buildSelectors(

return index, pipeline, err
}

func buildPipelineSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "pipeline",
MultiKey: "pipelines",
EnableSingleOnly: true,
FailEmpty: false,
Case: outil.SelectorLowerCase,
})
}
58 changes: 58 additions & 0 deletions libbeat/outputs/elasticsearch/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
)

Expand Down Expand Up @@ -73,3 +75,59 @@ func TestGlobalConnectCallbacksManagement(t *testing.T) {
t.Fatalf("third callback cannot be retrieved")
}
}

func TestPipelineSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"no pipline configured": {},
"pipeline configured": {
cfg: map[string]interface{}{"pipeline": "test"},
want: "test",
},
"pipeline must be lowercase": {
cfg: map[string]interface{}{"pipeline": "Test"},
want: "test",
},
"pipeline via event meta": {
event: beat.Event{Meta: common.MapStr{"pipeline": "test"}},
want: "test",
},
"pipeline via event meta must be lowercase": {
event: beat.Event{Meta: common.MapStr{"pipeline": "Test"}},
want: "test",
},
"pipelines setting": {
cfg: map[string]interface{}{
"pipelines": []map[string]interface{}{{"pipeline": "test"}},
},
want: "test",
},
"pipelines setting must be lowercase": {
cfg: map[string]interface{}{
"pipelines": []map[string]interface{}{{"pipeline": "Test"}},
},
want: "test",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildPipelineSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := getPipeline(&test.event, &selector)
if err != nil {
t.Fatalf("Failed to create pipeline name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
62 changes: 62 additions & 0 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/internal/testutil"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -131,3 +132,64 @@ func TestBackoffFunc(t *testing.T) {
})
}
}

func TestTopicSelection(t *testing.T) {
cases := map[string]struct {
cfg map[string]interface{}
event beat.Event
want string
}{
"topic configured": {
cfg: map[string]interface{}{"topic": "test"},
want: "test",
},
"topic must keep case": {
cfg: map[string]interface{}{"topic": "Test"},
want: "Test",
},
"topics setting": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "test"}},
},
want: "test",
},
"topics setting must keep case": {
cfg: map[string]interface{}{
"topics": []map[string]interface{}{{"topic": "Test"}},
},
want: "Test",
},
"use event field": {
cfg: map[string]interface{}{"topic": "test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "from-event"},
},
want: "test-from-event",
},
"use event field must keep case": {
cfg: map[string]interface{}{"topic": "Test-%{[field]}"},
event: beat.Event{
Fields: common.MapStr{"field": "From-Event"},
},
want: "Test-From-Event",
},
}

for name, test := range cases {
t.Run(name, func(t *testing.T) {
selector, err := buildTopicSelector(common.MustNewConfigFrom(test.cfg))
if err != nil {
t.Fatalf("Failed to parse configuration: %v", err)
}

got, err := selector.Select(&test.event)
if err != nil {
t.Fatalf("Failed to create topic name: %v", err)
}

if test.want != got {
t.Errorf("Pipeline name missmatch (want: %v, got: %v)", test.want, got)
}
})
}
}
17 changes: 11 additions & 6 deletions libbeat/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,7 @@ func makeKafka(
return outputs.Fail(err)
}

topic, err := outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
})
topic, err := buildTopicSelector(cfg)
if err != nil {
return outputs.Fail(err)
}
Expand Down Expand Up @@ -102,3 +97,13 @@ func makeKafka(
}
return outputs.Success(config.BulkMaxSize, retry, client)
}

func buildTopicSelector(cfg *common.Config) (outil.Selector, error) {
return outil.BuildSelectorFromConfig(cfg, outil.Settings{
Key: "topic",
MultiKey: "topics",
EnableSingleOnly: true,
FailEmpty: true,
Case: outil.SelectorKeepCase,
})
}
2 changes: 1 addition & 1 deletion libbeat/outputs/logstash/logstash_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func esConnect(t *testing.T, index string) *esConnection {

host := getElasticsearchHost()
indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index))
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "")
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "", outil.SelectorLowerCase)
indexSel := outil.MakeSelector(indexFmtExpr)
index, _ = indexSel.Select(&beat.Event{
Timestamp: ts,
Expand Down
Loading

0 comments on commit a176fba

Please sign in to comment.