diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7764ab53ee5..245d9608c3b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -10,6 +10,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* +- The document id field has been renamed from @metadata.id to @metadata._id. {pull}15859[15859] + *Auditbeat* @@ -77,6 +79,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* +- Add document_id setting to decode_json_fields processor. {pull}15859[15859] + *Auditbeat* diff --git a/filebeat/docs/inputs/input-common-harvester-options.asciidoc b/filebeat/docs/inputs/input-common-harvester-options.asciidoc index c7f053dfdc0..e75906f7b64 100644 --- a/filebeat/docs/inputs/input-common-harvester-options.asciidoc +++ b/filebeat/docs/inputs/input-common-harvester-options.asciidoc @@ -193,7 +193,7 @@ occur. *`document_id`*:: Option configuration setting that specifies the JSON key to set the document id. If configured, the field will be removed from the original -json document and stored in `@metadata.id` +json document and stored in `@metadata._id` *`ignore_decoding_error`*:: An optional configuration setting that specifies if JSON decoding errors should be logged or not. 
If set to true, errors will not diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 53c5954d2b0..5f0c47c9878 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -444,7 +444,7 @@ func (h *Harvester) onMessage( if id != "" { meta = common.MapStr{ - "id": id, + "_id": id, } } } else if &text != nil { diff --git a/filebeat/tests/system/test_json.py b/filebeat/tests/system/test_json.py index 04794e5ccee..bea6d9495d1 100644 --- a/filebeat/tests/system/test_json.py +++ b/filebeat/tests/system/test_json.py @@ -254,8 +254,8 @@ def test_id_in_message(self): assert len(output) == 3 for i in xrange(len(output)): - assert("@metadata.id" in output[i]) - assert(output[i]["@metadata.id"] == str(i)) + assert("@metadata._id" in output[i]) + assert(output[i]["@metadata._id"] == str(i)) assert("json.id" not in output[i]) def test_with_generic_filtering(self): diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 54d5c9398dc..d156a887a93 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -51,7 +51,7 @@ func (e *Event) SetID(id string) { if e.Meta == nil { e.Meta = common.MapStr{} } - e.Meta["id"] = id + e.Meta["_id"] = id } func (e *Event) GetValue(key string) (interface{}, error) { diff --git a/libbeat/beat/event_test.go b/libbeat/beat/event_test.go index f1026654a0a..789a3e0a994 100644 --- a/libbeat/beat/event_test.go +++ b/libbeat/beat/event_test.go @@ -50,7 +50,7 @@ func TestEventPutGetTimestamp(t *testing.T) { func TestEventMetadata(t *testing.T) { const id = "123" - newMeta := func() common.MapStr { return common.MapStr{"id": id} } + newMeta := func() common.MapStr { return common.MapStr{"_id": id} } t.Run("put", func(t *testing.T) { evt := newEmptyEvent() @@ -75,7 +75,7 @@ func TestEventMetadata(t *testing.T) { t.Run("put sub-key", func(t *testing.T) { evt := newEmptyEvent() - evt.PutValue("@metadata.id", id) + evt.PutValue("@metadata._id", id) assert.Equal(t, newMeta(), evt.Meta) 
assert.Empty(t, evt.Fields) @@ -85,7 +85,7 @@ func TestEventMetadata(t *testing.T) { evt := newEmptyEvent() evt.Meta = newMeta() - v, err := evt.GetValue("@metadata.id") + v, err := evt.GetValue("@metadata._id") assert.NoError(t, err) assert.Equal(t, id, v) @@ -105,7 +105,7 @@ func TestEventMetadata(t *testing.T) { evt := newEmptyEvent() evt.Meta = newMeta() - err := evt.Delete("@metadata.id") + err := evt.Delete("@metadata._id") assert.NoError(t, err) assert.Empty(t, evt.Meta) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 10541f08013..5c3a1bc7b48 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -437,7 +437,7 @@ func createEventBulkMeta( var id string if m := event.Meta; m != nil { - if tmp := m["id"]; tmp != nil { + if tmp := m["_id"]; tmp != nil { if s, ok := tmp.(string); ok { id = s } else { diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index ca85a0d4bfd..86712a81fee 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -40,6 +40,7 @@ type decodeJSONFields struct { overwriteKeys bool addErrorKey bool processArray bool + documentID string target *string } @@ -50,6 +51,7 @@ type config struct { AddErrorKey bool `config:"add_error_key"` ProcessArray bool `config:"process_array"` Target *string `config:"target"` + DocumentID string `config:"document_id"` } var ( @@ -81,7 +83,15 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) { return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) } - f := &decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, overwriteKeys: config.OverwriteKeys, addErrorKey: config.AddErrorKey, processArray: config.ProcessArray, target: config.Target} + f := &decodeJSONFields{ + fields: config.Fields, + maxDepth: config.MaxDepth, + 
overwriteKeys: config.OverwriteKeys, + addErrorKey: config.AddErrorKey, + processArray: config.ProcessArray, + documentID: config.DocumentID, + target: config.Target, + } return f, nil } @@ -115,6 +125,18 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { target = *f.target } + var id string + if key := f.documentID; key != "" { + if dict, ok := output.(map[string]interface{}); ok { + if tmp, err := common.MapStr(dict).GetValue(key); err == nil { + if v, ok := tmp.(string); ok { + id = v + common.MapStr(dict).Delete(key) + } + } + } + } + if target != "" { _, err = event.PutValue(target, output) } else { @@ -131,6 +153,13 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { errs = append(errs, err.Error()) continue } + + if id != "" { + if event.Meta == nil { + event.Meta = common.MapStr{} + } + event.Meta["_id"] = id + } } if len(errs) > 0 { diff --git a/libbeat/processors/actions/decode_json_fields_test.go b/libbeat/processors/actions/decode_json_fields_test.go index 69650ac9a4a..3e801d69e5e 100644 --- a/libbeat/processors/actions/decode_json_fields_test.go +++ b/libbeat/processors/actions/decode_json_fields_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" @@ -94,6 +95,38 @@ func TestInvalidJSONMultiple(t *testing.T) { assert.Equal(t, expected.String(), actual.String()) } +func TestDocumentID(t *testing.T) { + logp.TestingSetup() + + input := common.MapStr{ + "msg": `{"log": "message", "myid": "myDocumentID"}`, + } + + config := common.MustNewConfigFrom(map[string]interface{}{ + "fields": []string{"msg"}, + "document_id": "myid", + }) + + p, err := NewDecodeJSONFields(config) + if err != nil { + logp.Err("Error initializing decode_json_fields") + t.Fatal(err) + } + + actual, err := p.Run(&beat.Event{Fields: input}) + require.NoError(t, err) + + wantFields := 
common.MapStr{ + "msg": map[string]interface{}{"log": "message"}, + } + wantMeta := common.MapStr{ + "_id": "myDocumentID", + } + + assert.Equal(t, wantFields, actual.Fields) + assert.Equal(t, wantMeta, actual.Meta) +} + func TestValidJSONDepthOne(t *testing.T) { input := common.MapStr{ "msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}", diff --git a/libbeat/processors/actions/docs/decode_json_fields.asciidoc b/libbeat/processors/actions/docs/decode_json_fields.asciidoc index fb86cf24fb0..a62b8169fda 100644 --- a/libbeat/processors/actions/docs/decode_json_fields.asciidoc +++ b/libbeat/processors/actions/docs/decode_json_fields.asciidoc @@ -34,3 +34,6 @@ default value is false. `error` field is going to be part of event with error message. If it set to false, there will not be any error in event's field. Even error occurs while decoding json keys. The default value is false +`document_id`:: (Optional) JSON key to use as the document id. If configured, +the field will be removed from the original json document and stored in +`@metadata._id` diff --git a/libbeat/processors/add_id/add_id_test.go b/libbeat/processors/add_id/add_id_test.go index a7bd9baa8c5..6a3fb5b3a09 100644 --- a/libbeat/processors/add_id/add_id_test.go +++ b/libbeat/processors/add_id/add_id_test.go @@ -36,7 +36,7 @@ func TestDefaultTargetField(t *testing.T) { newEvent, err := p.Run(testEvent) assert.NoError(t, err) - v, err := newEvent.GetValue("@metadata.id") + v, err := newEvent.GetValue("@metadata._id") assert.NoError(t, err) assert.NotEmpty(t, v) } @@ -59,7 +59,7 @@ func TestNonDefaultTargetField(t *testing.T) { assert.NoError(t, err) assert.NotEmpty(t, v) - v, err = newEvent.GetValue("@metadata.id") + v, err = newEvent.GetValue("@metadata._id") assert.NoError(t, err) assert.Empty(t, v) } diff --git a/libbeat/processors/add_id/config.go b/libbeat/processors/add_id/config.go index 40b4d305de6..ca28d48d68e 100644 --- a/libbeat/processors/add_id/config.go +++ 
b/libbeat/processors/add_id/config.go @@ -29,7 +29,7 @@ type config struct { func defaultConfig() config { return config{ - TargetField: "@metadata.id", + TargetField: "@metadata._id", Type: "elasticsearch", } } diff --git a/libbeat/processors/add_id/docs/add_id.asciidoc b/libbeat/processors/add_id/docs/add_id.asciidoc index 64d475669a0..0d9f402be8c 100644 --- a/libbeat/processors/add_id/docs/add_id.asciidoc +++ b/libbeat/processors/add_id/docs/add_id.asciidoc @@ -11,7 +11,7 @@ processors: The following settings are supported: -`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`. +`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata._id`. `type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default. The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating diff --git a/metricbeat/mb/event_test.go b/metricbeat/mb/event_test.go index 433dae3c2a1..1040102de8c 100644 --- a/metricbeat/mb/event_test.go +++ b/metricbeat/mb/event_test.go @@ -139,7 +139,7 @@ func TestEventConversionToBeatEvent(t *testing.T) { e := mbEvent.BeatEvent(module, metricSet) e = mbEvent.BeatEvent(module, metricSet) - assert.Equal(t, "foobar", e.Meta["id"]) + assert.Equal(t, "foobar", e.Meta["_id"]) assert.Equal(t, timestamp, e.Timestamp) assert.Equal(t, common.MapStr{ "type": "docker",