Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support reloading ES client's password from file #4342

Merged
merged 15 commits into from
Sep 9, 2023
16 changes: 14 additions & 2 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Configuration struct {
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
PasswordFilePath string `mapstructure:"password_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
Expand Down Expand Up @@ -310,6 +311,17 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp
Timeout: c.Timeout,
}
options = append(options, elastic.SetHttpClient(httpClient))

if c.Password != "" && c.PasswordFilePath != "" {
return nil, fmt.Errorf("both Password and PasswordFilePath are set")
}
if c.PasswordFilePath != "" {
passwordFromFile, err := loadTokenFromFile(c.PasswordFilePath)
if err != nil {
return nil, fmt.Errorf("failed to load password from file: %w", err)
}
c.Password = passwordFromFile
}
options = append(options, elastic.SetBasicAuth(c.Username, c.Password))

if c.SendGetBodyAs != "" {
Expand Down Expand Up @@ -396,7 +408,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
if c.AllowTokenFromContext {
logger.Warn("Token file and token propagation are both enabled, token from file won't be used")
}
tokenFromFile, err := loadToken(c.TokenFilePath)
tokenFromFile, err := loadTokenFromFile(c.TokenFilePath)
if err != nil {
return nil, err
}
Expand All @@ -412,7 +424,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe
return transport, nil
}

func loadToken(path string) (string, error) {
func loadTokenFromFile(path string) (string, error) {
b, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return "", err
Expand Down
10 changes: 5 additions & 5 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
client es.Client
client func() es.Client
logger *zap.Logger
dependencyIndexPrefix string
indexDateLayout string
Expand All @@ -48,7 +48,7 @@ type DependencyStore struct {

// DependencyStoreParams holds constructor parameters for NewDependencyStore
type DependencyStoreParams struct {
Client es.Client
Client func() es.Client
Logger *zap.Logger
IndexPrefix string
IndexDateLayout string
Expand Down Expand Up @@ -84,15 +84,15 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D

// CreateTemplates creates index templates.
func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error {
_, err := s.client.CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
_, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background())
if err != nil {
return err
}
return nil
}

func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) {
s.client.Index().Index(indexName).Type(dependencyType).
s.client().Index().Index(indexName).Type(dependencyType).
BodyJson(&dbmodel.TimeDependencies{
Timestamp: ts,
Dependencies: dbmodel.FromDomainDependencies(dependencies),
Expand All @@ -102,7 +102,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe
// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
indices := s.getReadIndices(endTs, lookback)
searchResult, err := s.client.Search(indices...).
searchResult, err := s.client().Search(indices...).
Size(s.maxDocCount).
Query(buildTSQuery(endTs, lookback)).
IgnoreUnavailable(true).
Expand Down
5 changes: 3 additions & 2 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -51,7 +52,7 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
Expand All @@ -78,7 +79,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(DependencyStoreParams{
Client: client,
Client: func() es.Client { return client },
Logger: zap.NewNop(),
IndexPrefix: testCase.prefix,
IndexDateLayout: "2006-01-02",
Expand Down
115 changes: 94 additions & 21 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
package es

import (
"errors"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync/atomic"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
Expand All @@ -27,6 +32,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/fswatcher"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin"
esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore"
Expand Down Expand Up @@ -57,9 +63,12 @@ type Factory struct {
newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

primaryConfig *config.Configuration
primaryClient es.Client
archiveConfig *config.Configuration
archiveClient es.Client

primaryClient atomic.Pointer[es.Client]
archiveClient atomic.Pointer[es.Client]

watchers []*fswatcher.FSWatcher
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -87,62 +96,87 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
func (f *Factory) InitFromOptions(o Options) {
f.Options = &o
f.primaryConfig = f.Options.GetPrimary()
if cfg := f.Options.Get(archiveNamespace); cfg != nil {
f.archiveConfig = cfg
}
f.archiveConfig = f.Options.Get(archiveNamespace)
}

// Initialize implements storage.Factory
// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create primary Elasticsearch client: %w", err)
}
f.primaryClient = primaryClient
f.primaryClient.Store(&primaryClient)

if f.primaryConfig.PasswordFilePath != "" {
primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err)
}
f.watchers = append(f.watchers, primaryWatcher)
}

if f.archiveConfig.Enabled {
f.archiveClient, err = f.newClientFn(f.archiveConfig, logger, metricsFactory)
archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory)
if err != nil {
return fmt.Errorf("failed to create archive Elasticsearch client: %w", err)
}
f.archiveClient.Store(&archiveClient)

if f.archiveConfig.PasswordFilePath != "" {
archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.PasswordFilePath}, f.onArchivePasswordChange, f.logger)
if err != nil {
return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err)
}
f.watchers = append(f.watchers, archiveWatcher)
}
}

return nil
}

func (f *Factory) getPrimaryClient() es.Client {
return *(f.primaryClient.Load())
}

func (f *Factory) getArchiveClient() es.Client {
return *f.archiveClient.Load()
}

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.primaryClient, f.primaryConfig, f.logger)
return createDependencyReader(f.getPrimaryClient, f.primaryConfig, f.logger)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
}

func createSpanReader(
client es.Client,
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
Expand All @@ -153,7 +187,7 @@ func createSpanReader(
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Client: clientFn,
MaxDocCount: cfg.MaxDocCount,
MaxSpanAge: cfg.MaxSpanAge,
IndexPrefix: cfg.IndexPrefix,
Expand All @@ -172,7 +206,7 @@ func createSpanReader(
}

func createSpanWriter(
client es.Client,
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
Expand Down Expand Up @@ -202,7 +236,7 @@ func createSpanWriter(
return nil, err
}
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Client: clientFn,
IndexPrefix: cfg.IndexPrefix,
SpanIndexDateLayout: cfg.IndexDateLayoutSpans,
ServiceIndexDateLayout: cfg.IndexDateLayoutServices,
Expand All @@ -226,12 +260,12 @@ func createSpanWriter(
}

func createDependencyReader(
client es.Client,
clientFn func() es.Client,
cfg *config.Configuration,
logger *zap.Logger,
) (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Client: clientFn,
Logger: logger,
IndexPrefix: cfg.IndexPrefix,
IndexDateLayout: cfg.IndexDateLayoutDependencies,
Expand All @@ -245,8 +279,47 @@ var _ io.Closer = (*Factory)(nil)

// Close closes the resources held by the factory
func (f *Factory) Close() error {
var errs []error
for _, w := range f.watchers {
errs = append(errs, w.Close())
}
if cfg := f.Options.Get(archiveNamespace); cfg != nil {
cfg.TLS.Close()
errs = append(errs, cfg.TLS.Close())
}
errs = append(errs, f.Options.GetPrimary().TLS.Close())
return errors.Join(errs...)
}

func (f *Factory) onPrimaryPasswordChange() {
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient)
}

func (f *Factory) onArchivePasswordChange() {
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient)
}

func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) {
newPassword, err := loadTokenFromFile(cfg.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for Elasticsearch client", zap.Error(err))
return
}
f.logger.Sugar().Infof("loaded new password of length %d from file", len(newPassword))
newCfg := *cfg // copy by value
newCfg.Password = newPassword
newCfg.PasswordFilePath = "" // avoid error that both are set
primaryClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory)
if err != nil {
f.logger.Error("failed to recreate Elasticsearch client with new password", zap.Error(err))
} else {
client.Store(&primaryClient)
}
}

func loadTokenFromFile(path string) (string, error) {
b, err := os.ReadFile(filepath.Clean(path))
if err != nil {
return "", err
}
return f.Options.GetPrimary().TLS.Close()
return strings.TrimRight(string(b), "\r\n"), nil
}
Loading