Skip to content

Commit

Permalink
[http_endpoint input] Add support for including headers and preservin…
Browse files Browse the repository at this point in the history
…g original events (#26279)

* adding header support and event.original for the http_endpoint input

* add changelog

* updating docs

* adding tests

* applying comments from PR

* adding more tests, and applying changes related to PR comments

* linting

* stashing changes

* small changes

* Linting

* adding comments from PR

(cherry picked from commit 794df17)
  • Loading branch information
P1llus authored and mergify-bot committed Jun 22, 2021
1 parent b0e5e75 commit 757b2a6
Show file tree
Hide file tree
Showing 7 changed files with 423 additions and 110 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- http_endpoint: Support multiple documents in a single request by POSTing an array or NDJSON format. {pull}25764[25764]
- Add new `parser` to `filestream` input: `container`. {pull}26115[26115]
- Add support for ISO8601 timestamps in Zeek fileset {pull}25564[25564]
- Add possibility to include headers in resulting docs and preserve the original event in http_endpoint input {pull}26279[26279]
- Add `preserve_original_event` option to `o365audit` input. {pull}26273[26273]
- Add `log.flags` to events created by the `aws-s3` input. {pull}26267[26267]
- Add `include_s3_metadata` config option to the `aws-s3` input for including object metadata in events. {pull}26267[26267]
Expand Down
25 changes: 25 additions & 0 deletions x-pack/filebeat/docs/inputs/input-http-endpoint.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ Validate a HMAC signature from a specific header
hmac.prefix: "sha256="
----

Preserving original event and including headers in document
["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
- type: http_endpoint
enabled: true
listen_address: 192.168.1.1
listen_port: 8080
preserve_original_event: true
include_headers: ["TestHeader"]
----

==== Configuration options

The `http_endpoint` input supports the following configuration options plus the
Expand Down Expand Up @@ -182,6 +194,19 @@ This options specific which URL path to accept requests on. Defaults to `/`

This option specifies which prefix the incoming request will be mapped to.

[float]
==== `include_headers`

This options specifies a list of HTTP headers that should be copied from the incoming request and included in the document.
All configured headers will always be canonicalized to match the headers of the incoming request.
For example, `["content-type"]` will become `["Content-Type"]` when the filebeat is running.

[float]
==== `preserve_original_event`

This option copies the raw unmodified body of the incoming request to the event.original field as a string before sending the event to Elasticsearch.


[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

Expand Down
44 changes: 27 additions & 17 deletions x-pack/filebeat/input/http_endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,32 @@ package http_endpoint
import (
"encoding/json"
"errors"
"net/textproto"

"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
)

// Config contains information about httpjson configuration
type config struct {
TLS *tlscommon.ServerConfig `config:"ssl"`
BasicAuth bool `config:"basic_auth"`
Username string `config:"username"`
Password string `config:"password"`
ResponseCode int `config:"response_code" validate:"positive"`
ResponseBody string `config:"response_body"`
ListenAddress string `config:"listen_address"`
ListenPort string `config:"listen_port"`
URL string `config:"url"`
Prefix string `config:"prefix"`
ContentType string `config:"content_type"`
SecretHeader string `config:"secret.header"`
SecretValue string `config:"secret.value"`
HMACHeader string `config:"hmac.header"`
HMACKey string `config:"hmac.key"`
HMACType string `config:"hmac.type"`
HMACPrefix string `config:"hmac.prefix"`
TLS *tlscommon.ServerConfig `config:"ssl"`
BasicAuth bool `config:"basic_auth"`
Username string `config:"username"`
Password string `config:"password"`
ResponseCode int `config:"response_code" validate:"positive"`
ResponseBody string `config:"response_body"`
ListenAddress string `config:"listen_address"`
ListenPort string `config:"listen_port"`
URL string `config:"url"`
Prefix string `config:"prefix"`
ContentType string `config:"content_type"`
SecretHeader string `config:"secret.header"`
SecretValue string `config:"secret.value"`
HMACHeader string `config:"hmac.header"`
HMACKey string `config:"hmac.key"`
HMACType string `config:"hmac.type"`
HMACPrefix string `config:"hmac.prefix"`
IncludeHeaders []string `config:"include_headers"`
PreserveOriginalEvent bool `config:"preserve_original_event"`
}

func defaultConfig() config {
Expand Down Expand Up @@ -78,3 +81,10 @@ func (c *config) Validate() error {

return nil
}

func canonicalizeHeaders(headerConf []string) (includeHeaders []string) {
for i := range headerConf {
headerConf[i] = textproto.CanonicalMIMEHeaderKey(headerConf[i])
}
return headerConf
}
115 changes: 93 additions & 22 deletions x-pack/filebeat/input/http_endpoint/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
package http_endpoint

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
Expand All @@ -23,9 +23,11 @@ type httpHandler struct {
log *logp.Logger
publisher stateless.Publisher

messageField string
responseCode int
responseBody string
messageField string
responseCode int
responseBody string
includeHeaders []string
preserveOriginalEvent bool
}

var (
Expand All @@ -35,14 +37,17 @@ var (

// Triggers if middleware validation returns successful
func (h *httpHandler) apiResponse(w http.ResponseWriter, r *http.Request) {
objs, status, err := httpReadJSON(r.Body)
var headers map[string]interface{}
objs, _, status, err := httpReadJSON(r.Body)
if err != nil {
sendErrorResponse(w, status, err)
return
}

if len(h.includeHeaders) > 0 {
headers = getIncludedHeaders(r, h.includeHeaders)
}
for _, obj := range objs {
h.publishEvent(obj)
h.publishEvent(obj, headers)
}
h.sendResponse(w, h.responseCode, h.responseBody)
}
Expand All @@ -53,13 +58,19 @@ func (h *httpHandler) sendResponse(w http.ResponseWriter, status int, message st
io.WriteString(w, message)
}

func (h *httpHandler) publishEvent(obj common.MapStr) {
func (h *httpHandler) publishEvent(obj common.MapStr, headers common.MapStr) {
event := beat.Event{
Timestamp: time.Now().UTC(),
Fields: common.MapStr{
h.messageField: obj,
},
}
if h.preserveOriginalEvent {
event.PutValue("event.original", obj.String())
}
if len(headers) > 0 {
event.PutValue("headers", headers)
}

h.publisher.Publish(event)
}
Expand All @@ -82,34 +93,94 @@ func sendErrorResponse(w http.ResponseWriter, status int, err error) {
e.Encode(common.MapStr{"message": err.Error()})
}

func httpReadJSON(body io.Reader) (objs []common.MapStr, status int, err error) {
func httpReadJSON(body io.Reader) (objs []common.MapStr, rawMessages []json.RawMessage, status int, err error) {
if body == http.NoBody {
return nil, http.StatusNotAcceptable, errBodyEmpty
return nil, nil, http.StatusNotAcceptable, errBodyEmpty
}
obj, rawMessage, err := decodeJSON(body)
if err != nil {
return nil, nil, http.StatusBadRequest, err
}
return obj, rawMessage, http.StatusOK, err

}

func decodeJSON(body io.Reader) (objs []common.MapStr, rawMessages []json.RawMessage, err error) {
decoder := json.NewDecoder(body)
for idx := 0; ; idx++ {
var obj interface{}
if err := decoder.Decode(&obj); err != nil {
for decoder.More() {
var raw json.RawMessage
if err := decoder.Decode(&raw); err != nil {
if err == io.EOF {
break
}
return nil, http.StatusBadRequest, errors.Wrapf(err, "malformed JSON object at stream position %d", idx)
return nil, nil, errors.Wrapf(err, "malformed JSON object at stream position %d", decoder.InputOffset())
}

var obj interface{}
if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
return nil, nil, errors.Wrapf(err, "malformed JSON object at stream position %d", decoder.InputOffset())
}
switch v := obj.(type) {
case map[string]interface{}:
objs = append(objs, v)
rawMessages = append(rawMessages, raw)
case []interface{}:
for listIdx, listObj := range v {
asMap, ok := listObj.(map[string]interface{})
if !ok {
return nil, http.StatusBadRequest, fmt.Errorf("%v at stream %d index %d", errUnsupportedType, idx, listIdx)
}
objs = append(objs, asMap)
nobjs, nrawMessages, err := decodeJSONArray(bytes.NewReader(raw))
if err != nil {
return nil, nil, errors.Wrapf(err, "recursive error %d", decoder.InputOffset())
}
objs = append(objs, nobjs...)
rawMessages = append(rawMessages, nrawMessages...)
default:
return nil, http.StatusBadRequest, errUnsupportedType
return nil, nil, errUnsupportedType
}
}
return objs, rawMessages, nil
}

func decodeJSONArray(raw *bytes.Reader) (objs []common.MapStr, rawMessages []json.RawMessage, err error) {
dec := newJSONDecoder(raw)
token, err := dec.Token()
if token != json.Delim('[') || err != nil {
return nil, nil, errors.Wrapf(err, "malformed JSON array, not starting with delimiter [ at position: %d", dec.InputOffset())
}

for dec.More() {
var raw json.RawMessage
if err := dec.Decode(&raw); err != nil {
if err == io.EOF {
break
}
return nil, nil, errors.Wrapf(err, "malformed JSON object at stream position %d", dec.InputOffset())
}

var obj interface{}
if err := newJSONDecoder(bytes.NewReader(raw)).Decode(&obj); err != nil {
return nil, nil, errors.Wrapf(err, "malformed JSON object at stream position %d", dec.InputOffset())
}

m, ok := obj.(map[string]interface{})
if ok {
rawMessages = append(rawMessages, raw)
objs = append(objs, m)
}
}
return
}

func getIncludedHeaders(r *http.Request, headerConf []string) (includedHeaders common.MapStr) {
includedHeaders = common.MapStr{}
for _, header := range headerConf {
h, found := r.Header[header]
if found {
includedHeaders.Put(header, h)
}
}
return objs, 0, nil
return includedHeaders
}

func newJSONDecoder(r io.Reader) *json.Decoder {
dec := json.NewDecoder(r)
dec.UseNumber()
return dec
}
Loading

0 comments on commit 757b2a6

Please sign in to comment.