Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Endpointset: Do not use info client to obtain metadata (for now) #4714

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

### Fixed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏽


- [#4714](https://github.com/thanos-io/thanos/pull/4714) Endpointset: Do not use info client to obtain metadata.

## [v0.23.0](https://github.com/thanos-io/thanos/tree/release-0.23) - 2021.09.23

### Added
Expand Down
39 changes: 29 additions & 10 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import (
)

const (
unhealthyEndpointMessage = "removing endpoint because it's unhealthy or does not exist"
unhealthyEndpointMessage = "removing endpoint because it's unhealthy or does not exist"
noMetadataEndpointMessage = "cannot obtain metadata: neither info nor store client found"

// Default minimum and maximum time values used by Prometheus when they are not passed as query parameter.
MinTime = -9223309901257974
Expand Down Expand Up @@ -76,17 +77,27 @@ func (es *grpcEndpointSpec) Addr() string {
// Metadata method for gRPC endpoint tries to call InfoAPI exposed by Thanos components until context timeout. If we are unable to get metadata after
// that time, we assume that the host is unhealthy and return error.
func (es *grpcEndpointSpec) Metadata(ctx context.Context, client *endpointClients) (*endpointMetadata, error) {
resp, err := client.info.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true))
if err != nil {
// Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI.
metadata, merr := es.getMetadataUsingStoreAPI(ctx, client.store)
if merr != nil {
return nil, errors.Wrapf(merr, "fallback fetching info from %s after err: %v", es.addr, err)
// TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699
// Uncomment this after it is implemented in https://github.com/thanos-io/thanos/pull/4282.
// if client.info != nil {
// resp, err := client.info.Info(ctx, &infopb.InfoRequest{}, grpc.WaitForReady(true))
// if err != nil {
// return nil, errors.Wrapf(err, "fetching info from %s", es.addr)
// }

// return &endpointMetadata{resp}, nil
// }

// Call Info method of StoreAPI, this way querier will be able to discovery old components not exposing InfoAPI.
if client.store != nil {
metadata, err := es.getMetadataUsingStoreAPI(ctx, client.store)
if err != nil {
return nil, errors.Wrapf(err, "fallback fetching info from %s", es.addr)
}
return metadata, nil
}

return &endpointMetadata{resp}, nil
return nil, errors.New(noMetadataEndpointMessage)
}

func (es *grpcEndpointSpec) getMetadataUsingStoreAPI(ctx context.Context, client storepb.StoreClient) (*endpointMetadata, error) {
Expand Down Expand Up @@ -494,7 +505,9 @@ func (e *EndpointSet) getActiveEndpoints(ctx context.Context, endpoints map[stri
logger: e.logger,
StoreClient: storepb.NewStoreClient(conn),
clients: &endpointClients{
info: infopb.NewInfoClient(conn),
// TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699
// Uncomment this after it is implemented in https://github.com/thanos-io/thanos/pull/4282.
// info: infopb.NewInfoClient(conn),
store: storepb.NewStoreClient(conn),
},
}
Expand Down Expand Up @@ -668,6 +681,10 @@ func (er *endpointRef) ComponentType() component.Component {
er.mtx.RLock()
defer er.mtx.RUnlock()

if er.metadata == nil {
return component.UnknownStoreAPI
}

return component.FromString(er.metadata.ComponentType)
}

Expand Down Expand Up @@ -786,13 +803,15 @@ func (er *endpointRef) apisPresent() []string {
return apisPresent
}

// TODO(@matej-g): Info client should not be used due to https://github.com/thanos-io/thanos/issues/4699
// Uncomment the nolint directive after https://github.com/thanos-io/thanos/pull/4282.
type endpointClients struct {
store storepb.StoreClient
rule rulespb.RulesClient
metricMetadata metadatapb.MetadataClient
exemplar exemplarspb.ExemplarsClient
target targetspb.TargetsClient
info infopb.InfoClient
info infopb.InfoClient //nolint:structcheck,unused
}

type endpointMetadata struct {
Expand Down
70 changes: 66 additions & 4 deletions pkg/query/endpointset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil"
)

Expand Down Expand Up @@ -58,7 +59,11 @@ var (
}
ruleInfo = &infopb.InfoResponse{
ComponentType: component.Rule.String(),
Rules: &infopb.RulesInfo{},
Store: &infopb.StoreInfo{
MinTime: math.MinInt64,
MaxTime: math.MaxInt64,
},
Rules: &infopb.RulesInfo{},
}
storeGWInfo = &infopb.InfoResponse{
ComponentType: component.Store.String(),
Expand Down Expand Up @@ -93,6 +98,28 @@ func (c *mockedEndpoint) Info(ctx context.Context, r *infopb.InfoRequest) (*info
return &c.info, nil
}

type mockedStoreSrv struct {
infoDelay time.Duration
info storepb.InfoResponse
}

func (s *mockedStoreSrv) Info(context.Context, *storepb.InfoRequest) (*storepb.InfoResponse, error) {
if s.infoDelay > 0 {
time.Sleep(s.infoDelay)
}

return &s.info, nil
}
func (s *mockedStoreSrv) Series(*storepb.SeriesRequest, storepb.Store_SeriesServer) error {
return nil
}
func (s *mockedStoreSrv) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
return nil, nil
}
func (s *mockedStoreSrv) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
return nil, nil
}

type APIs struct {
store bool
metricMetadata bool
Expand All @@ -113,6 +140,25 @@ type testEndpoints struct {
exposedAPIs map[string]*APIs
}

func componentTypeToStoreType(componentType string) storepb.StoreType {
switch componentType {
case component.Query.String():
return storepb.StoreType_QUERY
case component.Rule.String():
return storepb.StoreType_RULE
case component.Sidecar.String():
return storepb.StoreType_SIDECAR
case component.Store.String():
return storepb.StoreType_STORE
case component.Receive.String():
return storepb.StoreType_RECEIVE
case component.Debug.String():
return storepb.StoreType_DEBUG
default:
return storepb.StoreType_STORE
}
}

func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, error) {
e := &testEndpoints{
srvs: map[string]*grpc.Server{},
Expand All @@ -130,6 +176,19 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er
srv := grpc.NewServer()
addr := listener.Addr().String()

storeSrv := &mockedStoreSrv{
info: storepb.InfoResponse{
LabelSets: meta.extlsetFn(listener.Addr().String()),
StoreType: componentTypeToStoreType(meta.ComponentType),
},
infoDelay: meta.infoDelay,
}

if meta.Store != nil {
storeSrv.info.MinTime = meta.Store.MinTime
storeSrv.info.MaxTime = meta.Store.MaxTime
}

endpointSrv := &mockedEndpoint{
info: infopb.InfoResponse{
LabelSets: meta.extlsetFn(listener.Addr().String()),
Expand All @@ -143,6 +202,7 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er
infoDelay: meta.infoDelay,
}
infopb.RegisterInfoServer(srv, endpointSrv)
storepb.RegisterStoreServer(srv, storeSrv)
go func() {
_ = srv.Serve(listener)
}()
Expand Down Expand Up @@ -859,7 +919,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
}
return endpointSpec
},
expectedStores: 4, // sidecar + querier + receiver + storeGW
expectedStores: 5, // sidecar + querier + receiver + storeGW + ruler
expectedRules: 3, // sidecar + querier + ruler
expectedTarget: 2, // sidecar + querier
expectedMetricMetadata: 2, // sidecar + querier
Expand Down Expand Up @@ -895,7 +955,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
NewGRPCEndpointSpec(endpoints.orderAddrs[1], false),
}
},
expectedStores: 1, // sidecar
expectedStores: 2, // sidecar + ruler
expectedRules: 2, // sidecar + ruler
expectedTarget: 1, // sidecar
expectedMetricMetadata: 1, // sidecar
Expand All @@ -908,7 +968,8 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) {
NewGRPCEndpointSpec(endpoints.orderAddrs[1], false),
}
},
expectedRules: 1, // ruler
expectedStores: 1, // ruler
expectedRules: 1, // ruler
},
},
},
Expand Down Expand Up @@ -1106,6 +1167,7 @@ func exposedAPIs(c string) *APIs {
}
case component.Rule.String():
return &APIs{
store: true,
rules: true,
}
case component.Store.String():
Expand Down