Skip to content

Commit

Permalink
x-pack/filebeat/input/http_endpoint: allow user to place events at do…
Browse files Browse the repository at this point in the history
…c root
  • Loading branch information
efd6 committed May 13, 2024
1 parent 9aa59da commit bf8e701
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 6 deletions.
13 changes: 12 additions & 1 deletion x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ Custom response example:
prefix: "json"
----

Map request to root of document example:
["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: http_endpoint
enabled: true
listen_address: 192.168.1.1
listen_port: 8080
prefix: "."
----

Multiple endpoints example:
["source","yaml",subs="attributes"]
----
Expand Down Expand Up @@ -306,7 +317,7 @@ This options specific which URL path to accept requests on. Defaults to `/`
[float]
==== `prefix`

This option specifies which prefix the incoming request will be mapped to.
This option specifies which prefix the incoming request will be mapped to. If `prefix` is "`.`", the request will be mapped to the root of the resulting document.

[float]
==== `include_headers`
Expand Down
12 changes: 7 additions & 5 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,15 @@ func (h *handler) sendResponse(w http.ResponseWriter, status int, message string
func (h *handler) publishEvent(obj, headers mapstr.M, acker *batchACKTracker) error {
event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: mapstr.M{},
Private: acker,
}
if h.messageField == "." {
event.Fields = obj
} else {
if _, err := event.PutValue(h.messageField, obj); err != nil {
return fmt.Errorf("failed to put data into event key %q: %w", h.messageField, err)
}
}
if h.preserveOriginalEvent {
event.Fields["event"] = mapstr.M{
"original": obj.String(),
Expand All @@ -319,10 +325,6 @@ func (h *handler) publishEvent(obj, headers mapstr.M, acker *batchACKTracker) er
event.Fields["headers"] = headers
}

if _, err := event.PutValue(h.messageField, obj); err != nil {
return fmt.Errorf("failed to put data into event key %q: %w", h.messageField, err)
}

h.publish(event)
return nil
}
Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/input/http_endpoint/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,26 @@ func Test_apiResponse(t *testing.T) {
wantStatus: http.StatusOK,
wantResponse: `{"message": "success"}`,
},
{
name: "single_event_root",
conf: func() config {
c := defaultConfig()
c.Prefix = "."
return c
}(),
request: func() *http.Request {
req := httptest.NewRequest(http.MethodPost, "/", bytes.NewBufferString(`{"id":0}`))
req.Header.Set("Content-Type", "application/json")
return req
}(),
events: []mapstr.M{
{
"id": int64(0),
},
},
wantStatus: http.StatusOK,
wantResponse: `{"message": "success"}`,
},
{
name: "single_event_gzip",
conf: defaultConfig(),
Expand Down

0 comments on commit bf8e701

Please sign in to comment.