Skip to content

Commit

Permalink
Add TLS support for Metricbeat http server (#11611)
Browse files Browse the repository at this point in the history
Add TLS/SSL support to the Metricbeat http/server metricset. This allows users to host an HTTPS server if they configure a certificate and key.

Closes #11457
  • Loading branch information
leopucci authored and andrewkroh committed Apr 4, 2019
1 parent 2ec788f commit 0fb063f
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add AWS s3_request metricset. {pull}10949[10949] {issue}10055[10055]
- Add s3_daily_storage metricset. {pull}10940[10940] {issue}10055[10055]
- Add `coredns` metricbeat module. {pull}10585[10585]
- Add SSL support for Metricbeat HTTP server. {pull}11482[11482] {issue}11457[11457]
- The `elasticsearch.index` metricset (with `xpack.enabled: true`) now collects `refresh.external_total_time_in_millis` fields from Elasticsearch. {pull}11616[11616]

*Packetbeat*
Expand Down
7 changes: 5 additions & 2 deletions metricbeat/helper/server/http/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package http

import "github.com/elastic/beats/libbeat/common/transport/tlscommon"

type HttpConfig struct {
Host string `config:"host"`
Port int `config:"port"`
Host string `config:"host"`
Port int `config:"port"`
TLS *tlscommon.ServerConfig `config:"ssl"`
}

func defaultHttpConfig() HttpConfig {
Expand Down
39 changes: 31 additions & 8 deletions metricbeat/helper/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package http

import (
"context"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/mb"
Expand Down Expand Up @@ -57,6 +59,11 @@ func NewHttpServer(mb mb.BaseMetricSet) (server.Server, error) {
return nil, err
}

tlsConfig, err := tlscommon.LoadTLSServerConfig(config.TLS)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
h := &HttpServer{
done: make(chan struct{}),
Expand All @@ -66,21 +73,32 @@ func NewHttpServer(mb mb.BaseMetricSet) (server.Server, error) {
}

httpServer := &http.Server{
Addr: fmt.Sprintf("%s:%d", config.Host, config.Port),
Addr: net.JoinHostPort(config.Host, strconv.Itoa(int(config.Port))),
Handler: http.HandlerFunc(h.handleFunc),
}
if tlsConfig != nil {
httpServer.TLSConfig = tlsConfig.BuildModuleConfig(config.Host)
}
h.server = httpServer

return h, nil
}

func (h *HttpServer) Start() error {
go func() {

logp.Info("Starting http server on %s", h.server.Addr)
err := h.server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logp.Critical("Unable to start HTTP server due to error: %v", err)
if h.server.TLSConfig != nil {
logp.Info("Starting HTTPS server on %s", h.server.Addr)
//certificate is already loaded. That's why the parameters are empty
err := h.server.ListenAndServeTLS("", "")
if err != nil && err != http.ErrServerClosed {
logp.Critical("Unable to start HTTPS server due to error: %v", err)
}
} else {
logp.Info("Starting HTTP server on %s", h.server.Addr)
err := h.server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logp.Critical("Unable to start HTTP server due to error: %v", err)
}
}
}()

Expand Down Expand Up @@ -130,6 +148,11 @@ func (h *HttpServer) handleFunc(writer http.ResponseWriter, req *http.Request) {

case "GET":
writer.WriteHeader(http.StatusOK)
writer.Write([]byte("HTTP Server accepts data via POST"))
if req.TLS != nil {
writer.Write([]byte("HTTPS Server accepts data via POST"))
} else {
writer.Write([]byte("HTTP Server accepts data via POST"))
}

}
}
210 changes: 181 additions & 29 deletions metricbeat/helper/server/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,70 +22,222 @@ package http
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"testing"
"time"

"github.com/elastic/beats/metricbeat/helper/server"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/metricbeat/helper/server"
)

func GetHttpServer(host string, port int) (server.Server, error) {
func TestHTTPServers(t *testing.T) {

var cases = []struct {
testName, inputMessage, connectionType, connectionMethod, expectedOutput string
expectedHTTPCode int
}{
{"HTTP GET", `"@timestamp":"2016-05-23T08:05:34.853Z"`, "HTTP", "GET", "HTTP Server accepts data via POST", 200},
{"HTTPS GET", `"@timestamp":"2016-05-23T08:05:34.853Z"`, "HTTPS", "GET", "HTTPS Server accepts data via POST", 200},
{"HTTP POST", `"@timestamp":"2016-05-23T08:05:34.853Z"`, "HTTP", "POST", `"@timestamp":"2016-05-23T08:05:34.853Z"`, 202},
{"HTTPS POST", `"@timestamp":"2016-05-23T08:05:34.853Z"`, "HTTPS", "POST", `"@timestamp":"2016-05-23T08:05:34.853Z"`, 202},
}

for _, test := range cases {
t.Run(test.testName, func(t *testing.T) {
host := "127.0.0.1"
port := 40050
svc, err := getHTTPServer(t, host, port, test.connectionType)
if err != nil {
t.Error(err)
t.FailNow()
}

svc.Start()
defer svc.Stop()
// make sure server is up before writing data into it.
err = checkServerReady(host, port)
if err != nil {
t.Error(err)
t.FailNow()
}
httpCode, response := writeToServer(t, test.inputMessage, host, port, test.connectionMethod, test.connectionType)

assert.True(t, httpCode == test.expectedHTTPCode)

if test.connectionMethod == "POST" {
msg := <-svc.GetEvents()

assert.True(t, msg.GetEvent() != nil)
ok, _ := msg.GetEvent().HasKey("data")
assert.True(t, ok)
bytes, _ := msg.GetEvent()["data"].([]byte)
httpOutput := string(bytes)
assert.True(t, httpOutput == test.expectedOutput)
} else {
assert.True(t, response == test.expectedOutput)
}

})
}
}

func checkServerReady(host string, port int) error {

const (
checkServerReadyTimeout = 5 * time.Second
checkServerReadyTick = 100 * time.Millisecond
)
var conn net.Conn
var err error

ctx, cancel := context.WithTimeout(context.TODO(), checkServerReadyTimeout)
defer cancel()
ticker := time.NewTicker(checkServerReadyTick)
defer ticker.Stop()
for {
select {
case <-ticker.C:
conn, err = net.Dial("tcp", net.JoinHostPort(host, strconv.Itoa(int(port))))
if conn != nil {
_ = conn.Close()
return nil
}
if err != nil {
return err
}

case <-ctx.Done():
return fmt.Errorf("HTTP server at %s:%d never responded: %+v", host, port, err)
}
}

}

func getHTTPServer(t *testing.T, host string, port int, connectionType string) (server.Server, error) {
ctx, cancel := context.WithCancel(context.Background())
h := &HttpServer{
done: make(chan struct{}),
eventQueue: make(chan server.Event, 1),
ctx: ctx,
stop: cancel,
}

httpServer := &http.Server{
Addr: fmt.Sprintf("%s:%d", host, port),
Addr: net.JoinHostPort(host, strconv.Itoa(int(port))),
Handler: http.HandlerFunc(h.handleFunc),
}
if connectionType == "HTTPS" {
cfg := prepareTLSConfig(t, host)
httpServer.TLSConfig = cfg
}
h.server = httpServer

return h, nil
}

func TestHttpServer(t *testing.T) {
host := "127.0.0.1"
port := 40050
svc, err := GetHttpServer(host, port)
func prepareTLSConfig(t *testing.T, host string) *tls.Config {
certPem := []byte(`-----BEGIN CERTIFICATE-----
MIIDwTCCAqmgAwIBAgIJAONBEV813hm6MA0GCSqGSIb3DQEBCwUAMHcxCzAJBgNV
BAYTAkJSMQswCQYDVQQIDAJTUDEPMA0GA1UEBwwGRlJBTkNBMRAwDgYDVQQKDAdF
TEFTVElDMQswCQYDVQQLDAJPVTERMA8GA1UEAwwIaG9tZS5jb20xGDAWBgkqhkiG
9w0BCQEWCWV1QGV1LmNvbTAeFw0xOTAzMjYxOTMxMjhaFw0yOTAzMjMxOTMxMjha
MHcxCzAJBgNVBAYTAkJSMQswCQYDVQQIDAJTUDEPMA0GA1UEBwwGRlJBTkNBMRAw
DgYDVQQKDAdFTEFTVElDMQswCQYDVQQLDAJPVTERMA8GA1UEAwwIaG9tZS5jb20x
GDAWBgkqhkiG9w0BCQEWCWV1QGV1LmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEP
ADCCAQoCggEBALOJ2dxpBsQtRvs2hSuUhDsf4w6G3swFqtIXLedPvz1rNuofm75G
dA9pqXiI3hDw2ZuIJZItXE3FfVXxoE/ugsFw6cVLKrnpQ8exIv8K0JNuR22faFcR
LmDx/YLw0wmOnM2maBSaetrM5F4CwoVqDmOwZHs9fbADqthAHrbCAzNTkqnx2B4/
RWaYPbRWlSQ7CrWQE9cNJ/WMdUjznd5H0IiV7k/cHKIbXi3+JNinCWHAACWWS3ig
DjjCZd9lHkDH6qSpNGsQU5y0eiFAiiBVPqDIdVfPRe4pC81z3Dp6Wqs0uHXHYHqB
o3YWkXngTLlMLZtIMF+pWlCJZkscgLjL/N8CAwEAAaNQME4wHQYDVR0OBBYEFBpI
Tu/9mmRqithdHZZMu5jRLHebMB8GA1UdIwQYMBaAFBpITu/9mmRqithdHZZMu5jR
LHebMAwGA1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAGTS+cvN/vGjbkDF
wZRG8xMeHPHzlCWKNEGwZXTTBADrjfnppW5I2f5cDZzg71+UzQSJmBmHKZd+adrW
2GA888CAT+birIE6EAwIyq7ZGe77ymRspugyb7AK46QOKApED3izxId36Tk5/a0P
QY3WOTC0Y4yvz++gbx/uviYDMoHuJl0nIEXqtT9OZ2V2GqCToJu300RV/MIRtk6s
0U1d9CRDkjNolGVbYo2VnDJbZ8LQtJHS5iDeiEztay5Cky4NvVZsbCxrgNrr3h/v
upHEJ28Q7QzMnRC7d/THI6fRW1mG6BuFT3WPW5K7EAfgQDlyyspTDrACrYTuWC+y
013uTlI=
-----END CERTIFICATE-----`)

keyPem := []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAs4nZ3GkGxC1G+zaFK5SEOx/jDobezAWq0hct50+/PWs26h+b
vkZ0D2mpeIjeEPDZm4glki1cTcV9VfGgT+6CwXDpxUsquelDx7Ei/wrQk25HbZ9o
VxEuYPH9gvDTCY6czaZoFJp62szkXgLChWoOY7Bkez19sAOq2EAetsIDM1OSqfHY
Hj9FZpg9tFaVJDsKtZAT1w0n9Yx1SPOd3kfQiJXuT9wcohteLf4k2KcJYcAAJZZL
eKAOOMJl32UeQMfqpKk0axBTnLR6IUCKIFU+oMh1V89F7ikLzXPcOnpaqzS4dcdg
eoGjdhaReeBMuUwtm0gwX6laUIlmSxyAuMv83wIDAQABAoIBAD1kY/T0jPXELcN1
LzBpxpWZH8E16TWGspTIjE/Oeyx7XvnL+SulV8Z1cRfgZV8RnLeMZJyJmkiVwXgD
+bebbWbMP4PRYjjURPMh5T+k6RGg4hfgLIOpQlywIuoFg4R/GatQvcJd2Ki861Ii
S3XngCgihxmFO1dWybLMqjQAP6vq01sbctUXYddFd5STInzrceoXwkLjp3gTR1et
FG+Anmzbxp8e2ETXvwuf7eZhVwCJ2DxBt7tx1j5Csuj1LjaVTe5qR7B1oM7/vo0b
LlY9IixAAi62Rrv4YSvMAtMI6mQt+AM/4uBVqoG/ipgkuoQVuQ+M4lGdmEXwEEkz
Ol7SlMECgYEA11tV+ZekVsujBmasTU7TfWtcYtRHh+FSC040bVLiE6XZbuVJ4sSA
TvuUDs+3XM8blnkfVo826WY4+bKkj1PdCFsmG5pm+wnSTPFKWsCtsSyA3ts85t3O
IvcCxXA/1xL9O/UdWfrl2+IJ3yLDEjEU5QTYP34+KDBZM3u6tJzjWe8CgYEA1WwA
8d75h9UQyFXWEOiwJmR6yX7PGkpYE3J7m2p2giEbLm+9no5CEmE9T74k3m0eLZug
g/F1MA/evhXEYho6f+lS9Q0ZdtyU2EFrdvuLlUw6FJIWnaOLlVR/aC6BvAlxLDRb
RUGqDKDjl1Die0s8F1aDHGvNvGaZRN4Z23BRPBECgYBE8pMGA8yzlSKui/SiE5iW
UOcVJQ15rWPNBs62KZED5VdFr9cF6Q+DOfxe+ZWk+xHEDSdBWTylYPrgxpb05E6h
vDzpHXfW64AO7jl18LYrQSpJLzvCVkUG4LpcZ+GohAXbSlCJXFB3I1kxvTli+5/K
6tApE8vmpgQI/ZX6+Te4tQKBgBcQ3C1H5voaOf0c4czkCR2tIGQkk2eI/2nipp9O
a053G4PySbEYOOXZopG6wCtV6bwOJNP9xaeTH4S1v4rGwOnQIsofR1BEWMXilCXA
2/4fxesxOsaAxXY3Mqnk1NqovpWDdxXOGf3RaaeR81hV8kGndPYeZJbnE8uQoYTI
586xAoGBAI2SR17xbgfiQBZxgGqamslz4NqBkZUBs4DIAGMAXS21rW/2bbbRaSii
mGmkdaXx+l077AuO0peX2uBvJAx6PvAVW0qroeOLcCo6EuUGTNVhBej6L9hMwhIO
r0tZLlMt75zcnJBicMbIrrzIGVYMHjT+m1QTGbrGb/tcEIGtmXwO
-----END RSA PRIVATE KEY-----`)

cfg := &tls.Config{
ServerName: host,
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS12,
InsecureSkipVerify: true,
}
cfg.Certificates = make([]tls.Certificate, 1)
cert, err := tls.X509KeyPair(certPem, keyPem)
if err != nil {
t.Error(err)
t.FailNow()
}

svc.Start()
defer svc.Stop()
// make sure server is up before writing data into it.
time.Sleep(2 * time.Second)
writeToServer(t, "test1", host, port)
msg := <-svc.GetEvents()

assert.True(t, msg.GetEvent() != nil)
ok, _ := msg.GetEvent().HasKey("data")
assert.True(t, ok)
bytes, _ := msg.GetEvent()["data"].([]byte)
assert.True(t, string(bytes) == "test1")

cfg.Certificates = []tls.Certificate{cert}
return cfg
}

func writeToServer(t *testing.T, message, host string, port int) {
url := fmt.Sprintf("http://%s:%d/", host, port)
func writeToServer(t *testing.T, message, host string, port int, connectionMethod string, connectionType string) (int, string) {
url := fmt.Sprintf("%s://%s:%d/", strings.ToLower(connectionType), host, port)
var str = []byte(message)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(str))
req, err := http.NewRequest(connectionMethod, url, bytes.NewBuffer(str))
req.Header.Set("Content-Type", "text/plain")

client := &http.Client{}
if connectionType == "HTTPS" {
client.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // test server certificate is not trusted.
}}
}
resp, err := client.Do(req)
if err != nil {
t.Error(err)
t.FailNow()
}

defer resp.Body.Close()

if connectionMethod == "GET" {
if resp.StatusCode == http.StatusOK {
bodyBytes, err2 := ioutil.ReadAll(resp.Body)
if err2 != nil {
t.Error(err)
t.FailNow()
}
bodyString := string(bodyBytes)
return resp.StatusCode, bodyString
}
}
return resp.StatusCode, ""
}
3 changes: 3 additions & 0 deletions metricbeat/module/http/server/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ This is the server metricset of the module http.

Events sent to the http endpoint will be put by default under the `http.server` prefix. To change this use the `server.paths`
config options. In the example below every request to `/foo` will be put under `http.foo`.
Also consider using secure settings for the server using TLS/SSL as shown

["source","yaml",subs="attributes"]
------------------------------------------------------------------------------
- module: http
metricsets: ["server"]
host: "localhost"
ssl.certificate: "/etc/pki/server/cert.pem"
ssl.key: "/etc/pki/server/cert.key"
port: "8080"
server.paths:
- path: "/foo"
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/tests/system/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def test_server(self):
"host": host,
}])
proc = self.start_beat()
self.wait_until(lambda: self.log_contains("Starting http server on "))
self.wait_until(lambda: self.log_contains("Starting HTTP"))
requests.post("http://" + host + ":" + str(port),
json={'hello': 'world'}, headers={'Content-Type': 'application/json'})
self.wait_until(lambda: self.output_lines() > 0)
Expand Down

0 comments on commit 0fb063f

Please sign in to comment.