From 9ff837a2364bc55d91c1d808d3833f3aa20583c8 Mon Sep 17 00:00:00 2001 From: haanhvu Date: Fri, 21 Apr 2023 23:04:23 +0700 Subject: [PATCH 01/14] Swap ES client when password changes Signed-off-by: haanhvu --- pkg/es/config/config.go | 17 ++++- plugin/storage/es/dependencystore/storage.go | 10 +-- .../es/dependencystore/storage_test.go | 5 +- plugin/storage/es/factory.go | 66 +++++++++++++++---- plugin/storage/es/factory_test.go | 1 + plugin/storage/es/options.go | 6 ++ plugin/storage/es/spanstore/reader.go | 8 +-- plugin/storage/es/spanstore/reader_test.go | 28 ++++---- .../storage/es/spanstore/service_operation.go | 10 +-- plugin/storage/es/spanstore/writer.go | 12 ++-- plugin/storage/es/spanstore/writer_test.go | 27 ++++---- .../storage/integration/elasticsearch_test.go | 7 +- 12 files changed, 133 insertions(+), 64 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 48460a681f2..fe37e7f40b6 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -49,6 +49,7 @@ type Configuration struct { Username string `mapstructure:"username"` Password string `mapstructure:"password" json:"-"` TokenFilePath string `mapstructure:"token_file"` + PasswordFilePath string `mapstructure:"password_file"` AllowTokenFromContext bool `mapstructure:"-"` Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"` @@ -296,6 +297,10 @@ func (c *Configuration) TagKeysAsFields() ([]string, error) { // getConfigOptions wraps the configs to feed to the ElasticSearch client init func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) { + if c.Password != "" && c.PasswordFilePath != "" { + return nil, fmt.Errorf("both Password and PasswordFilePath are set") + } + options := []elastic.ClientOptionFunc{ elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer), // Disable health check when token from context is allowed, this is because at this time @@ -310,6 +315,14 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp Timeout: c.Timeout, } options = append(options, elastic.SetHttpClient(httpClient)) + + if c.PasswordFilePath != "" { + passwordFromFile, err := loadFileContent(c.PasswordFilePath) + if err != nil { + return nil, fmt.Errorf("failed to load password from file: %w", err) + } + c.Password = passwordFromFile + } options = append(options, elastic.SetBasicAuth(c.Username, c.Password)) if c.SendGetBodyAs != "" { @@ -396,7 +409,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe if c.AllowTokenFromContext { logger.Warn("Token file and token propagation are both enabled, token from file won't be used") } - tokenFromFile, err := loadToken(c.TokenFilePath) + tokenFromFile, err := loadFileContent(c.TokenFilePath) if err != nil { return nil, err } @@ -412,7 +425,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe return transport, nil } -func loadToken(path string) (string, error) { +func loadFileContent(path string) (string, error) { b, err := os.ReadFile(filepath.Clean(path)) if err != nil { return "", err diff --git a/plugin/storage/es/dependencystore/storage.go b/plugin/storage/es/dependencystore/storage.go index 8a4203ae4f9..521f2e7b799 100644 --- a/plugin/storage/es/dependencystore/storage.go +++ b/plugin/storage/es/dependencystore/storage.go @@ -38,7 +38,7 @@ const ( // DependencyStore handles all queries and insertions to ElasticSearch dependencies type DependencyStore struct { - client es.Client + client func() es.Client logger *zap.Logger dependencyIndexPrefix string indexDateLayout string @@ -48,7 +48,7 @@ type DependencyStore struct { // DependencyStoreParams holds constructor parameters for NewDependencyStore type DependencyStoreParams struct { - Client es.Client + Client func() es.Client Logger *zap.Logger IndexPrefix string IndexDateLayout string @@ -84,7 +84,7 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D // CreateTemplates creates index templates. func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error { - _, err := s.client.CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background()) + _, err := s.client().CreateTemplate("jaeger-dependencies").Body(dependenciesTemplate).Do(context.Background()) if err != nil { return err } @@ -92,7 +92,7 @@ func (s *DependencyStore) CreateTemplates(dependenciesTemplate string) error { } func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, dependencies []model.DependencyLink) { - s.client.Index().Index(indexName).Type(dependencyType). + s.client().Index().Index(indexName).Type(dependencyType). BodyJson(&dbmodel.TimeDependencies{ Timestamp: ts, Dependencies: dbmodel.FromDomainDependencies(dependencies), @@ -102,7 +102,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe // GetDependencies returns all interservice dependencies func (s *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { indices := s.getReadIndices(endTs, lookback) - searchResult, err := s.client.Search(indices...). + searchResult, err := s.client().Search(indices...). Size(s.maxDocCount). Query(buildTSQuery(endTs, lookback)). IgnoreUnavailable(true). diff --git a/plugin/storage/es/dependencystore/storage_test.go b/plugin/storage/es/dependencystore/storage_test.go index 6b27f1cc5d5..86ad2177f44 100644 --- a/plugin/storage/es/dependencystore/storage_test.go +++ b/plugin/storage/es/dependencystore/storage_test.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -51,7 +52,7 @@ func withDepStorage(indexPrefix, indexDateLayout string, maxDocCount int, fn fun logger: logger, logBuffer: logBuffer, storage: NewDependencyStore(DependencyStoreParams{ - Client: client, + Client: func() es.Client { return client }, Logger: logger, IndexPrefix: indexPrefix, IndexDateLayout: indexDateLayout, @@ -78,7 +79,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) { for _, testCase := range testCases { client := &mocks.Client{} r := NewDependencyStore(DependencyStoreParams{ - Client: client, + Client: func() es.Client { return client }, Logger: zap.NewNop(), IndexPrefix: testCase.prefix, IndexDateLayout: "2006-01-02", diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index d0f78c294f3..727ff14c9f1 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -16,15 +16,18 @@ package es import ( + "errors" "flag" "fmt" "io" + "sync/atomic" "github.com/spf13/viper" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/config" + "github.com/jaegertracing/jaeger/pkg/fswatcher" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/plugin" esDepStore "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" @@ -54,9 +57,11 @@ type Factory struct { newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) primaryConfig *config.Configuration - primaryClient es.Client + primaryClient atomic.Pointer[es.Client] archiveConfig *config.Configuration - archiveClient es.Client + archiveClient atomic.Pointer[es.Client] + + watchers []*fswatcher.FSWatcher } // NewFactory creates a new Factory. @@ -96,13 +101,29 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) if err != nil { return fmt.Errorf("failed to create primary Elasticsearch client: %w", err) } - f.primaryClient = primaryClient + f.primaryClient.Store(&primaryClient) + + f.watchers = make([]*fswatcher.FSWatcher, 2) + primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) + if err != nil { + return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) + } + f.watchers[0] = primaryWatcher + if f.archiveConfig.Enabled { - f.archiveClient, err = f.newClientFn(f.archiveConfig, logger, metricsFactory) + archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory) if err != nil { return fmt.Errorf("failed to create archive Elasticsearch client: %w", err) } + f.archiveClient.Store(&archiveClient) + + archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.PasswordFilePath}, f.onArchivePasswordChange, f.logger) + if err != nil { + return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) + } + f.watchers[1] = archiveWatcher } + return nil } @@ -140,7 +161,7 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { func createSpanReader( mFactory metrics.Factory, logger *zap.Logger, - client es.Client, + client atomic.Pointer[es.Client], cfg *config.Configuration, archive bool, ) (spanstore.Reader, error) { @@ -148,7 +169,7 @@ func createSpanReader( return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ - Client: client, + Client: func() es.Client { return *client.Load() }, Logger: logger, MetricsFactory: mFactory, MaxDocCount: cfg.MaxDocCount, @@ -168,7 +189,7 @@ func createSpanReader( func createSpanWriter( mFactory metrics.Factory, logger *zap.Logger, - client es.Client, + client atomic.Pointer[es.Client], cfg *config.Configuration, archive bool, ) (spanstore.Writer, error) { @@ -196,7 +217,7 @@ func createSpanWriter( return nil, err } writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ - Client: client, + Client: func() es.Client { return *client.Load() }, Logger: logger, MetricsFactory: mFactory, IndexPrefix: cfg.IndexPrefix, @@ -221,11 +242,11 @@ func createSpanWriter( func createDependencyReader( logger *zap.Logger, - client es.Client, + client atomic.Pointer[es.Client], cfg *config.Configuration, ) (dependencystore.Reader, error) { reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{ - Client: client, + Client: func() es.Client { return *client.Load() }, Logger: logger, IndexPrefix: cfg.IndexPrefix, IndexDateLayout: cfg.IndexDateLayoutDependencies, @@ -239,8 +260,29 @@ var _ io.Closer = (*Factory)(nil) // Close closes the resources held by the factory func (f *Factory) Close() error { + var errs []error + for _, w := range f.watchers { + errs = append(errs, w.Close()) + } if cfg := f.Options.Get(archiveNamespace); cfg != nil { - cfg.TLS.Close() + errs = append(errs, cfg.TLS.Close()) + } + errs = append(errs, f.Options.GetPrimary().TLS.Close()) + return errors.Join(errs...) +} + +func (f *Factory) onPrimaryPasswordChange() { + primaryClient, err := f.newClientFn(f.primaryConfig, f.logger, f.metricsFactory) + if err != nil { + f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err)) + } + f.primaryClient.Swap(&primaryClient) +} + +func (f *Factory) onArchivePasswordChange() { + archiveClient, err := f.newClientFn(f.archiveConfig, f.logger, f.metricsFactory) + if err != nil { + f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err)) } - return f.Options.GetPrimary().TLS.Close() + f.archiveClient.Swap(&archiveClient) } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index fc0a2c0d0ec..9405497275f 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -217,6 +217,7 @@ func TestArchiveDisabled(t *testing.T) { func TestArchiveEnabled(t *testing.T) { f := NewFactory() + f.primaryConfig = &escfg.Configuration{} f.archiveConfig = &escfg.Configuration{Enabled: true} f.newClientFn = (&mockClientBuilder{}).NewClient err := f.Initialize(metrics.NullFactory, zap.NewNop()) diff --git a/plugin/storage/es/options.go b/plugin/storage/es/options.go index 5824b300dfc..91fcfc38a15 100644 --- a/plugin/storage/es/options.go +++ b/plugin/storage/es/options.go @@ -34,6 +34,7 @@ const ( suffixSniffer = ".sniffer" suffixSnifferTLSEnabled = ".sniffer-tls-enabled" suffixTokenPath = ".token-file" + suffixPasswordPath = ".password-file" suffixServerURLs = ".server-urls" suffixRemoteReadClusters = ".remote-read-clusters" suffixMaxSpanAge = ".max-span-age" @@ -162,6 +163,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) { nsConfig.namespace+suffixTokenPath, nsConfig.TokenFilePath, "Path to a file containing bearer token. This flag also loads CA if it is specified.") + flagSet.String( + nsConfig.namespace+suffixPasswordPath, + nsConfig.PasswordFilePath, + "Path to a file containing password. This file is watched for changes.") flagSet.Bool( nsConfig.namespace+suffixSniffer, nsConfig.Sniffer, @@ -302,6 +307,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) { cfg.Username = v.GetString(cfg.namespace + suffixUsername) cfg.Password = v.GetString(cfg.namespace + suffixPassword) cfg.TokenFilePath = v.GetString(cfg.namespace + suffixTokenPath) + cfg.PasswordFilePath = v.GetString(cfg.namespace + suffixPasswordPath) cfg.Sniffer = v.GetBool(cfg.namespace + suffixSniffer) cfg.SnifferTLSEnabled = v.GetBool(cfg.namespace + suffixSnifferTLSEnabled) cfg.Servers = strings.Split(stripWhiteSpace(v.GetString(cfg.namespace+suffixServerURLs)), ",") diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 8d41af16926..5cb7447a2ff 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -92,7 +92,7 @@ var ( // SpanReader can query for and load traces from ElasticSearch type SpanReader struct { - client es.Client + client func() es.Client logger *zap.Logger // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. @@ -113,7 +113,7 @@ type SpanReader struct { // SpanReaderParams holds constructor params for NewSpanReader type SpanReaderParams struct { - Client es.Client + Client func() es.Client Logger *zap.Logger MaxSpanAge time.Duration MaxDocCount int @@ -401,7 +401,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st } // set traceIDs to empty traceIDs = nil - results, err := s.client.MultiSearch().Add(searchRequests...).Index(indices...).Do(ctx) + results, err := s.client().MultiSearch().Add(searchRequests...).Index(indices...).Do(ctx) if err != nil { logErrorToSpan(childSpan, err) return nil, err @@ -563,7 +563,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra boolQuery := s.buildFindTraceIDsQuery(traceQuery) jaegerIndices := s.timeRangeIndices(s.spanIndexPrefix, s.spanIndexDateLayout, traceQuery.StartTimeMin, traceQuery.StartTimeMax, s.spanIndexRolloverFrequency) - searchService := s.client.Search(jaegerIndices...). + searchService := s.client().Search(jaegerIndices...). Size(0). // set to 0 because we don't want actual documents. Aggregation(traceIDAggregation, aggregation). IgnoreUnavailable(true). diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index c5d174dec1b..16656c4e38e 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -33,6 +33,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" @@ -97,7 +98,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { logger: logger, logBuffer: logBuffer, reader: NewSpanReader(SpanReaderParams{ - Client: client, + Client: func() es.Client { return client }, Logger: zap.NewNop(), MaxSpanAge: 0, IndexPrefix: "", @@ -116,7 +117,7 @@ func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { logger: logger, logBuffer: logBuffer, reader: NewSpanReader(SpanReaderParams{ - Client: client, + Client: func() es.Client { return client }, Logger: zap.NewNop(), MaxSpanAge: 0, IndexPrefix: "", @@ -163,6 +164,7 @@ func TestNewSpanReader(t *testing.T) { func TestSpanReaderIndices(t *testing.T) { client := &mocks.Client{} + clientFn := func() es.Client { return client } logger, _ := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) @@ -177,56 +179,56 @@ func TestSpanReaderIndices(t *testing.T) { }{ { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", UseReadWriteAliases: true, }, indices: []string{spanIndex + "read", serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", UseReadWriteAliases: true, }, indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, }, indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{ @@ -240,7 +242,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -254,7 +256,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -268,7 +270,7 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 71ad0c2ee39..d2303196e6c 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -39,14 +39,14 @@ const ( // ServiceOperationStorage stores service to operation pairs. type ServiceOperationStorage struct { - client es.Client + client func() es.Client logger *zap.Logger serviceCache cache.Cache } // NewServiceOperationStorage returns a new ServiceOperationStorage. func NewServiceOperationStorage( - client es.Client, + client func() es.Client, logger *zap.Logger, cacheTTL time.Duration, ) *ServiceOperationStorage { @@ -72,7 +72,7 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span cacheKey := hashCode(service) if !keyInCache(cacheKey, s.serviceCache) { - s.client.Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add() + s.client().Index().Index(indexName).Type(serviceType).Id(cacheKey).BodyJson(service).Add() writeCache(cacheKey, s.serviceCache) } } @@ -80,7 +80,7 @@ func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span func (s *ServiceOperationStorage) getServices(context context.Context, indices []string, maxDocCount int) ([]string, error) { serviceAggregation := getServicesAggregation(maxDocCount) - searchService := s.client.Search(indices...). + searchService := s.client().Search(indices...). Size(0). // set to 0 because we don't want actual documents. IgnoreUnavailable(true). Aggregation(servicesAggregation, serviceAggregation) @@ -110,7 +110,7 @@ func (s *ServiceOperationStorage) getOperations(context context.Context, indices serviceQuery := elastic.NewTermQuery(serviceName, service) serviceFilter := getOperationsAggregation(maxDocCount) - searchService := s.client.Search(indices...). + searchService := s.client().Search(indices...). Size(0). Query(serviceQuery). IgnoreUnavailable(true). diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index 589e3464d86..c9c584a6615 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -45,7 +45,7 @@ type serviceWriter func(string, *dbmodel.Span) // SpanWriter is a wrapper around elastic.Client type SpanWriter struct { - client es.Client + client func() es.Client logger *zap.Logger writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn indexCache cache.Cache @@ -56,7 +56,7 @@ type SpanWriter struct { // SpanWriterParams holds constructor parameters for NewSpanWriter type SpanWriterParams struct { - Client es.Client + Client func() es.Client Logger *zap.Logger MetricsFactory metrics.Factory IndexPrefix string @@ -107,11 +107,11 @@ func (s *SpanWriter) CreateTemplates(spanTemplate, serviceTemplate, indexPrefix if indexPrefix != "" && !strings.HasSuffix(indexPrefix, "-") { indexPrefix += "-" } - _, err := s.client.CreateTemplate(indexPrefix + "jaeger-span").Body(spanTemplate).Do(context.Background()) + _, err := s.client().CreateTemplate(indexPrefix + "jaeger-span").Body(spanTemplate).Do(context.Background()) if err != nil { return err } - _, err = s.client.CreateTemplate(indexPrefix + "jaeger-service").Body(serviceTemplate).Do(context.Background()) + _, err = s.client().CreateTemplate(indexPrefix + "jaeger-service").Body(serviceTemplate).Do(context.Background()) if err != nil { return err } @@ -159,7 +159,7 @@ func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { // Close closes SpanWriter func (s *SpanWriter) Close() error { - return s.client.Close() + return s.client().Close() } func keyInCache(key string, c cache.Cache) bool { @@ -175,5 +175,5 @@ func (s *SpanWriter) writeService(indexName string, jsonSpan *dbmodel.Span) { } func (s *SpanWriter) writeSpan(indexName string, jsonSpan *dbmodel.Span) { - s.client.Index().Index(indexName).Type(spanType).BodyJson(&jsonSpan).Add() + s.client().Index().Index(indexName).Type(spanType).BodyJson(&jsonSpan).Add() } diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index 69f1dea89fe..5c637c409b5 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -29,6 +29,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel" @@ -50,7 +51,7 @@ func withSpanWriter(fn func(w *spanWriterTest)) { client: client, logger: logger, logBuffer: logBuffer, - writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory, SpanIndexDateLayout: "2006-01-02", ServiceIndexDateLayout: "2006-01-02"}), + writer: NewSpanWriter(SpanWriterParams{Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, SpanIndexDateLayout: "2006-01-02", ServiceIndexDateLayout: "2006-01-02"}), } fn(w) } @@ -59,6 +60,7 @@ var _ spanstore.Writer = &SpanWriter{} // check API conformance func TestSpanWriterIndices(t *testing.T) { client := &mocks.Client{} + clientFn := func() es.Client { return client } logger, _ := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) date := time.Now() @@ -72,49 +74,49 @@ func TestSpanWriterIndices(t *testing.T) { }{ { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, }, indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, }, indices: []string{spanIndex + "write", serviceIndex + "write"}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: false, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, UseReadWriteAliases: true, }, indices: []string{"foo:-" + spanIndex + "write", "foo:-" + serviceIndex + "write"}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, }, indices: []string{spanIndex + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, ""}, }, { params: SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, Archive: true, UseReadWriteAliases: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveWriteIndexSuffix, ""}, @@ -355,6 +357,7 @@ func TestWriteSpanInternalError(t *testing.T) { func TestNewSpanTags(t *testing.T) { client := &mocks.Client{} + clientFn := func() es.Client { return client } logger, _ := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) testCases := []struct { @@ -364,7 +367,7 @@ func TestNewSpanTags(t *testing.T) { }{ { writer: NewSpanWriter(SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, AllTagsAsFields: true, }), expected: dbmodel.Span{ @@ -375,7 +378,7 @@ func TestNewSpanTags(t *testing.T) { }, { writer: NewSpanWriter(SpanWriterParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, + Client: clientFn, Logger: logger, MetricsFactory: metricsFactory, TagKeysAsFields: []string{"foo", "bar", "rere"}, }), expected: dbmodel.Span{ @@ -385,7 +388,7 @@ func TestNewSpanTags(t *testing.T) { name: "definedTagNames", }, { - writer: NewSpanWriter(SpanWriterParams{Client: client, Logger: logger, MetricsFactory: metricsFactory}), + writer: NewSpanWriter(SpanWriterParams{Client: clientFn, Logger: logger, MetricsFactory: metricsFactory}), expected: dbmodel.Span{ Tags: []dbmodel.KeyValue{{ Key: "foo", @@ -444,7 +447,7 @@ func TestSpanWriterParamsTTL(t *testing.T) { t.Run(test.name, func(t *testing.T) { client := &mocks.Client{} params := SpanWriterParams{ - Client: client, + Client: func() es.Client { return client }, Logger: logger, MetricsFactory: metricsFactory, ServiceCacheTTL: test.serviceTTL, diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 9d2ec031b1f..4bc05fc58a5 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -127,9 +127,10 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro if err != nil { return err } + clientFn := func() estemplate.Client { return client } w := spanstore.NewSpanWriter( spanstore.SpanWriterParams{ - Client: client, + Client: clientFn, Logger: s.logger, MetricsFactory: metrics.NullFactory, IndexPrefix: indexPrefix, @@ -143,7 +144,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro } s.SpanWriter = w s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - Client: client, + Client: clientFn, Logger: s.logger, MetricsFactory: metrics.NullFactory, IndexPrefix: indexPrefix, @@ -153,7 +154,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro MaxDocCount: defaultMaxDocCount, }) dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ - Client: client, + Client: clientFn, Logger: s.logger, IndexPrefix: indexPrefix, IndexDateLayout: indexDateLayout, From 3b5f514016c302f0c7f8095d90ef02e33cbead1b Mon Sep 17 00:00:00 2001 From: haanhvu Date: Fri, 21 Apr 2023 23:24:47 +0700 Subject: [PATCH 02/14] Append watcher incrementally Signed-off-by: haanhvu --- plugin/storage/es/factory.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 727ff14c9f1..ad148654c14 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -103,12 +103,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } f.primaryClient.Store(&primaryClient) - f.watchers = make([]*fswatcher.FSWatcher, 2) + f.watchers = make([]*fswatcher.FSWatcher, 0) primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) if err != nil { return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) } - f.watchers[0] = primaryWatcher + f.watchers = append(f.watchers, primaryWatcher) if f.archiveConfig.Enabled { archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory) @@ -121,7 +121,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) if err != nil { return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) } - f.watchers[1] = archiveWatcher + f.watchers = append(f.watchers, archiveWatcher) } return nil @@ -275,14 +275,16 @@ func (f *Factory) onPrimaryPasswordChange() { primaryClient, err := f.newClientFn(f.primaryConfig, f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err)) + } else { + f.primaryClient.Swap(&primaryClient) } - f.primaryClient.Swap(&primaryClient) } func (f *Factory) onArchivePasswordChange() { archiveClient, err := f.newClientFn(f.archiveConfig, f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err)) + } else { + f.archiveClient.Swap(&archiveClient) } - f.archiveClient.Swap(&archiveClient) } From 6dc3a2e14b2cb1ab964ad298b145668580c778f9 Mon Sep 17 00:00:00 2001 From: haanhvu Date: Sun, 23 Apr 2023 16:58:49 +0700 Subject: [PATCH 03/14] Fix lint failures and append from nil slice Signed-off-by: haanhvu --- plugin/storage/es/factory.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index ad148654c14..cbabcbcaf42 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -103,7 +103,6 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } f.primaryClient.Store(&primaryClient) - f.watchers = make([]*fswatcher.FSWatcher, 0) primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) if err != nil { return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) @@ -129,17 +128,17 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false) + return createSpanReader(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig, false) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false) + return createSpanWriter(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig, false) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig) + return createDependencyReader(f.logger, &f.primaryClient, f.primaryConfig) } // CreateArchiveSpanReader implements storage.ArchiveFactory @@ -147,7 +146,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true) + return createSpanReader(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig, true) } // CreateArchiveSpanWriter implements storage.ArchiveFactory @@ -155,13 +154,13 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true) + return createSpanWriter(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig, true) } func createSpanReader( mFactory metrics.Factory, logger *zap.Logger, - client atomic.Pointer[es.Client], + client *atomic.Pointer[es.Client], cfg *config.Configuration, archive bool, ) (spanstore.Reader, error) { @@ -189,7 +188,7 @@ func createSpanReader( func createSpanWriter( mFactory metrics.Factory, logger *zap.Logger, - client atomic.Pointer[es.Client], + client *atomic.Pointer[es.Client], cfg *config.Configuration, archive bool, ) (spanstore.Writer, error) { @@ -242,7 +241,7 @@ func createSpanWriter( func createDependencyReader( logger *zap.Logger, - client atomic.Pointer[es.Client], + client *atomic.Pointer[es.Client], cfg *config.Configuration, ) (dependencystore.Reader, error) { reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{ From 3641d673e7df0763a7a176a0e25b3aef7e2d228f Mon Sep 17 00:00:00 2001 From: haanhvu Date: Sun, 23 Apr 2023 21:38:53 +0700 Subject: [PATCH 04/14] Add options test Signed-off-by: haanhvu --- plugin/storage/es/options_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/plugin/storage/es/options_test.go b/plugin/storage/es/options_test.go index 9e0ffeb73a7..60c7023cac4 100644 --- a/plugin/storage/es/options_test.go +++ b/plugin/storage/es/options_test.go @@ -30,6 +30,7 @@ func TestOptions(t *testing.T) { primary := opts.GetPrimary() assert.Empty(t, primary.Username) assert.Empty(t, primary.Password) + assert.Empty(t, primary.PasswordFilePath) assert.NotEmpty(t, primary.Servers) assert.Empty(t, primary.RemoteReadClusters) assert.Equal(t, int64(5), primary.NumShards) @@ -41,6 +42,7 @@ func TestOptions(t *testing.T) { aux := opts.Get("archive") assert.Equal(t, primary.Username, aux.Username) assert.Equal(t, primary.Password, aux.Password) + assert.Equal(t, primary.PasswordFilePath, aux.PasswordFilePath) assert.Equal(t, primary.Servers, aux.Servers) } @@ -52,6 +54,7 @@ func TestOptionsWithFlags(t *testing.T) { "--es.username=hello", "--es.password=world", "--es.token-file=/foo/bar", + "--es.password-file=/foo/bar/baz", "--es.sniffer=true", "--es.sniffer-tls-enabled=true", "--es.max-span-age=48h", @@ -81,7 +84,9 @@ func TestOptionsWithFlags(t *testing.T) { primary := opts.GetPrimary() assert.Equal(t, "hello", primary.Username) + assert.Equal(t, "world", primary.Password) assert.Equal(t, "/foo/bar", primary.TokenFilePath) + assert.Equal(t, "/foo/bar/baz", primary.PasswordFilePath) assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers) assert.Equal(t, []string{"cluster_one", "cluster_two"}, primary.RemoteReadClusters) assert.Equal(t, 48*time.Hour, primary.MaxSpanAge) From 2bcfafc6c71936b00241e36a880b1eae2bf0ea2d Mon Sep 17 00:00:00 2001 From: haanhvu Date: Fri, 28 Apr 2023 16:28:44 +0700 Subject: [PATCH 05/14] Make config atomic and add test on primary config Signed-off-by: haanhvu --- pkg/es/config/config.go | 6 ++-- plugin/storage/es/factory.go | 60 +++++++++++++++++++------------ plugin/storage/es/factory_test.go | 56 +++++++++++++++++++++++++---- 3 files changed, 90 insertions(+), 32 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index fe37e7f40b6..bb2e31d7005 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -317,7 +317,7 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp options = append(options, elastic.SetHttpClient(httpClient)) if c.PasswordFilePath != "" { - passwordFromFile, err := loadFileContent(c.PasswordFilePath) + passwordFromFile, err := LoadFileContent(c.PasswordFilePath) if err != nil { return nil, fmt.Errorf("failed to load password from file: %w", err) } @@ -409,7 +409,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe if c.AllowTokenFromContext { logger.Warn("Token file and token propagation are both enabled, token from file won't be used") } - tokenFromFile, err := loadFileContent(c.TokenFilePath) + tokenFromFile, err := LoadFileContent(c.TokenFilePath) if err != nil { return nil, err } @@ -425,7 +425,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe return transport, nil } -func loadFileContent(path string) (string, error) { +func LoadFileContent(path string) (string, error) { b, err := os.ReadFile(filepath.Clean(path)) if err != nil { return "", err diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index cbabcbcaf42..b26d6c88ab9 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -56,9 +56,9 @@ type Factory struct { newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) - primaryConfig *config.Configuration + primaryConfig atomic.Pointer[config.Configuration] primaryClient atomic.Pointer[es.Client] - archiveConfig *config.Configuration + archiveConfig atomic.Pointer[config.Configuration] archiveClient atomic.Pointer[es.Client] watchers []*fswatcher.FSWatcher @@ -80,16 +80,16 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) { // InitFromViper implements plugin.Configurable func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { f.Options.InitFromViper(v) - f.primaryConfig = f.Options.GetPrimary() - f.archiveConfig = f.Options.Get(archiveNamespace) + f.primaryConfig.Store(f.Options.GetPrimary()) + f.archiveConfig.Store(f.Options.Get(archiveNamespace)) } // InitFromOptions configures factory from Options struct. func (f *Factory) InitFromOptions(o Options) { f.Options = &o - f.primaryConfig = f.Options.GetPrimary() + f.primaryConfig.Store(f.Options.GetPrimary()) if cfg := f.Options.Get(archiveNamespace); cfg != nil { - f.archiveConfig = cfg + f.archiveConfig.Store(cfg) } } @@ -97,26 +97,26 @@ func (f *Factory) InitFromOptions(o Options) { func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory) + primaryClient, err := f.newClientFn(f.primaryConfig.Load(), logger, metricsFactory) if err != nil { return fmt.Errorf("failed to create primary Elasticsearch client: %w", err) } f.primaryClient.Store(&primaryClient) - primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) + primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Load().PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) if err != nil { return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) } f.watchers = append(f.watchers, primaryWatcher) - if f.archiveConfig.Enabled { - archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory) + if f.archiveConfig.Load().Enabled { + archiveClient, err := f.newClientFn(f.archiveConfig.Load(), logger, metricsFactory) if err != nil { return fmt.Errorf("failed to create archive Elasticsearch client: %w", err) } f.archiveClient.Store(&archiveClient) - archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.PasswordFilePath}, f.onArchivePasswordChange, f.logger) + archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Load().PasswordFilePath}, f.onArchivePasswordChange, f.logger) if err != nil { return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) } @@ -128,33 +128,33 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return createSpanReader(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig, false) + return createSpanReader(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig.Load(), false) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig, false) + return createSpanWriter(f.metricsFactory, f.logger, &f.primaryClient, f.primaryConfig.Load(), false) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.logger, &f.primaryClient, f.primaryConfig) + return createDependencyReader(f.logger, &f.primaryClient, f.primaryConfig.Load()) } // CreateArchiveSpanReader implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if !f.archiveConfig.Enabled { + if !f.archiveConfig.Load().Enabled { return nil, nil } - return createSpanReader(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig, true) + return createSpanReader(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig.Load(), true) } // CreateArchiveSpanWriter implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if !f.archiveConfig.Enabled { + if !f.archiveConfig.Load().Enabled { return nil, nil } - return createSpanWriter(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig, true) + return createSpanWriter(f.metricsFactory, f.logger, &f.archiveClient, f.archiveConfig.Load(), true) } func createSpanReader( @@ -271,19 +271,35 @@ func (f *Factory) Close() error { } func (f *Factory) onPrimaryPasswordChange() { - primaryClient, err := f.newClientFn(f.primaryConfig, f.logger, f.metricsFactory) + newPrimaryCfg := *f.primaryConfig.Load() + newPrimaryPassword, err := config.LoadFileContent(newPrimaryCfg.PasswordFilePath) + if err != nil { + f.logger.Error("failed to reload password for primary Elasticsearch client", zap.Error(err)) + } else { + newPrimaryCfg.Password = newPrimaryPassword + f.primaryConfig.Store(&newPrimaryCfg) + } + primaryClient, err := f.newClientFn(f.primaryConfig.Load(), f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err)) } else { - f.primaryClient.Swap(&primaryClient) + f.primaryClient.Store(&primaryClient) } } func (f *Factory) onArchivePasswordChange() { - archiveClient, err := f.newClientFn(f.archiveConfig, f.logger, f.metricsFactory) + newArchiveCfg := *f.archiveConfig.Load() + newPassword, err := config.LoadFileContent(newArchiveCfg.PasswordFilePath) + if err != nil { + f.logger.Error("failed to reload password for archive Elasticsearch client", zap.Error(err)) + } else { + newArchiveCfg.Password = newPassword + f.archiveConfig.Store(&newArchiveCfg) + } + archiveClient, err := f.newClientFn(f.archiveConfig.Load(), f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err)) } else { - f.archiveClient.Swap(&archiveClient) + f.archiveClient.Store(&archiveClient) } } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 9405497275f..0c80291a753 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -16,24 +16,22 @@ package es import ( - "context" - "errors" + "os" + "sync/atomic" "testing" + "github.com/olivere/elastic" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" - "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/es" escfg "github.com/jaegertracing/jaeger/pkg/es/config" - "github.com/jaegertracing/jaeger/pkg/es/mocks" + eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/metrics" - "github.com/jaegertracing/jaeger/storage" ) -var _ storage.Factory = new(Factory) +/*var _ storage.Factory = new(Factory) type mockClientBuilder struct { err error @@ -239,4 +237,48 @@ func TestInitFromOptions(t *testing.T) { f.InitFromOptions(o) assert.Equal(t, o.GetPrimary(), f.primaryConfig) assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig) +}*/ + +func TestPasswordFromFile(t *testing.T) { + passwordFile, err := os.CreateTemp("", "") + require.NoError(t, err) + defer passwordFile.Close() + + passwordFile.WriteString("bar") + + c := escfg.Configuration{ + Username: "foo", + PasswordFilePath: passwordFile.Name(), + } + + f := NewFactory() + f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { + rawClient := &elastic.Client{} + + passwordFromFile, err := escfg.LoadFileContent(c.PasswordFilePath) + require.NoError(t, err) + c.Password = passwordFromFile + + option := elastic.SetBasicAuth(c.Username, c.Password) + option(rawClient) + + return eswrapper.WrapESClient(rawClient, nil, 0), nil + } + f.primaryConfig.Store(&c) + f.archiveConfig.Store(&c) + + require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + assert.Equal(t, "bar", f.primaryConfig.Load().Password) + + primaryClient, err := f.newClientFn(f.primaryConfig.Load(), nil, nil) + require.NoError(t, err) + + var expectedPrimaryClient atomic.Pointer[es.Client] + expectedPrimaryClient.Store(&primaryClient) + assert.Equal(t, expectedPrimaryClient.Load(), f.primaryClient.Load()) + + err = os.WriteFile(f.primaryConfig.Load().PasswordFilePath, []byte("barbaz"), 0o600) + //f.onPrimaryPasswordChange() + require.NoError(t, err) + assert.Equal(t, "barbaz", f.primaryConfig.Load().Password) } From c9e8a2a6ab35aabf90170027907cd0b132675dcf Mon Sep 17 00:00:00 2001 From: haanhvu Date: Fri, 28 Apr 2023 16:51:12 +0700 Subject: [PATCH 06/14] Add f.onPrimaryPasswordChange() in test Signed-off-by: haanhvu --- plugin/storage/es/factory_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 0c80291a753..f7fc89dc11f 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -277,8 +277,8 @@ func TestPasswordFromFile(t *testing.T) { expectedPrimaryClient.Store(&primaryClient) assert.Equal(t, expectedPrimaryClient.Load(), f.primaryClient.Load()) - err = os.WriteFile(f.primaryConfig.Load().PasswordFilePath, []byte("barbaz"), 0o600) - //f.onPrimaryPasswordChange() + _, err = passwordFile.WriteString("baz") require.NoError(t, err) + f.onPrimaryPasswordChange() assert.Equal(t, "barbaz", f.primaryConfig.Load().Password) } From 66ad336a3e6b45495d730a813425cf2522b54492 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 8 Sep 2023 11:55:39 -0400 Subject: [PATCH 07/14] move pwd lines together Signed-off-by: Yuri Shkuro --- pkg/es/config/config.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index bb2e31d7005..3385af9a5fa 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -297,10 +297,6 @@ func (c *Configuration) TagKeysAsFields() ([]string, error) { // getConfigOptions wraps the configs to feed to the ElasticSearch client init func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) { - if c.Password != "" && c.PasswordFilePath != "" { - return nil, fmt.Errorf("both Password and PasswordFilePath are set") - } - options := []elastic.ClientOptionFunc{ elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer), // Disable health check when token from context is allowed, this is because at this time @@ -316,6 +312,9 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp } options = append(options, elastic.SetHttpClient(httpClient)) + if c.Password != "" && c.PasswordFilePath != "" { + return nil, fmt.Errorf("both Password and PasswordFilePath are set") + } if c.PasswordFilePath != "" { passwordFromFile, err := LoadFileContent(c.PasswordFilePath) if err != nil { From 1db1edd5cfd6e1f7caff06e6a73e307d1465d76b Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 8 Sep 2023 12:19:53 -0400 Subject: [PATCH 08/14] cleanup Signed-off-by: Yuri Shkuro --- plugin/storage/es/factory.go | 95 ++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 42 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 55af2905542..056522e2c28 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -59,9 +59,10 @@ type Factory struct { newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) - primaryConfig atomic.Pointer[config.Configuration] + primaryConfig *config.Configuration + archiveConfig *config.Configuration + primaryClient atomic.Pointer[es.Client] - archiveConfig atomic.Pointer[config.Configuration] archiveClient atomic.Pointer[es.Client] watchers []*fswatcher.FSWatcher @@ -84,16 +85,16 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) { // InitFromViper implements plugin.Configurable func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { f.Options.InitFromViper(v) - f.primaryConfig.Store(f.Options.GetPrimary()) - f.archiveConfig.Store(f.Options.Get(archiveNamespace)) + f.primaryConfig = f.Options.GetPrimary() + f.archiveConfig = f.Options.Get(archiveNamespace) } // InitFromOptions configures factory from Options struct. func (f *Factory) InitFromOptions(o Options) { f.Options = &o - f.primaryConfig.Store(f.Options.GetPrimary()) + f.primaryConfig = f.Options.GetPrimary() if cfg := f.Options.Get(archiveNamespace); cfg != nil { - f.archiveConfig.Store(cfg) + f.archiveConfig = cfg } } @@ -101,68 +102,80 @@ func (f *Factory) InitFromOptions(o Options) { func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - primaryClient, err := f.newClientFn(f.primaryConfig.Load(), logger, metricsFactory) + primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory) if err != nil { return fmt.Errorf("failed to create primary Elasticsearch client: %w", err) } f.primaryClient.Store(&primaryClient) - primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.Load().PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) - if err != nil { - return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) + if f.primaryConfig.PasswordFilePath != "" { + primaryWatcher, err := fswatcher.New([]string{f.primaryConfig.PasswordFilePath}, f.onPrimaryPasswordChange, f.logger) + if err != nil { + return fmt.Errorf("failed to create watcher for primary ES client's password: %w", err) + } + f.watchers = append(f.watchers, primaryWatcher) } - f.watchers = append(f.watchers, primaryWatcher) - if f.archiveConfig.Load().Enabled { - archiveClient, err := f.newClientFn(f.archiveConfig.Load(), logger, metricsFactory) + if f.archiveConfig.Enabled { + archiveClient, err := f.newClientFn(f.archiveConfig, logger, metricsFactory) if err != nil { return fmt.Errorf("failed to create archive Elasticsearch client: %w", err) } f.archiveClient.Store(&archiveClient) - archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.Load().PasswordFilePath}, f.onArchivePasswordChange, f.logger) - if err != nil { - return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) + if f.archiveConfig.PasswordFilePath != "" { + archiveWatcher, err := fswatcher.New([]string{f.archiveConfig.PasswordFilePath}, f.onArchivePasswordChange, f.logger) + if err != nil { + return fmt.Errorf("failed to create watcher for archive ES client's password: %w", err) + } + f.watchers = append(f.watchers, archiveWatcher) } - f.watchers = append(f.watchers, archiveWatcher) } return nil } +func (f *Factory) getPrimaryClient() es.Client { + return *(f.primaryClient.Load()) +} + +func (f *Factory) getArchiveClient() es.Client { + return *f.archiveClient.Load() +} + // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return createSpanReader(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer) + return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) + return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.primaryClient, f.primaryConfig, f.logger) + return createDependencyReader(f.getPrimaryClient, f.primaryConfig, f.logger) } // CreateArchiveSpanReader implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { - if !f.archiveConfig.Load().Enabled { + if !f.archiveConfig.Enabled { return nil, nil } - return createSpanReader(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer) + return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer) } // CreateArchiveSpanWriter implements storage.ArchiveFactory func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { - if !f.archiveConfig.Load().Enabled { + if !f.archiveConfig.Enabled { return nil, nil } - return createSpanWriter(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger) + return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger) } func createSpanReader( - client es.Client, + clientFn func() es.Client, cfg *config.Configuration, archive bool, mFactory metrics.Factory, @@ -173,7 +186,7 @@ func createSpanReader( return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ - Client: func() es.Client { return *client.Load() }, + Client: clientFn, MaxDocCount: cfg.MaxDocCount, MaxSpanAge: cfg.MaxSpanAge, IndexPrefix: cfg.IndexPrefix, @@ -192,7 +205,7 @@ func createSpanReader( } func createSpanWriter( - client *atomic.Pointer[es.Client], + clientFn func() es.Client, cfg *config.Configuration, archive bool, mFactory metrics.Factory, @@ -222,7 +235,7 @@ func createSpanWriter( return nil, err } writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ - Client: func() es.Client { return *client.Load() }, + Client: clientFn, IndexPrefix: cfg.IndexPrefix, SpanIndexDateLayout: cfg.IndexDateLayoutSpans, ServiceIndexDateLayout: cfg.IndexDateLayoutServices, @@ -246,12 +259,12 @@ func createSpanWriter( } func createDependencyReader( - client *atomic.Pointer[es.Client], + clientFn func() es.Client, cfg *config.Configuration, logger *zap.Logger, ) (dependencystore.Reader, error) { reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{ - Client: func() es.Client { return *client.Load() }, + Client: clientFn, Logger: logger, IndexPrefix: cfg.IndexPrefix, IndexDateLayout: cfg.IndexDateLayoutDependencies, @@ -277,15 +290,14 @@ func (f *Factory) Close() error { } func (f *Factory) onPrimaryPasswordChange() { - newPrimaryCfg := *f.primaryConfig.Load() - newPrimaryPassword, err := config.LoadFileContent(newPrimaryCfg.PasswordFilePath) + newPrimaryPassword, err := config.LoadFileContent(f.primaryConfig.PasswordFilePath) if err != nil { f.logger.Error("failed to reload password for primary Elasticsearch client", zap.Error(err)) - } else { - newPrimaryCfg.Password = newPrimaryPassword - f.primaryConfig.Store(&newPrimaryCfg) + return } - primaryClient, err := f.newClientFn(f.primaryConfig.Load(), f.logger, f.metricsFactory) + newPrimaryCfg := *f.primaryConfig // copy by value + newPrimaryCfg.Password = newPrimaryPassword + primaryClient, err := f.newClientFn(&newPrimaryCfg, f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err)) } else { @@ -294,15 +306,14 @@ func (f *Factory) onPrimaryPasswordChange() { } func (f *Factory) onArchivePasswordChange() { - newArchiveCfg := *f.archiveConfig.Load() - newPassword, err := config.LoadFileContent(newArchiveCfg.PasswordFilePath) + newPassword, err := config.LoadFileContent(f.archiveConfig.PasswordFilePath) if err != nil { f.logger.Error("failed to reload password for archive Elasticsearch client", zap.Error(err)) - } else { - newArchiveCfg.Password = newPassword - f.archiveConfig.Store(&newArchiveCfg) + return } - archiveClient, err := f.newClientFn(f.archiveConfig.Load(), f.logger, f.metricsFactory) + newArchiveCfg := *f.archiveConfig // copy by value + newArchiveCfg.Password = newPassword + archiveClient, err := f.newClientFn(&newArchiveCfg, f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err)) } else { From 8e64941a7118590f0e4c58f161fb788b6d79def7 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 8 Sep 2023 12:38:20 -0400 Subject: [PATCH 09/14] restore main tests Signed-off-by: Yuri Shkuro --- plugin/storage/es/factory.go | 1 + plugin/storage/es/factory_test.go | 80 ++++++++++++++++--------------- 2 files changed, 42 insertions(+), 39 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 056522e2c28..a9e338aa248 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -95,6 +95,7 @@ func (f *Factory) InitFromOptions(o Options) { f.primaryConfig = f.Options.GetPrimary() if cfg := f.Options.Get(archiveNamespace); cfg != nil { f.archiveConfig = cfg + // TODO it looks like f.archiveConfig==nil is possible, but below f.archiveConfig.Enabled is used } } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index f7fc89dc11f..2ef2d748bca 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -16,22 +16,24 @@ package es import ( - "os" - "sync/atomic" + "context" + "errors" "testing" - "github.com/olivere/elastic" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/es" escfg "github.com/jaegertracing/jaeger/pkg/es/config" - eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" + "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/storage" ) -/*var _ storage.Factory = new(Factory) +var _ storage.Factory = new(Factory) type mockClientBuilder struct { err error @@ -237,48 +239,48 @@ func TestInitFromOptions(t *testing.T) { f.InitFromOptions(o) assert.Equal(t, o.GetPrimary(), f.primaryConfig) assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig) -}*/ +} -func TestPasswordFromFile(t *testing.T) { - passwordFile, err := os.CreateTemp("", "") - require.NoError(t, err) - defer passwordFile.Close() +// func TestPasswordFromFile(t *testing.T) { +// passwordFile, err := os.CreateTemp("", "") +// require.NoError(t, err) +// defer passwordFile.Close() - passwordFile.WriteString("bar") +// passwordFile.WriteString("bar") - c := escfg.Configuration{ - Username: "foo", - PasswordFilePath: passwordFile.Name(), - } +// c := escfg.Configuration{ +// Username: "foo", +// PasswordFilePath: passwordFile.Name(), +// } - f := NewFactory() - f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { - rawClient := &elastic.Client{} +// f := NewFactory() +// f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { +// rawClient := &elastic.Client{} - passwordFromFile, err := escfg.LoadFileContent(c.PasswordFilePath) - require.NoError(t, err) - c.Password = passwordFromFile +// passwordFromFile, err := escfg.LoadFileContent(c.PasswordFilePath) +// require.NoError(t, err) +// c.Password = passwordFromFile - option := elastic.SetBasicAuth(c.Username, c.Password) - option(rawClient) +// option := elastic.SetBasicAuth(c.Username, c.Password) +// option(rawClient) - return eswrapper.WrapESClient(rawClient, nil, 0), nil - } - f.primaryConfig.Store(&c) - f.archiveConfig.Store(&c) +// return eswrapper.WrapESClient(rawClient, nil, 0), nil +// } +// f.primaryConfig.Store(&c) +// f.archiveConfig.Store(&c) - require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) - assert.Equal(t, "bar", f.primaryConfig.Load().Password) +// require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) +// assert.Equal(t, "bar", f.primaryConfig.Load().Password) - primaryClient, err := f.newClientFn(f.primaryConfig.Load(), nil, nil) - require.NoError(t, err) +// primaryClient, err := f.newClientFn(f.primaryConfig.Load(), nil, nil) +// require.NoError(t, err) - var expectedPrimaryClient atomic.Pointer[es.Client] - expectedPrimaryClient.Store(&primaryClient) - assert.Equal(t, expectedPrimaryClient.Load(), f.primaryClient.Load()) +// var expectedPrimaryClient atomic.Pointer[es.Client] +// expectedPrimaryClient.Store(&primaryClient) +// assert.Equal(t, expectedPrimaryClient.Load(), f.primaryClient.Load()) - _, err = passwordFile.WriteString("baz") - require.NoError(t, err) - f.onPrimaryPasswordChange() - assert.Equal(t, "barbaz", f.primaryConfig.Load().Password) -} +// _, err = passwordFile.WriteString("baz") +// require.NoError(t, err) +// f.onPrimaryPasswordChange() +// assert.Equal(t, "barbaz", f.primaryConfig.Load().Password) +// } From dbac89cc0b11713ceeb108dadea1920259115427 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 8 Sep 2023 14:05:46 -0400 Subject: [PATCH 10/14] Add test Signed-off-by: Yuri Shkuro --- plugin/storage/es/factory.go | 13 ++-- plugin/storage/es/factory_test.go | 101 ++++++++++++++++++------------ 2 files changed, 66 insertions(+), 48 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index a9e338aa248..1c18a753fb5 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -85,23 +85,18 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) { // InitFromViper implements plugin.Configurable func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { f.Options.InitFromViper(v) - f.primaryConfig = f.Options.GetPrimary() - f.archiveConfig = f.Options.Get(archiveNamespace) } // InitFromOptions configures factory from Options struct. func (f *Factory) InitFromOptions(o Options) { f.Options = &o - f.primaryConfig = f.Options.GetPrimary() - if cfg := f.Options.Get(archiveNamespace); cfg != nil { - f.archiveConfig = cfg - // TODO it looks like f.archiveConfig==nil is possible, but below f.archiveConfig.Enabled is used - } } -// Initialize implements storage.Factory +// Initialize implements storage.Factory. func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger + f.primaryConfig = f.Options.GetPrimary() + f.archiveConfig = f.Options.Get(archiveNamespace) primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory) if err != nil { @@ -298,6 +293,7 @@ func (f *Factory) onPrimaryPasswordChange() { } newPrimaryCfg := *f.primaryConfig // copy by value newPrimaryCfg.Password = newPrimaryPassword + newPrimaryCfg.PasswordFilePath = "" // avoid error that both are set primaryClient, err := f.newClientFn(&newPrimaryCfg, f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err)) @@ -314,6 +310,7 @@ func (f *Factory) onArchivePasswordChange() { } newArchiveCfg := *f.archiveConfig // copy by value newArchiveCfg.Password = newPassword + newArchiveCfg.PasswordFilePath = "" // avoid error that both are set archiveClient, err := f.newClientFn(&newArchiveCfg, f.logger, f.metricsFactory) if err != nil { f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err)) diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 2ef2d748bca..ee9c1fd2c15 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -17,14 +17,24 @@ package es import ( "context" + "encoding/base64" "errors" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync/atomic" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/es" escfg "github.com/jaegertracing/jaeger/pkg/es/config" @@ -241,46 +251,57 @@ func TestInitFromOptions(t *testing.T) { assert.Equal(t, o.Get(archiveNamespace), f.archiveConfig) } -// func TestPasswordFromFile(t *testing.T) { -// passwordFile, err := os.CreateTemp("", "") -// require.NoError(t, err) -// defer passwordFile.Close() - -// passwordFile.WriteString("bar") - -// c := escfg.Configuration{ -// Username: "foo", -// PasswordFilePath: passwordFile.Name(), -// } - -// f := NewFactory() -// f.newClientFn = func(c *escfg.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) { -// rawClient := &elastic.Client{} - -// passwordFromFile, err := escfg.LoadFileContent(c.PasswordFilePath) -// require.NoError(t, err) -// c.Password = passwordFromFile - -// option := elastic.SetBasicAuth(c.Username, c.Password) -// option(rawClient) - -// return eswrapper.WrapESClient(rawClient, nil, 0), nil -// } -// f.primaryConfig.Store(&c) -// f.archiveConfig.Store(&c) - -// require.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) -// assert.Equal(t, "bar", f.primaryConfig.Load().Password) +func TestPasswordFromFile(t *testing.T) { + var authReceived atomic.Pointer[string] + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("request to fake ES server: %v", r) + h := strings.Split(r.Header.Get("Authorization"), " ") + require.Len(t, h, 2) + require.Equal(t, "Basic", h[0]) + authBytes, err := base64.StdEncoding.DecodeString(h[1]) + assert.NoError(t, err, "header: %s", h) + auth := string(authBytes) + authReceived.Store(&auth) + w.Write([]byte(` + { + "Version": { + "Number": "6" + } + } + `)) + })) + defer server.Close() -// primaryClient, err := f.newClientFn(f.primaryConfig.Load(), nil, nil) -// require.NoError(t, err) + pwdFile := filepath.Join(t.TempDir(), "pwd") + require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600)) -// var expectedPrimaryClient atomic.Pointer[es.Client] -// expectedPrimaryClient.Store(&primaryClient) -// assert.Equal(t, expectedPrimaryClient.Load(), f.primaryClient.Load()) + f := NewFactory() + f.primaryConfig = &escfg.Configuration{ + Servers: []string{server.URL}, + LogLevel: "debug", + PasswordFilePath: pwdFile, + } + require.NoError(t, f.Initialize(metrics.NullFactory, zaptest.NewLogger(t))) + defer f.Close() -// _, err = passwordFile.WriteString("baz") -// require.NoError(t, err) -// f.onPrimaryPasswordChange() -// assert.Equal(t, "barbaz", f.primaryConfig.Load().Password) -// } + writer, err := f.CreateSpanWriter() + require.NoError(t, err) + span := &model.Span{ + Process: &model.Process{ServiceName: "foo"}, + } + require.NoError(t, writer.WriteSpan(context.Background(), span)) + require.Equal(t, ":first password", *authReceived.Load()) + + client1 := f.getPrimaryClient() + require.NoError(t, os.WriteFile(pwdFile, []byte("second password"), 0o600)) + assert.Eventually(t, + func() bool { + client2 := f.getPrimaryClient() + return client1 != client2 + }, + 5*time.Second, time.Millisecond, + "expecting es.Client to change for the new password", + ) + require.NoError(t, writer.WriteSpan(context.Background(), span)) + require.Equal(t, ":second password", *authReceived.Load()) +} From 29e3cb69b5ec8ec5d6f56b4f5c109cc7c1bbf829 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 8 Sep 2023 14:18:24 -0400 Subject: [PATCH 11/14] fix test Signed-off-by: Yuri Shkuro --- plugin/storage/es/factory.go | 6 ++++-- plugin/storage/es/factory_test.go | 1 + 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 1c18a753fb5..e7afa46d781 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -85,18 +85,20 @@ func (f *Factory) AddFlags(flagSet *flag.FlagSet) { // InitFromViper implements plugin.Configurable func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { f.Options.InitFromViper(v) + f.primaryConfig = f.Options.GetPrimary() + f.archiveConfig = f.Options.Get(archiveNamespace) } // InitFromOptions configures factory from Options struct. func (f *Factory) InitFromOptions(o Options) { f.Options = &o + f.primaryConfig = f.Options.GetPrimary() + f.archiveConfig = f.Options.Get(archiveNamespace) } // Initialize implements storage.Factory. func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger - f.primaryConfig = f.Options.GetPrimary() - f.archiveConfig = f.Options.Get(archiveNamespace) primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory) if err != nil { diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index ee9c1fd2c15..7f291e2c369 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -281,6 +281,7 @@ func TestPasswordFromFile(t *testing.T) { LogLevel: "debug", PasswordFilePath: pwdFile, } + f.archiveConfig = &escfg.Configuration{} require.NoError(t, f.Initialize(metrics.NullFactory, zaptest.NewLogger(t))) defer f.Close() From 4c719e78be2ba56c7b88c62c8c304ea9761d9d96 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 8 Sep 2023 15:32:15 -0400 Subject: [PATCH 12/14] cleanup Signed-off-by: Yuri Shkuro --- pkg/es/config/config.go | 6 +++--- plugin/storage/es/factory.go | 15 +++++++++++++-- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 3385af9a5fa..9500fbcb883 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -316,7 +316,7 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp return nil, fmt.Errorf("both Password and PasswordFilePath are set") } if c.PasswordFilePath != "" { - passwordFromFile, err := LoadFileContent(c.PasswordFilePath) + passwordFromFile, err := loadTokenFromFile(c.PasswordFilePath) if err != nil { return nil, fmt.Errorf("failed to load password from file: %w", err) } @@ -408,7 +408,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe if c.AllowTokenFromContext { logger.Warn("Token file and token propagation are both enabled, token from file won't be used") } - tokenFromFile, err := LoadFileContent(c.TokenFilePath) + tokenFromFile, err := loadTokenFromFile(c.TokenFilePath) if err != nil { return nil, err } @@ -424,7 +424,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe return transport, nil } -func LoadFileContent(path string) (string, error) { +func loadTokenFromFile(path string) (string, error) { b, err := os.ReadFile(filepath.Clean(path)) if err != nil { return "", err diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index e7afa46d781..da942b605de 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -20,6 +20,9 @@ import ( "flag" "fmt" "io" + "os" + "path/filepath" + "strings" "sync/atomic" "github.com/spf13/viper" @@ -288,7 +291,7 @@ func (f *Factory) Close() error { } func (f *Factory) onPrimaryPasswordChange() { - newPrimaryPassword, err := config.LoadFileContent(f.primaryConfig.PasswordFilePath) + newPrimaryPassword, err := loadTokenFromFile(f.primaryConfig.PasswordFilePath) if err != nil { f.logger.Error("failed to reload password for primary Elasticsearch client", zap.Error(err)) return @@ -305,7 +308,7 @@ func (f *Factory) onPrimaryPasswordChange() { } func (f *Factory) onArchivePasswordChange() { - newPassword, err := config.LoadFileContent(f.archiveConfig.PasswordFilePath) + newPassword, err := loadTokenFromFile(f.archiveConfig.PasswordFilePath) if err != nil { f.logger.Error("failed to reload password for archive Elasticsearch client", zap.Error(err)) return @@ -320,3 +323,11 @@ func (f *Factory) onArchivePasswordChange() { f.archiveClient.Store(&archiveClient) } } + +func loadTokenFromFile(path string) (string, error) { + b, err := os.ReadFile(filepath.Clean(path)) + if err != nil { + return "", err + } + return strings.TrimRight(string(b), "\r\n"), nil +} From ee85e72cc3ab2b20894da425398bf92f6ce54484 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 8 Sep 2023 19:55:36 -0400 Subject: [PATCH 13/14] simplify, add tests Signed-off-by: Yuri Shkuro --- plugin/storage/es/factory.go | 35 +++++------- plugin/storage/es/factory_test.go | 91 +++++++++++++++++++++++++++---- 2 files changed, 92 insertions(+), 34 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index da942b605de..b770bfa794a 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -291,36 +291,27 @@ func (f *Factory) Close() error { } func (f *Factory) onPrimaryPasswordChange() { - newPrimaryPassword, err := loadTokenFromFile(f.primaryConfig.PasswordFilePath) - if err != nil { - f.logger.Error("failed to reload password for primary Elasticsearch client", zap.Error(err)) - return - } - newPrimaryCfg := *f.primaryConfig // copy by value - newPrimaryCfg.Password = newPrimaryPassword - newPrimaryCfg.PasswordFilePath = "" // avoid error that both are set - primaryClient, err := f.newClientFn(&newPrimaryCfg, f.logger, f.metricsFactory) - if err != nil { - f.logger.Error("failed to recreate primary Elasticsearch client from new password", zap.Error(err)) - } else { - f.primaryClient.Store(&primaryClient) - } + f.onClientPasswordChange(f.primaryConfig, &f.primaryClient) } func (f *Factory) onArchivePasswordChange() { - newPassword, err := loadTokenFromFile(f.archiveConfig.PasswordFilePath) + f.onClientPasswordChange(f.archiveConfig, &f.archiveClient) +} + +func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) { + newPassword, err := loadTokenFromFile(cfg.PasswordFilePath) if err != nil { - f.logger.Error("failed to reload password for archive Elasticsearch client", zap.Error(err)) + f.logger.Error("failed to reload password for Elasticsearch client", zap.Error(err)) return } - newArchiveCfg := *f.archiveConfig // copy by value - newArchiveCfg.Password = newPassword - newArchiveCfg.PasswordFilePath = "" // avoid error that both are set - archiveClient, err := f.newClientFn(&newArchiveCfg, f.logger, f.metricsFactory) + newCfg := *cfg // copy by value + newCfg.Password = newPassword + newCfg.PasswordFilePath = "" // avoid error that both are set + primaryClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory) if err != nil { - f.logger.Error("failed to recreate archive Elasticsearch client from new password", zap.Error(err)) + f.logger.Error("failed to recreate Elasticsearch client with new password", zap.Error(err)) } else { - f.archiveClient.Store(&archiveClient) + client.Store(&primaryClient) } } diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index 7f291e2c369..a016d1e86f2 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -40,11 +40,21 @@ import ( escfg "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/spanstore" ) var _ storage.Factory = new(Factory) +var mockEsServerResponse = []byte(` +{ + "Version": { + "Number": "6" + } +} +`) + type mockClientBuilder struct { err error createTemplateError error @@ -252,6 +262,25 @@ func TestInitFromOptions(t *testing.T) { } func TestPasswordFromFile(t *testing.T) { + t.Run("primary client", func(t *testing.T) { + f := NewFactory() + testPasswordFromFile(t, f, f.getPrimaryClient, f.CreateSpanWriter) + }) + + t.Run("archive client", func(t *testing.T) { + f2 := NewFactory() + testPasswordFromFile(t, f2, f2.getArchiveClient, f2.CreateArchiveSpanWriter) + }) + + t.Run("load token error", func(t *testing.T) { + file := filepath.Join(t.TempDir(), "does not exist") + token, err := loadTokenFromFile(file) + assert.Error(t, err) + assert.Equal(t, "", token) + }) +} + +func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client, getWriter func() (spanstore.Writer, error)) { var authReceived atomic.Pointer[string] server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { t.Logf("request to fake ES server: %v", r) @@ -262,30 +291,28 @@ func TestPasswordFromFile(t *testing.T) { assert.NoError(t, err, "header: %s", h) auth := string(authBytes) authReceived.Store(&auth) - w.Write([]byte(` - { - "Version": { - "Number": "6" - } - } - `)) + w.Write(mockEsServerResponse) })) defer server.Close() pwdFile := filepath.Join(t.TempDir(), "pwd") require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600)) - f := NewFactory() f.primaryConfig = &escfg.Configuration{ Servers: []string{server.URL}, LogLevel: "debug", PasswordFilePath: pwdFile, } - f.archiveConfig = &escfg.Configuration{} + f.archiveConfig = &escfg.Configuration{ + Enabled: true, + Servers: []string{server.URL}, + LogLevel: "debug", + PasswordFilePath: pwdFile, + } require.NoError(t, f.Initialize(metrics.NullFactory, zaptest.NewLogger(t))) defer f.Close() - writer, err := f.CreateSpanWriter() + writer, err := getWriter() require.NoError(t, err) span := &model.Span{ Process: &model.Process{ServiceName: "foo"}, @@ -293,11 +320,12 @@ func TestPasswordFromFile(t *testing.T) { require.NoError(t, writer.WriteSpan(context.Background(), span)) require.Equal(t, ":first password", *authReceived.Load()) - client1 := f.getPrimaryClient() + // replace password in the file + client1 := getClient() require.NoError(t, os.WriteFile(pwdFile, []byte("second password"), 0o600)) assert.Eventually(t, func() bool { - client2 := f.getPrimaryClient() + client2 := getClient() return client1 != client2 }, 5*time.Second, time.Millisecond, @@ -306,3 +334,42 @@ func TestPasswordFromFile(t *testing.T) { require.NoError(t, writer.WriteSpan(context.Background(), span)) require.Equal(t, ":second password", *authReceived.Load()) } + +func TestPasswordFromFileErrors(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write(mockEsServerResponse) + })) + defer server.Close() + + pwdFile := filepath.Join(t.TempDir(), "pwd") + require.NoError(t, os.WriteFile(pwdFile, []byte("first password"), 0o600)) + + f := NewFactory() + f.primaryConfig = &escfg.Configuration{ + Servers: []string{server.URL}, + LogLevel: "debug", + PasswordFilePath: pwdFile, + } + f.archiveConfig = &escfg.Configuration{ + Servers: []string{server.URL}, + LogLevel: "debug", + PasswordFilePath: pwdFile, + } + + logger, buf := testutils.NewEchoLogger(t) + require.NoError(t, f.Initialize(metrics.NullFactory, logger)) + defer f.Close() + + f.primaryConfig.Servers = []string{} + f.onPrimaryPasswordChange() + assert.Contains(t, buf.String(), "no servers specified") + + f.archiveConfig.Servers = []string{} + buf.Reset() + f.onArchivePasswordChange() + assert.Contains(t, buf.String(), "no servers specified") + + require.NoError(t, os.Remove(pwdFile)) + f.onPrimaryPasswordChange() + f.onArchivePasswordChange() +} From 842243cd9a882f0026313b8c335ef60bc8c09446 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 8 Sep 2023 20:29:54 -0400 Subject: [PATCH 14/14] add logging Signed-off-by: Yuri Shkuro --- plugin/storage/es/factory.go | 1 + plugin/storage/es/factory_test.go | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index b770bfa794a..94bf28e57cd 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -304,6 +304,7 @@ func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atom f.logger.Error("failed to reload password for Elasticsearch client", zap.Error(err)) return } + f.logger.Sugar().Infof("loaded new password of length %d from file", len(newPassword)) newCfg := *cfg // copy by value newCfg.Password = newPassword newCfg.PasswordFilePath = "" // avoid error that both are set diff --git a/plugin/storage/es/factory_test.go b/plugin/storage/es/factory_test.go index a016d1e86f2..6afd7f148ef 100644 --- a/plugin/storage/es/factory_test.go +++ b/plugin/storage/es/factory_test.go @@ -320,9 +320,11 @@ func testPasswordFromFile(t *testing.T, f *Factory, getClient func() es.Client, require.NoError(t, writer.WriteSpan(context.Background(), span)) require.Equal(t, ":first password", *authReceived.Load()) - // replace password in the file + t.Log("replace password in the file") client1 := getClient() - require.NoError(t, os.WriteFile(pwdFile, []byte("second password"), 0o600)) + newPwdFile := filepath.Join(t.TempDir(), "pwd2") + require.NoError(t, os.WriteFile(newPwdFile, []byte("second password"), 0o600)) + require.NoError(t, os.Rename(newPwdFile, pwdFile)) assert.Eventually(t, func() bool { client2 := getClient()