Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Chun Lin Yang <[email protected]>
  • Loading branch information
clyang82 committed Nov 5, 2018
1 parent eab423c commit 1786537
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 3 deletions.
95 changes: 92 additions & 3 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@ package config

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
"time"

"github.com/pkg/errors"
"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
"gopkg.in/olivere/elastic.v5"
elastic "gopkg.in/olivere/elastic.v5"

"github.com/jaegertracing/jaeger/pkg/es"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
Expand All @@ -37,6 +43,7 @@ type Configuration struct {
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
Expand All @@ -45,6 +52,15 @@ type Configuration struct {
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
TLS TLS
}

// TLS Config
type TLS struct {
Enabled bool
CertPath string
KeyPath string
CaPath string
}

// ClientBuilder creates new es.Client
Expand All @@ -64,7 +80,7 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
rawClient, err := elastic.NewClient(c.GetConfigs()...)
rawClient, err := elastic.NewClient(c.GetConfigs(logger)...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -188,10 +204,83 @@ func (c *Configuration) GetTagDotReplacement() string {
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
func (c *Configuration) GetConfigs(logger *zap.Logger) []elastic.ClientOptionFunc {

if c.TLS.Enabled {
tlsConfig, err := c.CreateTLSConfig()
if err != nil {
return nil
}
httpClient := &http.Client{
Timeout: c.Timeout,
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}

resp, err := httpClient.Get("https://elasticsearch:9200")
if err != nil {
fmt.Println(err)
}

htmlData, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
}
defer resp.Body.Close()
fmt.Printf("%v\n", resp.Status)
fmt.Printf(string(htmlData))

options := make([]elastic.ClientOptionFunc, 4)
options[0] = elastic.SetHttpClient(httpClient)
options[1] = elastic.SetURL(c.Servers...)
options[2] = elastic.SetSniff(c.Sniffer)
options[3] = elastic.SetScheme("https")
logger.Info("tlsConfig", zap.Any("tlsConfig", tlsConfig))
return options
}
options := make([]elastic.ClientOptionFunc, 3)
options[0] = elastic.SetURL(c.Servers...)
options[1] = elastic.SetBasicAuth(c.Username, c.Password)
options[2] = elastic.SetSniff(c.Sniffer)
return options
}

// CreateTLSConfig creates TLS Configuration to connect with ES Cluster.
func (c *Configuration) CreateTLSConfig() (*tls.Config, error) {
rootCerts, err := c.LoadCertificatesFrom()
if err != nil {
log.Fatalf("Couldn't load root certificate from %s. Got %s.", c.TLS.CaPath, err)
}
if len(c.TLS.CertPath) > 0 && len(c.TLS.KeyPath) > 0 {
clientPrivateKey, err := c.LoadPrivateKeyFrom()
if err != nil {
log.Fatalf("Couldn't setup client authentication. Got %s.", err)
}
return &tls.Config{
RootCAs: rootCerts,
Certificates: []tls.Certificate{*clientPrivateKey},
}, err
}
return nil, err
}

// LoadCertificatesFrom is used to load root certification
func (c *Configuration) LoadCertificatesFrom() (*x509.CertPool, error) {
caCert, err := ioutil.ReadFile(c.TLS.CaPath)
if err != nil {
return nil, err
}
certificates := x509.NewCertPool()
certificates.AppendCertsFromPEM(caCert)
return certificates, nil
}

// LoadPrivateKeyFrom is used to load the private certificate and key for TLS
func (c *Configuration) LoadPrivateKeyFrom() (*tls.Certificate, error) {
privateKey, err := tls.LoadX509KeyPair(c.TLS.CertPath, c.TLS.KeyPath)
if err != nil {
return nil, err
}
return &privateKey, nil
}
30 changes: 30 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixIndexPrefix = ".index-prefix"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
Expand Down Expand Up @@ -119,6 +124,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixServerURLs,
nsConfig.servers,
"The comma-separated list of ElasticSearch servers, must be full url i.e. http://localhost:9200")
flagSet.Duration(
nsConfig.namespace+suffixTimeout,
nsConfig.Timeout,
"Timeout used for queries")
flagSet.Duration(
nsConfig.namespace+suffixMaxSpanAge,
nsConfig.MaxSpanAge,
Expand Down Expand Up @@ -147,6 +156,22 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixBulkFlushInterval,
nsConfig.BulkFlushInterval,
"A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.")
flagSet.Bool(
nsConfig.namespace+suffixTLS,
nsConfig.TLS.Enabled,
"Enable TLS")
flagSet.String(
nsConfig.namespace+suffixCert,
nsConfig.TLS.CertPath,
"Path to TLS certificate file")
flagSet.String(
nsConfig.namespace+suffixKey,
nsConfig.TLS.KeyPath,
"Path to TLS key file")
flagSet.String(
nsConfig.namespace+suffixCA,
nsConfig.TLS.CaPath,
"Path to TLS CA file")
flagSet.String(
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
Expand Down Expand Up @@ -185,6 +210,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers)
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.TLS.Enabled = v.GetBool(cfg.namespace + suffixTLS)
cfg.TLS.CertPath = v.GetString(cfg.namespace + suffixCert)
cfg.TLS.KeyPath = v.GetString(cfg.namespace + suffixKey)
cfg.TLS.CaPath = v.GetString(cfg.namespace + suffixCA)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.AllTagsAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.TagsFilePath = v.GetString(cfg.namespace + suffixTagsFile)
Expand Down

0 comments on commit 1786537

Please sign in to comment.