Skip to content

Commit

Permalink
rename Metadata API to MetricMetadata API (#3877)
Browse files Browse the repository at this point in the history
Signed-off-by: yeya24 <[email protected]>
  • Loading branch information
yeya24 authored Mar 5, 2021
1 parent 93f2605 commit af4ee3a
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 163 deletions.
4 changes: 2 additions & 2 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse
}

return func(r *http.Request) (interface{}, []error, *api.ApiError) {
req := &metadatapb.MetadataRequest{
req := &metadatapb.MetricMetadataRequest{
// By default we use -1, which means no limit.
Limit: -1,
Metric: r.URL.Query().Get("metric"),
Expand All @@ -824,7 +824,7 @@ func NewMetricMetadataHandler(client metadata.UnaryClient, enablePartialResponse
req.Limit = int32(limit)
}

t, warnings, err := client.Metadata(r.Context(), req)
t, warnings, err := client.MetricMetadata(r.Context(), req)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Wrap(err, "retrieving metadata")}
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/metadata/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (

var _ UnaryClient = &GRPCClient{}

// UnaryClient is gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not
// UnaryClient is a gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not
// support streaming.
type UnaryClient interface {
Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error)
MetricMetadata(ctx context.Context, req *metadatapb.MetricMetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error)
}

// GRPCClient allows to retrieve metadata from local gRPC streaming server implementation.
Expand All @@ -31,7 +31,7 @@ func NewGRPCClient(ts metadatapb.MetadataServer) *GRPCClient {
}
}

func (rr *GRPCClient) Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) {
func (rr *GRPCClient) MetricMetadata(ctx context.Context, req *metadatapb.MetricMetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) {
srv := &metadataServer{ctx: ctx, metric: req.Metric, limit: int(req.Limit)}

if req.Limit >= 0 {
Expand All @@ -46,16 +46,16 @@ func (rr *GRPCClient) Metadata(ctx context.Context, req *metadatapb.MetadataRequ
srv.metadataMap = make(map[string][]metadatapb.Meta)
}

if err := rr.proxy.Metadata(req, srv); err != nil {
return nil, nil, errors.Wrap(err, "proxy Metadata")
if err := rr.proxy.MetricMetadata(req, srv); err != nil {
return nil, nil, errors.Wrap(err, "proxy MetricMetadata")
}

return srv.metadataMap, srv.warnings, nil
}

type metadataServer struct {
// This field just exist to pseudo-implement the unused methods of the interface.
metadatapb.Metadata_MetadataServer
metadatapb.Metadata_MetricMetadataServer
ctx context.Context

metric string
Expand All @@ -65,7 +65,7 @@ type metadataServer struct {
metadataMap map[string][]metadatapb.Meta
}

func (srv *metadataServer) Send(res *metadatapb.MetadataResponse) error {
func (srv *metadataServer) Send(res *metadatapb.MetricMetadataResponse) error {
if res.GetWarning() != "" {
srv.warnings = append(srv.warnings, errors.New(res.GetWarning()))
return nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/metadata/metadatapb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ import (
"unsafe"
)

func NewMetadataResponse(metadata *MetricMetadata) *MetadataResponse {
return &MetadataResponse{
Result: &MetadataResponse_Metadata{
func NewMetricMetadataResponse(metadata *MetricMetadata) *MetricMetadataResponse {
return &MetricMetadataResponse{
Result: &MetricMetadataResponse_Metadata{
Metadata: metadata,
},
}
}

func NewWarningMetadataResponse(warning error) *MetadataResponse {
return &MetadataResponse{
Result: &MetadataResponse_Warning{
func NewWarningMetadataResponse(warning error) *MetricMetadataResponse {
return &MetricMetadataResponse{
Result: &MetricMetadataResponse_Warning{
Warning: warning.Error(),
},
}
Expand Down
241 changes: 121 additions & 120 deletions pkg/metadata/metadatapb/rpc.pb.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pkg/metadata/metadatapb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ option (gogoproto.goproto_unrecognized_all) = false;
option (gogoproto.goproto_sizecache_all) = false;

service Metadata {
rpc Metadata(MetadataRequest) returns (stream MetadataResponse);
rpc MetricMetadata(MetricMetadataRequest) returns (stream MetricMetadataResponse);
}

message MetadataRequest {
message MetricMetadataRequest {
string metric = 1;
int32 limit = 2;
PartialResponseStrategy partial_response_strategy = 3;
}

message MetadataResponse {
message MetricMetadataResponse {
oneof result {
/// A collection of metric metadata entries.
MetricMetadata metadata = 1;
Expand Down
10 changes: 5 additions & 5 deletions pkg/metadata/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/thanos-io/thanos/pkg/promclient"
)

// Prometheus implements metadatapb.Metadata gRPC that allows to fetch metric metadata from Prometheus HTTP /api/v1/metadata endpoint.
// Prometheus implements metadatapb.Metadata gRPC service that allows to fetch metric metadata from Prometheus HTTP /api/v1/metadata endpoint.
type Prometheus struct {
base *url.URL
client *promclient.Client
Expand All @@ -24,13 +24,13 @@ func NewPrometheus(base *url.URL, client *promclient.Client) *Prometheus {
}
}

// Metadata returns all specified metric metadata from Prometheus.
func (p *Prometheus) Metadata(r *metadatapb.MetadataRequest, s metadatapb.Metadata_MetadataServer) error {
md, err := p.client.MetadataInGRPC(s.Context(), p.base, r.Metric, int(r.Limit))
// MetricMetadata returns all specified metric metadata from Prometheus.
func (p *Prometheus) MetricMetadata(r *metadatapb.MetricMetadataRequest, s metadatapb.Metadata_MetricMetadataServer) error {
md, err := p.client.MetricMetadataInGRPC(s.Context(), p.base, r.Metric, int(r.Limit))
if err != nil {
return err
}

return s.Send(&metadatapb.MetadataResponse{Result: &metadatapb.MetadataResponse_Metadata{
return s.Send(&metadatapb.MetricMetadataResponse{Result: &metadatapb.MetricMetadataResponse_Metadata{
Metadata: metadatapb.FromMetadataMap(md)}})
}
2 changes: 1 addition & 1 deletion pkg/metadata/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ scrape_configs:
},
} {
t.Run(tcase.name, func(t *testing.T) {
meta, w, err := NewGRPCClient(prom).Metadata(context.Background(), &metadatapb.MetadataRequest{
meta, w, err := NewGRPCClient(prom).MetricMetadata(context.Background(), &metadatapb.MetricMetadataRequest{
Metric: tcase.metric,
Limit: tcase.limit,
})
Expand Down
22 changes: 11 additions & 11 deletions pkg/metadata/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ func NewProxy(logger log.Logger, metadata func() []metadatapb.MetadataClient) *P
}
}

func (s *Proxy) Metadata(req *metadatapb.MetadataRequest, srv metadatapb.Metadata_MetadataServer) error {
func (s *Proxy) MetricMetadata(req *metadatapb.MetricMetadataRequest, srv metadatapb.Metadata_MetricMetadataServer) error {
var (
g, gctx = errgroup.WithContext(srv.Context())
respChan = make(chan *metadatapb.MetricMetadata, 10)
metas []*metadatapb.MetricMetadata
)

for _, metadataClient := range s.metadata() {
rs := &metadataStream{
rs := &metricMetadataStream{
client: metadataClient,
request: req,
channel: respChan,
Expand All @@ -71,25 +71,25 @@ func (s *Proxy) Metadata(req *metadatapb.MetadataRequest, srv metadatapb.Metadat
}

for _, t := range metas {
if err := srv.Send(metadatapb.NewMetadataResponse(t)); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send metadata response").Error())
if err := srv.Send(metadatapb.NewMetricMetadataResponse(t)); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send metric metadata response").Error())
}
}

return nil
}

type metadataStream struct {
type metricMetadataStream struct {
client metadatapb.MetadataClient
request *metadatapb.MetadataRequest
request *metadatapb.MetricMetadataRequest
channel chan<- *metadatapb.MetricMetadata
server metadatapb.Metadata_MetadataServer
server metadatapb.Metadata_MetricMetadataServer
}

func (stream *metadataStream) receive(ctx context.Context) error {
metadataCli, err := stream.client.Metadata(ctx, stream.request)
func (stream *metricMetadataStream) receive(ctx context.Context) error {
metadataCli, err := stream.client.MetricMetadata(ctx, stream.request)
if err != nil {
err = errors.Wrapf(err, "fetching metadata from metadata client %v", stream.client)
err = errors.Wrapf(err, "fetching metric metadata from metadata client %v", stream.client)

if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
return err
Expand All @@ -109,7 +109,7 @@ func (stream *metadataStream) receive(ctx context.Context) error {
}

if err != nil {
err = errors.Wrapf(err, "receiving metadata from metadata client %v", stream.client)
err = errors.Wrapf(err, "receiving metric metadata from metadata client %v", stream.client)

if stream.request.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
return err
Expand Down
5 changes: 3 additions & 2 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,8 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin
return m.Data.Groups, nil
}

func (c *Client) MetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) {
// MetricMetadataInGRPC returns the metadata from Prometheus metric metadata API. It uses gRPC errors.
func (c *Client) MetricMetadataInGRPC(ctx context.Context, base *url.URL, metric string, limit int) (map[string][]metadatapb.Meta, error) {
u := *base
u.Path = path.Join(u.Path, "/api/v1/metadata")
q := u.Query()
Expand All @@ -756,5 +757,5 @@ func (c *Client) MetadataInGRPC(ctx context.Context, base *url.URL, metric strin
var v struct {
Data map[string][]metadatapb.Meta `json:"data"`
}
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/metadata HTTP[client]", &u, &v)
return v.Data, c.get2xxResultWithGRPCErrors(ctx, "/prom_metric_metadata HTTP[client]", &u, &v)
}
12 changes: 6 additions & 6 deletions test/e2e/metadata_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,32 +64,32 @@ func TestMetadataAPI_Fanout(t *testing.T) {
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_store_nodes_grpc_connections"}, e2e.WaitMissingMetrics))
testutil.Ok(t, q.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"thanos_query_metadata_apis_dns_provider_results"}, e2e.WaitMissingMetrics))

promMeta, err := promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+prom1.HTTPEndpoint()), "", -1)
promMeta, err := promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+prom1.HTTPEndpoint()), "", -1)
testutil.Ok(t, err)

thanosMeta, err := promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", -1)
thanosMeta, err := promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", -1)
testutil.Ok(t, err)

// Metadata response from Prometheus and Thanos Querier should be the same after deduplication.
testutil.Equals(t, thanosMeta, promMeta)

// We only expect to see one metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 1)
thanosMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 1)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 1)

// We only expect to see ten metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 10)
thanosMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 10)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 10)

// No metadata returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 0)
thanosMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "", 0)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 0)

// Only prometheus_build_info metric will be returned.
thanosMeta, err = promclient.NewDefaultClient().MetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "prometheus_build_info", -1)
thanosMeta, err = promclient.NewDefaultClient().MetricMetadataInGRPC(ctx, mustURLParse(t, "http://"+q.HTTPEndpoint()), "prometheus_build_info", -1)
testutil.Ok(t, err)
testutil.Assert(t, true, len(thanosMeta) == 1 && len(thanosMeta["prometheus_build_info"]) > 0)
}

0 comments on commit af4ee3a

Please sign in to comment.