Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename Metadata API to MetricMetadata API #3877

Merged
merged 1 commit into from
Mar 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
req := &metadatapb.MetadataRequest{
req := &metadatapb.MetricMetadataRequest{
// By default we use -1, which means no limit.
Limit: -1,
Metric: r.URL.Query().Get("metric"),
Expand All @@ -824,7 +824,7 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse
req.Limit = int32(limit)
}

t, warnings, err := client.Metadata(r.Context(), req)
t, warnings, err := client.MetricMetadata(r.Context(), req)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving metadata")}
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (

var _ UnaryClient = &GRPCClient{}

// UnaryClient is gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not
// UnaryClient is a gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not
// support streaming.
type UnaryClient interface {
Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error)
MetricMetadata(ctx context.Context, req *metadatapb.MetricMetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error)
}

// GRPCClient allows to retrieve metadata from local gRPC streaming server implementation.
Expand All @@ -31,7 +31,7 @@ func NewGRPCClient(ts metadatapb.MetadataServer) *GRPCClient {
}
}

func (rr *GRPCClient) Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) {
func (rr *GRPCClient) MetricMetadata(ctx context.Context, req *metadatapb.MetricMetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) {
srv := &metadataServer{ctx: ctx, metric: req.Metric, limit: int(req.Limit)}

if req.Limit >= 0 {
Expand All @@ -46,16 +46,16 @@ func (rr *GRPCClient) Metadata(ctx context.Context, req *metadatapb.MetadataRequ
srv.metadataMap = make(map[string][]metadatapb.Meta)
}

if err := rr.proxy.Metadata(req, srv); err != nil {
return nil, nil, errors.Wrap(err, "proxy Metadata")
if err := rr.proxy.MetricMetadata(req, srv); err != nil {
return nil, nil, errors.Wrap(err, "proxy MetricMetadata")
}

return srv.metadataMap, srv.warnings, nil
}

type metadataServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
metadatapb.Metadata_MetadataServer
metadatapb.Metadata_MetricMetadataServer
ctx context.Context

metric string
Expand All @@ -65,7 +65,7 @@ type metadataServer struct {
metadataMap map[string][]metadatapb.Meta
}

func (srv *metadataServer) Send(res *metadatapb.MetadataResponse) error {
func (srv *metadataServer) Send(res *metadatapb.MetricMetadataResponse) error {
if res.GetWarning() != "" {
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/metadata/metadatapb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import (
"unsafe"
)

func NewMetadataResponse(metadata *MetricMetadata) *MetadataResponse {
return &MetadataResponse{
Result: &MetadataResponse_Metadata{
func NewMetricMetadataResponse(metadata *MetricMetadata) *MetricMetadataResponse {
return &MetricMetadataResponse{
Result: &MetricMetadataResponse_Metadata{
Metadata: metadata,
},
}
}

func NewWarningMetadataResponse(warning error) *MetadataResponse {
return &MetadataResponse{
Result: &MetadataResponse_Warning{
func NewWarningMetadataResponse(warning error) *MetricMetadataResponse {
return &MetricMetadataResponse{
Result: &MetricMetadataResponse_Warning{
Warning: warning.Error(),
},
}
Expand Down
241 changes: 121 additions & 120 deletions pkg/metadata/metadatapb/rpc.pb.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pkg/metadata/metadatapb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ option (gogoproto.goproto_unrecognized_all) = false;
option (gogoproto.goproto_sizecache_all) = false;

service Metadata {
rpc Metadata(MetadataRequest) returns (stream MetadataResponse);
rpc MetricMetadata(MetricMetadataRequest) returns (stream MetricMetadataResponse);
}

message MetadataRequest {
message MetricMetadataRequest {
string metric = 1;
int32 limit = 2;
PartialResponseStrategy partial_response_strategy = 3;
}

message MetadataResponse {
message MetricMetadataResponse {
oneof result {
/// A collection of metric metadata entries.
MetricMetadata metadata = 1;
Expand Down
10 changes: 5 additions & 5 deletions pkg/metadata/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/thanos-io/thanos/pkg/promclient"
)

// Prometheus implements metadatapb.Metadata gRPC that allows to fetch metric metadata from Prometheus HTTP /api/v1/metadata endpoint.
// Prometheus implements metadatapb.Metadata gRPC service that allows to fetch metric metadata from Prometheus HTTP /api/v1/metadata endpoint.
type Prometheus struct {
base *url.URL
client *promclient.Client
Expand All @@ -24,13 +24,13 @@ func NewPrometheus(base *url.URL, client *promclient.Client) *Prometheus {
}
}

// Metadata returns all specified metric metadata from Prometheus.
func (p *Prometheus) Metadata(r *metadatapb.MetadataRequest, s metadatapb.Metadata_MetadataServer) error {
md, err := p.client.MetadataInGRPC(s.Context(), p.base, r.Metric, int(r.Limit))
// MetricMetadata returns all specified metric metadata from Prometheus.
func (p *Prometheus) MetricMetadata(r *metadatapb.MetricMetadataRequest, s metadatapb.Metadata_MetricMetadataServer) error {
md, err := p.client.MetricMetadataInGRPC(s.Context(), p.base, r.Metric, int(r.Limit))
if err != nil {
return err
}

return s.Send(&metadatapb.MetadataResponse{Result: &metadatapb.MetadataResponse_Metadata{
return s.Send(&metadatapb.MetricMetadataResponse{Result: &metadatapb.MetricMetadataResponse_Metadata{
Metadata: metadatapb.FromMetadataMap(md)}})
}
2 changes: 1 addition & 1 deletion pkg/metadata/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ scrape_configs:
},
} {
t.Run(tcase.name, func(t *testing.T) {
meta, w, err := NewGRPCClient(prom).Metadata(context.Background(), &metadatapb.MetadataRequest{
meta, w, err := NewGRPCClient(prom).MetricMetadata(context.Background(), &metadatapb.MetricMetadataRequest{
Metric: tcase.metric,
Limit: tcase.limit,
})
Expand Down
22 changes: 11 additions & 11 deletions pkg/metadata/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ func NewProxy(logger log.Logger, metadata func() []metadatapb.MetadataClient) *P
}
}

func (s *Proxy) Metadata(req *metadatapb.MetadataRequest, srv metadatapb.Metadata_MetadataServer) error {
func (s *Proxy) MetricMetadata(req *metadatapb.MetricMetadataRequest, srv metadatapb.Metadata_MetricMetadataServer) error {
var (
g, gctx = errgroup.WithContext(srv.Context())
respChan = make(chan *metadatapb.MetricMetadata, 10)
metas []*metadatapb.MetricMetadata
)

for _, metadataClient := range s.metadata() {
rs := &metadataStream{
rs := &metricMetadataStream{
client: metadataClient,
request: req,
channel: respChan,
Expand All @@ -71,25 +71,25 @@ func (s *Proxy) Metadata(req *metadatapb.MetadataRequest, srv metadatapb.Metadat
}

for _, t := range metas {
if err := srv.Send(metadatapb.NewMetadataResponse(t)); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send metadata response").Error())
if err := srv.Send(metadatapb.NewMetricMetadataResponse(t)); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send metric metadata response").Error())
}
}

return nil
}

type metadataStream struct {
type metricMetadataStream struct {
client metadatapb.MetadataClient
request *metadatapb.MetadataRequest
request *metadatapb.MetricMetadataRequest
channel chan<- *metadatapb.MetricMetadata
server metadatapb.Metadata_MetadataServer
server metadatapb.Metadata_MetricMetadataServer
}

func (stream *metadataStream) receive(ctx context.Context) error {
metadataCli, err := stream.client.Metadata(ctx, stream.request)
func (stream *metricMetadataStream) receive(ctx context.Context) error {
metadataCli, err := stream.client.MetricMetadata(ctx, stream.request)
if err != nil {
err = errors.Wrapf(err, "fetching metadata from metadata client %v", stream.client)
err = errors.Wrapf(err, "fetching metric metadata from metadata client %v", stream.client)

if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
return err
Expand All @@ -109,7 +109,7 @@ func (stream *metadataStream) receive(ctx context.Context) error {
}

if err != nil {
err = errors.Wrapf(err, "receiving metadata from metadata client %v", stream.client)
err = errors.Wrapf(err, "receiving metric metadata from metadata client %v", stream.client)

if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
return err
Expand Down
5 changes: 3 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,8 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
return m.Data.Groups, nil
}

func (c *Client) MetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) {
// MetricMetadataInGRPC returns the metadata from Prometheus metric metadata API. It uses gRPC errors.
func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/metadata")
q := u.Query()
Expand All @@ -756,5 +757,5 @@ func (c *Client) MetadataInGRPC(ctx context.Context, base *url.URL, metric strin
var v struct {
Data map[string][]metadatapb.Meta `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/metadata HTTP[client]", &u, &v)
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v)
}
12 changes: 6 additions & 6 deletions test/e2e/metadata_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,32 +64,32 @@ func TestMetadataAPI_Fanout(t *testing.T) {
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_query_metadata_apis_dns_provider_results"}, e2e.WaitMissingMetrics))

promMeta, err := promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+prom1.HTTPEndpoint()), "", -1)
promMeta, err := promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+prom1.HTTPEndpoint()), "", -1)
testutil.Ok(t, err)

thanosMeta, err := promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", -1)
thanosMeta, err := promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", -1)
testutil.Ok(t, err)

// Metadata response from Prometheus and Thanos Querier should be the same after deduplication.
testutil.Equals(t, thanosMeta, promMeta)

// We only expect to see one metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 1)
thanosMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 1)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 1)

// We only expect to see ten metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 10)
thanosMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 10)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 10)

// No metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 0)
thanosMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 0)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 0)

// Only prometheus_build_info metric will be returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "prometheus_build_info", -1)
thanosMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "prometheus_build_info", -1)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 1 && len(thanosMeta["prometheus_build_info"]) > 0)
}