Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: add support for http+{unix,npipe} schemes (#…
Browse files Browse the repository at this point in the history
…33712)

Also explicitly add cel to elastic/security-external-integrations
ownership list.
  • Loading branch information
efd6 authored Nov 18, 2022
1 parent 7bffd5a commit cd547c6
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 6 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ CHANGELOG*
/x-pack/filebeat/docs/ # Listed without an owner to avoid maintaining doc ownership for each input and module.
/x-pack/filebeat/input/awscloudwatch/ @elastic/obs-cloud-monitoring
/x-pack/filebeat/input/awss3/ @elastic/obs-cloud-monitoring
/x-pack/filebeat/input/cel/ @elastic/security-external-integrations
/x-pack/filebeat/input/gcppubsub/ @elastic/security-external-integrations
/x-pack/filebeat/input/http_endpoint/ @elastic/security-external-integrations
/x-pack/filebeat/input/httpjson/ @elastic/security-external-integrations
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]
- Add support for http+unix and http+npipe schemes. {issue}33571[33571] {pull}33610[33610]
- Add support for http+unix and http+npipe schemes in httpjson input. {issue}33571[33571] {pull}33610[33610]
- Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712]

*Auditbeat*

Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ with `auth.oauth2.google.jwt_file` or `auth.oauth2.google.jwt_json`.
The URL of the HTTP API. Required.
The API endpoint may be accessed via unix socket and Windows named pipes by adding `+unix` or `+npipe`
to the URL scheme, for example, `http+unix:///var/socket/`.
[float]
==== `resource.timeout`
Expand Down
51 changes: 46 additions & 5 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/elastic-agent-libs/transport"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/useragent"
"github.com/elastic/go-concert/ctxtool"
Expand Down Expand Up @@ -640,10 +641,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
if !wantClient(cfg) {
return nil, nil
}
c, err := cfg.Resource.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
)
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -684,14 +682,57 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
}

func wantClient(cfg config) bool {
switch cfg.Resource.URL.Scheme {
switch scheme, _, _ := strings.Cut(cfg.Resource.URL.Scheme, "+"); scheme {
case "http", "https":
return true
default:
return false
}
}

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL) []httpcommon.TransportOption {
scheme, trans, ok := strings.Cut(u.Scheme, "+")
var dialer transport.Dialer
switch {
default:
fallthrough
case !ok:
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
}

// We set the host for the unix socket and Windows named
// pipes schemes because the http.Transport expects to
// have a host and will error out if it is not present.
// The values here are just non-zero with a helpful name.
// They are not used in any logic.
case trans == "unix":
u.Host = "unix-socket"
dialer = socketDialer{u.Path}
case trans == "npipe":
u.Host = "windows-npipe"
dialer = npipeDialer{u.Path}
}
u.Scheme = scheme
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
httpcommon.WithBaseDialer(dialer),
}
}

// socketDialer implements transport.Dialer to a constant socket path.
type socketDialer struct {
path string
}

func (d socketDialer) Dial(_, _ string) (net.Conn, error) {
return net.Dial("unix", d.path)
}

func checkRedirect(cfg *ResourceConfig, log *logp.Logger) func(*http.Request, []*http.Request) error {
return func(req *http.Request, via []*http.Request) error {
log.Debug("http client: checking redirect")
Expand Down
21 changes: 21 additions & 0 deletions x-pack/filebeat/input/cel/transport_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !windows

package cel

import (
"errors"
"net"
)

// npipeDialer implements transport.Dialer.
type npipeDialer struct {
path string
}

func (npipeDialer) Dial(_, _ string) (net.Conn, error) {
return nil, errors.New("named pipe only available on windows")
}
23 changes: 23 additions & 0 deletions x-pack/filebeat/input/cel/transport_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build windows

package cel

import (
"net"
"path/filepath"

"github.com/Microsoft/go-winio"
)

// npipeDialer implements transport.Dialer to a constant named pipe path.
type npipeDialer struct {
path string
}

func (d npipeDialer) Dial(_, _ string) (net.Conn, error) {
return winio.DialPipe(`\\.\pipe`+filepath.FromSlash(d.path), nil)
}

0 comments on commit cd547c6

Please sign in to comment.