Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TLS support for HTTP Endpoint of Collector server #2798

Merged
merged 9 commits into from
Feb 8, 2021
20 changes: 15 additions & 5 deletions cmd/collector/app/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ const (
collectorZipkinAllowedHeaders = "collector.zipkin.allowed-headers"
)

var tlsFlagsConfig = tlscfg.ServerFlagsConfig{
var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.grpc",
ShowEnabled: true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should remove this option, it is never false

$ grx 'ShowEnabled: ' .
./cmd/collector/app/builder_flags.go:45:	ShowEnabled:  true,
./cmd/agent/app/reporter/grpc/flags.go:36:	ShowEnabled:    true,
./cmd/query/app/flags.go:52:	ShowEnabled:  true,
./cmd/query/app/flags.go:58:	ShowEnabled:  true,
./plugin/storage/cassandra/options.go:250:		ShowEnabled:    true,
./plugin/storage/es/options.go:139:		ShowEnabled:    true,
./pkg/config/tlscfg/flags_test.go:51:				ShowEnabled:    true,
./pkg/config/tlscfg/flags_test.go:98:				ShowEnabled:  true,
./pkg/kafka/auth/config.go:94:		ShowEnabled:    true,
./pkg/kafka/auth/options.go:113:		ShowEnabled:    true,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to do this as a part of the same PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up to you

ShowClientCA: true,
}

var tlsHTTPFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.http",
ShowEnabled: true,
ShowClientCA: true,
}

// CollectorOptions holds configuration for collector
type CollectorOptions struct {
// DynQueueSizeMemory determines how much memory to use for the queue
Expand All @@ -58,8 +64,10 @@ type CollectorOptions struct {
CollectorHTTPHostPort string
// CollectorGRPCHostPort is the host:port address that the collector service listens in on for gRPC requests
CollectorGRPCHostPort string
// TLS configures secure transport
TLS tlscfg.Options
// TLSGRPC configures secure transport for gRPC endpoint to collect spans
TLSGRPC tlscfg.Options
// TLSHTTP configures secure transport for HTTP endpoint to collect spans
TLSHTTP tlscfg.Options
// CollectorTags is the string representing collector tags to append to each and every span
CollectorTags map[string]string
// CollectorZipkinHTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
Expand All @@ -86,7 +94,8 @@ func AddFlags(flags *flag.FlagSet) {
func AddOTELJaegerFlags(flags *flag.FlagSet) {
flags.String(CollectorHTTPHostPort, ports.PortToHostPort(ports.CollectorHTTP), "The host:port (e.g. 127.0.0.1:14268 or :14268) of the collector's HTTP server")
flags.String(CollectorGRPCHostPort, ports.PortToHostPort(ports.CollectorGRPC), "The host:port (e.g. 127.0.0.1:14250 or :14250) of the collector's GRPC server")
tlsFlagsConfig.AddFlags(flags)
tlsGRPCFlagsConfig.AddFlags(flags)
tlsHTTPFlagsConfig.AddFlags(flags)
}

// AddOTELZipkinFlags adds flag that are exposed by OTEL Zipkin receiver
Expand All @@ -105,7 +114,8 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) *CollectorOptions {
cOpts.CollectorTags = flags.ParseJaegerTags(v.GetString(collectorTags))
cOpts.CollectorZipkinAllowedOrigins = v.GetString(collectorZipkinAllowedOrigins)
cOpts.CollectorZipkinAllowedHeaders = v.GetString(collectorZipkinAllowedHeaders)
cOpts.TLS = tlsFlagsConfig.InitFromViper(v)
cOpts.TLSGRPC = tlsGRPCFlagsConfig.InitFromViper(v)
cOpts.TLSHTTP = tlsHTTPFlagsConfig.InitFromViper(v)

return cOpts
}
23 changes: 15 additions & 8 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ type Collector struct {
spanHandlers *SpanHandlers

// state, read only
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
tlsCloser io.Closer
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
tlsGRPCCertWatcherCloser io.Closer
tlsHTTPCertWatcherCloser io.Closer
}

// CollectorParams to construct a new Jaeger Collector.
Expand Down Expand Up @@ -88,7 +89,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: builderOpts.CollectorGRPCHostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: builderOpts.TLS,
TLSConfig: builderOpts.TLSGRPC,
SamplingStore: c.strategyStore,
Logger: c.logger,
})
Expand All @@ -100,6 +101,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: builderOpts.CollectorHTTPHostPort,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: builderOpts.TLSHTTP,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingStore: c.strategyStore,
Expand All @@ -110,7 +112,8 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
}
c.hServer = httpServer

c.tlsCloser = &builderOpts.TLS
c.tlsGRPCCertWatcherCloser = &builderOpts.TLSGRPC
c.tlsHTTPCertWatcherCloser = &builderOpts.TLSHTTP
zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{
HostPort: builderOpts.CollectorZipkinHTTPHostPort,
Handler: c.spanHandlers.ZipkinSpansHandler,
Expand Down Expand Up @@ -165,8 +168,12 @@ func (c *Collector) Close() error {
c.logger.Error("failed to close span processor.", zap.Error(err))
}

if err := c.tlsCloser.Close(); err != nil {
c.logger.Error("failed to close TLS certificate watcher", zap.Error(err))
if err := c.tlsGRPCCertWatcherCloser.Close(); err != nil {
c.logger.Error("failed to close gRPC TLS certificate watcher", zap.Error(err))
}

if err := c.tlsHTTPCertWatcherCloser.Close(); err != nil {
c.logger.Error("failed to close HTTP TLS certificate watcher", zap.Error(err))
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
Expand Down
20 changes: 18 additions & 2 deletions cmd/collector/app/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
clientcfgHandler "github.com/jaegertracing/jaeger/pkg/clientcfg/clientcfghttp"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/httpmetrics"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
)

// HTTPServerParams to construct a new Jaeger Collector HTTP Server
type HTTPServerParams struct {
TLSConfig tlscfg.Options
HostPort string
Handler handler.JaegerBatchesHandler
SamplingStore strategystore.StrategyStore
Expand All @@ -44,12 +46,20 @@ type HTTPServerParams struct {
func StartHTTPServer(params *HTTPServerParams) (*http.Server, error) {
params.Logger.Info("Starting jaeger-collector HTTP server", zap.String("http host-port", params.HostPort))

server := &http.Server{Addr: params.HostPort}
if params.TLSConfig.Enabled {
tlsCfg, err := params.TLSConfig.Config(params.Logger) // This checks if the certificates are correctly provided
if err != nil {
return nil, err
}
server.TLSConfig = tlsCfg
}

listener, err := net.Listen("tcp", params.HostPort)
if err != nil {
return nil, err
}

server := &http.Server{Addr: params.HostPort}
serveHTTP(server, listener, params)

return server, nil
Expand All @@ -74,7 +84,13 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar
recoveryHandler := recoveryhandler.NewRecoveryHandler(params.Logger, true)
server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory)
go func() {
if err := server.Serve(listener); err != nil {
var err error
if params.TLSConfig.Enabled {
err = server.ServeTLS(listener, "", "")
} else {
err = server.Serve(listener)
}
if err != nil {
if err != http.ErrServerClosed {
params.Logger.Error("Could not start HTTP collector", zap.Error(err))
}
Expand Down
200 changes: 200 additions & 0 deletions cmd/collector/app/server/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,27 @@
package server

import (
"crypto/tls"
"fmt"
"net"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/ports"
)

var testCertKeyLocation = "../../../../pkg/config/tlscfg/testdata"

// test wrong port number
func TestFailToListenHTTP(t *testing.T) {
logger, _ := zap.NewDevelopment()
Expand All @@ -39,6 +47,25 @@ func TestFailToListenHTTP(t *testing.T) {
assert.EqualError(t, err, "listen tcp: address -1: invalid port")
}

func TestCreateTLSHTTPServerError(t *testing.T) {
logger, _ := zap.NewDevelopment()
tlsCfg := tlscfg.Options{
Enabled: true,
CertPath: "invalid/path",
KeyPath: "invalid/path",
ClientCAPath: "invalid/path",
}

params := &HTTPServerParams{
HostPort: fmt.Sprintf(":%d", ports.CollectorHTTP),
HealthCheck: healthcheck.New(),
Logger: logger,
TLSConfig: tlsCfg,
}
_, err := StartHTTPServer(params)
assert.NotNil(t, err)
}

func TestSpanCollectorHTTP(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &HTTPServerParams{
Expand All @@ -58,3 +85,176 @@ func TestSpanCollectorHTTP(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, response)
}

func TestSpanCollectorHTTPS(t *testing.T) {

testCases := []struct {
name string
TLS tlscfg.Options
clientTLS tlscfg.Options
expectError bool
expectClientError bool
expectServerFail bool
}{
{
name: "should fail with TLS client to untrusted TLS server",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
ServerName: "example.com",
},
expectError: true,
expectClientError: true,
expectServerFail: false,
},
{
name: "should fail with TLS client to trusted TLS server with incorrect hostname",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "nonEmpty",
},
expectError: true,
expectClientError: true,
expectServerFail: false,
},
{
name: "should pass with TLS client to trusted TLS server with correct hostname",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "example.com",
},
expectError: false,
expectClientError: false,
expectServerFail: false,
},
{
name: "should fail with TLS client without cert to trusted TLS server requiring cert",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "example.com",
},
expectError: false,
expectServerFail: false,
expectClientError: true,
},
{
name: "should pass with TLS client with cert to trusted TLS server requiring cert",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "example.com",
CertPath: testCertKeyLocation + "/example-client-cert.pem",
KeyPath: testCertKeyLocation + "/example-client-key.pem",
},
expectError: false,
expectServerFail: false,
expectClientError: false,
},
{
name: "should fail with TLS client without cert to trusted TLS server requiring cert from a different CA",
TLS: tlscfg.Options{
Enabled: true,
CertPath: testCertKeyLocation + "/example-server-cert.pem",
KeyPath: testCertKeyLocation + "/example-server-key.pem",
ClientCAPath: testCertKeyLocation + "/wrong-CA-cert.pem", // NB: wrong CA
},
clientTLS: tlscfg.Options{
Enabled: true,
CAPath: testCertKeyLocation + "/example-CA-cert.pem",
ServerName: "example.com",
CertPath: testCertKeyLocation + "/example-client-cert.pem",
KeyPath: testCertKeyLocation + "/example-client-key.pem",
},
expectError: false,
expectServerFail: false,
expectClientError: true,
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &HTTPServerParams{
HostPort: fmt.Sprintf(":%d", ports.CollectorHTTP),
Handler: handler.NewJaegerSpanHandler(logger, &mockSpanProcessor{}),
SamplingStore: &mockSamplingStore{},
MetricsFactory: metricstest.NewFactory(time.Hour),
HealthCheck: healthcheck.New(),
Logger: logger,
TLSConfig: test.TLS,
}

server, err := StartHTTPServer(params)

if test.expectServerFail {
require.Error(t, err)
}
defer server.Close()

require.NoError(t, err)
clientTLSCfg, err0 := test.clientTLS.Config(zap.NewNop())
require.NoError(t, err0)
dialer := &net.Dialer{Timeout: 2 * time.Second}
conn, clientError := tls.DialWithDialer(dialer, "tcp", "localhost:"+fmt.Sprintf("%d", ports.CollectorHTTP), clientTLSCfg)
var clientClose func() error
clientClose = nil
if conn != nil {
clientClose = conn.Close
}

if test.expectError {
require.Error(t, clientError)
} else {
require.NoError(t, clientError)
}

if clientClose != nil {
require.Nil(t, clientClose())
}

client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: clientTLSCfg,
},
}

response, requestError := client.Post("https://localhost:"+fmt.Sprintf("%d", ports.CollectorHTTP), "", nil)

if test.expectClientError {
require.Error(t, requestError)
} else {
require.NoError(t, requestError)
require.NotNil(t, response)
}
})
}
}
Loading