From c6b65e2a7e5bc12751117cac2a183ca673071bfe Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Thu, 21 Apr 2022 08:08:30 +0200 Subject: [PATCH] Implement Query API discovery A recent commit (#5250) added a GRPC API to Thanos Query which allows executing PromQL over GRPC. This API is currently not discoverable through endpointsets which makes it hard for other Thanos components to use it. This commit extends endpointsets with a GetQueryAPIClients method which returns Query API clients to all components which support this API. --- cmd/thanos/query.go | 1 + pkg/info/info.go | 18 +++ pkg/info/infopb/rpc.pb.go | 234 +++++++++++++++++++++++++++++----- pkg/info/infopb/rpc.proto | 19 ++- pkg/query/endpointset.go | 33 +++++ pkg/query/endpointset_test.go | 1 + 6 files changed, 271 insertions(+), 35 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 7c65edf77d4..ce110edbd7e 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -675,6 +675,7 @@ func runQuery( info.WithRulesInfoFunc(), info.WithMetricMetadataInfoFunc(), info.WithTargetsInfoFunc(), + info.WithQueryAPIInfoFunc(), ) grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution) diff --git a/pkg/info/info.go b/pkg/info/info.go index 40df172a68f..f61fdef1875 100644 --- a/pkg/info/info.go +++ b/pkg/info/info.go @@ -25,6 +25,7 @@ type InfoServer struct { getRulesInfo func() *infopb.RulesInfo getTargetsInfo func() *infopb.TargetsInfo getMetricMetadataInfo func() *infopb.MetricMetadataInfo + getQueryAPIInfo func() *infopb.QueryAPIInfo } // NewInfoServer creates a new server instance for given component @@ -42,6 +43,7 @@ func NewInfoServer( getRulesInfo: func() *infopb.RulesInfo { return nil }, getTargetsInfo: func() *infopb.TargetsInfo { return nil }, getMetricMetadataInfo: func() *infopb.MetricMetadataInfo { return nil }, + getQueryAPIInfo: func() *infopb.QueryAPIInfo { return nil }, } for _, o := range options { @@ -144,6 +146,21 @@ func WithMetricMetadataInfoFunc(getMetricMetadataInfo ...func() *infopb.MetricMe } } +// WithQueryAPIInfoFunc determines the function that should be executed to obtain +// the query information. If no function is provided, the default empty +// query info is returned. Only the first function from the list is considered. +func WithQueryAPIInfoFunc(queryInfo ...func() *infopb.QueryAPIInfo) ServerOptionFunc { + if len(queryInfo) == 0 { + return func(s *InfoServer) { + s.getQueryAPIInfo = func() *infopb.QueryAPIInfo { return &infopb.QueryAPIInfo{} } + } + } + + return func(s *InfoServer) { + s.getQueryAPIInfo = queryInfo[0] + } +} + // RegisterInfoServer registers the info server. func RegisterInfoServer(infoSrv infopb.InfoServer) func(*grpc.Server) { return func(s *grpc.Server) { @@ -161,5 +178,6 @@ func (srv *InfoServer) Info(ctx context.Context, req *infopb.InfoRequest) (*info Rules: srv.getRulesInfo(), Targets: srv.getTargetsInfo(), MetricMetadata: srv.getMetricMetadataInfo(), + Query: srv.getQueryAPIInfo(), }, nil } diff --git a/pkg/info/infopb/rpc.pb.go b/pkg/info/infopb/rpc.pb.go index 3da712b3e59..703fa1ff80a 100644 --- a/pkg/info/infopb/rpc.pb.go +++ b/pkg/info/infopb/rpc.pb.go @@ -78,6 +78,8 @@ type InfoResponse struct { Targets *TargetsInfo `protobuf:"bytes,6,opt,name=targets,proto3" json:"targets,omitempty"` // ExemplarsInfo holds the metadata related to Exemplars API if exposed by the component otherwise it will be null. Exemplars *ExemplarsInfo `protobuf:"bytes,7,opt,name=exemplars,proto3" json:"exemplars,omitempty"` + // QueryAPIInfo holds the metadata related to Query API if exposed by the component, otherwise it will be null. + Query *QueryAPIInfo `protobuf:"bytes,8,opt,name=query,proto3" json:"query,omitempty"` } func (m *InfoResponse) Reset() { *m = InfoResponse{} } @@ -302,6 +304,43 @@ func (m *ExemplarsInfo) XXX_DiscardUnknown() { var xxx_messageInfo_ExemplarsInfo proto.InternalMessageInfo +// QueryInfo holds the metadata related to Query API exposed by the component. +type QueryAPIInfo struct { +} + +func (m *QueryAPIInfo) Reset() { *m = QueryAPIInfo{} } +func (m *QueryAPIInfo) String() string { return proto.CompactTextString(m) } +func (*QueryAPIInfo) ProtoMessage() {} +func (*QueryAPIInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a1214ec45d2bf952, []int{7} +} +func (m *QueryAPIInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryAPIInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryAPIInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryAPIInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryAPIInfo.Merge(m, src) +} +func (m *QueryAPIInfo) XXX_Size() int { + return m.Size() +} +func (m *QueryAPIInfo) XXX_DiscardUnknown() { + xxx_messageInfo_QueryAPIInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryAPIInfo proto.InternalMessageInfo + func init() { proto.RegisterType((*InfoRequest)(nil), "thanos.info.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.info.InfoResponse") @@ -310,40 +349,43 @@ func init() { proto.RegisterType((*MetricMetadataInfo)(nil), "thanos.info.MetricMetadataInfo") proto.RegisterType((*TargetsInfo)(nil), "thanos.info.TargetsInfo") proto.RegisterType((*ExemplarsInfo)(nil), "thanos.info.ExemplarsInfo") + proto.RegisterType((*QueryAPIInfo)(nil), "thanos.info.QueryAPIInfo") } func init() { proto.RegisterFile("info/infopb/rpc.proto", fileDescriptor_a1214ec45d2bf952) } var fileDescriptor_a1214ec45d2bf952 = []byte{ - // 437 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0xed, 0x26, 0x4d, 0xf0, 0x98, 0x80, 0x58, 0x15, 0xb4, 0xc9, 0xc1, 0x8d, 0xac, 0x1e, - 0x72, 0x40, 0xb6, 0x14, 0x24, 0x84, 0xc4, 0x89, 0x56, 0x95, 0x40, 0xa2, 0x17, 0x37, 0xa7, 0x5e, - 0xa2, 0x4d, 0x99, 0x06, 0x4b, 0xde, 0x3f, 0x78, 0xb7, 0x52, 0x7a, 0xe3, 0x11, 0x78, 0xac, 0x1c, - 0x7b, 0xe4, 0x84, 0x20, 0x79, 0x11, 0xb4, 0xbb, 0x6e, 0x89, 0x45, 0x4f, 0xbd, 0xd8, 0xbb, 0xfb, - 0xfb, 0xbe, 0xd9, 0x99, 0xf1, 0x18, 0x5e, 0x96, 0xe2, 0x4a, 0xe6, 0xf6, 0xa1, 0x16, 0x79, 0xad, - 0x2e, 0x33, 0x55, 0x4b, 0x23, 0x49, 0x6c, 0xbe, 0x32, 0x21, 0x75, 0x66, 0xc1, 0x68, 0xa8, 0x8d, - 0xac, 0x31, 0xaf, 0xd8, 0x02, 0x2b, 0xb5, 0xc8, 0xcd, 0x8d, 0x42, 0xed, 0x75, 0xa3, 0x83, 0xa5, - 0x5c, 0x4a, 0xb7, 0xcc, 0xed, 0xca, 0x9f, 0xa6, 0x03, 0x88, 0x3f, 0x89, 0x2b, 0x59, 0xe0, 0xb7, - 0x6b, 0xd4, 0x26, 0xfd, 0xde, 0x81, 0xa7, 0x7e, 0xaf, 0x95, 0x14, 0x1a, 0xc9, 0x5b, 0x00, 0x17, - 0x6c, 0xae, 0xd1, 0x68, 0x1a, 0x8e, 0x3b, 0x93, 0x78, 0xfa, 0x22, 0x6b, 0xae, 0xbc, 0xf8, 0x6c, - 0xd1, 0x39, 0x9a, 0xe3, 0xee, 0xfa, 0xd7, 0x61, 0x50, 0x44, 0x55, 0xb3, 0xd7, 0xe4, 0x08, 0x06, - 0x27, 0x92, 0x2b, 0x29, 0x50, 0x98, 0xd9, 0x8d, 0x42, 0xba, 0x37, 0x0e, 0x27, 0x51, 0xd1, 0x3e, - 0x24, 0xaf, 0x61, 0xdf, 0x25, 0x4c, 0x3b, 0xe3, 0x70, 0x12, 0x4f, 0x5f, 0x65, 0x3b, 0xb5, 0x64, - 0xe7, 0x96, 0xb8, 0x64, 0xbc, 0xc8, 0xaa, 0xeb, 0xeb, 0x0a, 0x35, 0xed, 0x3e, 0xa0, 0x2e, 0x2c, - 0xf1, 0x6a, 0x27, 0x22, 0x1f, 0xe1, 0x39, 0x47, 0x53, 0x97, 0x97, 0x73, 0x8e, 0x86, 0x7d, 0x61, - 0x86, 0xd1, 0x7d, 0xe7, 0x3b, 0x6c, 0xf9, 0xce, 0x9c, 0xe6, 0xac, 0x91, 0xb8, 0x00, 0xcf, 0x78, - 0xeb, 0x8c, 0x4c, 0xa1, 0x6f, 0x58, 0xbd, 0xb4, 0x0d, 0xe8, 0xb9, 0x08, 0xb4, 0x15, 0x61, 0xe6, - 0x99, 0xb3, 0xde, 0x09, 0xc9, 0x3b, 0x88, 0x70, 0x85, 0x5c, 0x55, 0xac, 0xd6, 0xb4, 0xef, 0x5c, - 0xa3, 0x96, 0xeb, 0xf4, 0x8e, 0x3a, 0xdf, 0x3f, 0x71, 0xfa, 0x01, 0xa2, 0xfb, 0xca, 0xc9, 0x10, - 0x9e, 0xf0, 0x52, 0xcc, 0x4d, 0xc9, 0x91, 0x86, 0xe3, 0x70, 0xd2, 0x29, 0xfa, 0xbc, 0x14, 0xb3, - 0x92, 0xa3, 0x43, 0x6c, 0xe5, 0xd1, 0x5e, 0x83, 0xd8, 0xca, 0xa2, 0x34, 0x86, 0xe8, 0xbe, 0x1d, - 0xe9, 0x01, 0x90, 0xff, 0x6b, 0xb4, 0xdf, 0x7d, 0x27, 0xef, 0xf4, 0x14, 0x06, 0xad, 0x84, 0x1e, - 0x77, 0xf1, 0xf4, 0x04, 0xba, 0xce, 0xfd, 0xbe, 0x79, 0xb7, 0x1b, 0xb5, 0x33, 0x68, 0xa3, 0xe1, - 0x03, 0xc4, 0x8f, 0xdc, 0xf1, 0xd1, 0xfa, 0x4f, 0x12, 0xac, 0x37, 0x49, 0x78, 0xbb, 0x49, 0xc2, - 0xdf, 0x9b, 0x24, 0xfc, 0xb1, 0x4d, 0x82, 0xdb, 0x6d, 0x12, 0xfc, 0xdc, 0x26, 0xc1, 0x45, 0xcf, - 0xff, 0x00, 0x8b, 0x9e, 0x9b, 0xdf, 0x37, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xc9, 0x9c, 0xd8, - 0x20, 0x16, 0x03, 0x00, 0x00, + // 465 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xcf, 0x6a, 0xdb, 0x40, + 0x10, 0xc6, 0xa5, 0xf8, 0x5f, 0x34, 0x8a, 0x53, 0xba, 0xa4, 0x45, 0xf6, 0x41, 0x31, 0x22, 0x07, + 0x1f, 0x8a, 0x04, 0x2e, 0x94, 0x42, 0x4f, 0x49, 0x08, 0x34, 0xd0, 0x40, 0xab, 0xf8, 0x94, 0x8b, + 0x59, 0xa7, 0x13, 0x57, 0xa0, 0xd5, 0x6e, 0xb4, 0x6b, 0xb0, 0xdf, 0xa2, 0xaf, 0xd2, 0xb7, 0xf0, + 0x31, 0xc7, 0x9e, 0x4a, 0x6b, 0xbf, 0x48, 0xd9, 0x5d, 0x25, 0xb5, 0xa8, 0x4f, 0xb9, 0x48, 0xbb, + 0xf3, 0xfb, 0xbe, 0xd9, 0x9d, 0x61, 0x16, 0x5e, 0x65, 0xc5, 0x1d, 0x4f, 0xf4, 0x47, 0x4c, 0x93, + 0x52, 0xdc, 0xc6, 0xa2, 0xe4, 0x8a, 0x13, 0x5f, 0x7d, 0xa3, 0x05, 0x97, 0xb1, 0x06, 0xfd, 0x9e, + 0x54, 0xbc, 0xc4, 0x24, 0xa7, 0x53, 0xcc, 0xc5, 0x34, 0x51, 0x4b, 0x81, 0xd2, 0xea, 0xfa, 0x47, + 0x33, 0x3e, 0xe3, 0x66, 0x99, 0xe8, 0x95, 0x8d, 0x46, 0x5d, 0xf0, 0x2f, 0x8b, 0x3b, 0x9e, 0xe2, + 0xfd, 0x1c, 0xa5, 0x8a, 0x7e, 0x34, 0xe0, 0xc0, 0xee, 0xa5, 0xe0, 0x85, 0x44, 0xf2, 0x0e, 0xc0, + 0x24, 0x9b, 0x48, 0x54, 0x32, 0x70, 0x07, 0x8d, 0xa1, 0x3f, 0x7a, 0x19, 0x57, 0x47, 0xde, 0x7c, + 0xd2, 0xe8, 0x1a, 0xd5, 0x59, 0x73, 0xf5, 0xeb, 0xd8, 0x49, 0xbd, 0xbc, 0xda, 0x4b, 0x72, 0x02, + 0xdd, 0x73, 0xce, 0x04, 0x2f, 0xb0, 0x50, 0xe3, 0xa5, 0xc0, 0x60, 0x6f, 0xe0, 0x0e, 0xbd, 0xb4, + 0x1e, 0x24, 0x6f, 0xa0, 0x65, 0x2e, 0x1c, 0x34, 0x06, 0xee, 0xd0, 0x1f, 0xbd, 0x8e, 0xb7, 0x6a, + 0x89, 0xaf, 0x35, 0x31, 0x97, 0xb1, 0x22, 0xad, 0x2e, 0xe7, 0x39, 0xca, 0xa0, 0xb9, 0x43, 0x9d, + 0x6a, 0x62, 0xd5, 0x46, 0x44, 0x3e, 0xc2, 0x0b, 0x86, 0xaa, 0xcc, 0x6e, 0x27, 0x0c, 0x15, 0xfd, + 0x4a, 0x15, 0x0d, 0x5a, 0xc6, 0x77, 0x5c, 0xf3, 0x5d, 0x19, 0xcd, 0x55, 0x25, 0x31, 0x09, 0x0e, + 0x59, 0x2d, 0x46, 0x46, 0xd0, 0x51, 0xb4, 0x9c, 0xe9, 0x06, 0xb4, 0x4d, 0x86, 0xa0, 0x96, 0x61, + 0x6c, 0x99, 0xb1, 0x3e, 0x0a, 0xc9, 0x7b, 0xf0, 0x70, 0x81, 0x4c, 0xe4, 0xb4, 0x94, 0x41, 0xc7, + 0xb8, 0xfa, 0x35, 0xd7, 0xc5, 0x23, 0x35, 0xbe, 0x7f, 0x62, 0x92, 0x40, 0xeb, 0x7e, 0x8e, 0xe5, + 0x32, 0xd8, 0x37, 0xae, 0x5e, 0xcd, 0xf5, 0x45, 0x93, 0xd3, 0xcf, 0x97, 0xb6, 0x50, 0xa3, 0x8b, + 0x4e, 0xc1, 0x7b, 0x6a, 0x15, 0xe9, 0xc1, 0x3e, 0xcb, 0x8a, 0x89, 0xca, 0x18, 0x06, 0xee, 0xc0, + 0x1d, 0x36, 0xd2, 0x0e, 0xcb, 0x8a, 0x71, 0xc6, 0xd0, 0x20, 0xba, 0xb0, 0x68, 0xaf, 0x42, 0x74, + 0xa1, 0x51, 0xe4, 0x83, 0xf7, 0xd4, 0xbf, 0xe8, 0x08, 0xc8, 0xff, 0x4d, 0xd1, 0x83, 0xb2, 0x55, + 0x68, 0x74, 0x01, 0xdd, 0x5a, 0x05, 0xcf, 0x3c, 0xf8, 0x10, 0x0e, 0xb6, 0x4b, 0x1a, 0x9d, 0x43, + 0xd3, 0x64, 0xfb, 0x50, 0xfd, 0xeb, 0x9d, 0xde, 0x9a, 0xd4, 0x7e, 0x6f, 0x07, 0xb1, 0x33, 0x7b, + 0x76, 0xb2, 0xfa, 0x13, 0x3a, 0xab, 0x75, 0xe8, 0x3e, 0xac, 0x43, 0xf7, 0xf7, 0x3a, 0x74, 0xbf, + 0x6f, 0x42, 0xe7, 0x61, 0x13, 0x3a, 0x3f, 0x37, 0xa1, 0x73, 0xd3, 0xb6, 0x2f, 0x68, 0xda, 0x36, + 0x0f, 0xe0, 0xed, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0b, 0x22, 0x37, 0x8b, 0x57, 0x03, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -471,6 +513,18 @@ func (m *InfoResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Query != nil { + { + size, err := m.Query.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x42 + } if m.Exemplars != nil { { size, err := m.Exemplars.MarshalToSizedBuffer(dAtA[:i]) @@ -690,6 +744,29 @@ func (m *ExemplarsInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *QueryAPIInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryAPIInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryAPIInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { offset -= sovRpc(v) base := offset @@ -746,6 +823,10 @@ func (m *InfoResponse) Size() (n int) { l = m.Exemplars.Size() n += 1 + l + sovRpc(uint64(l)) } + if m.Query != nil { + l = m.Query.Size() + n += 1 + l + sovRpc(uint64(l)) + } return n } @@ -806,6 +887,15 @@ func (m *ExemplarsInfo) Size() (n int) { return n } +func (m *QueryAPIInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func sovRpc(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -1137,6 +1227,42 @@ func (m *InfoResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Query == nil { + m.Query = &QueryAPIInfo{} + } + if err := m.Query.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -1484,6 +1610,56 @@ func (m *ExemplarsInfo) Unmarshal(dAtA []byte) error { } return nil } +func (m *QueryAPIInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryAPIInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryAPIInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipRpc(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/info/infopb/rpc.proto b/pkg/info/infopb/rpc.proto index 3d05168b31e..66a182a3e5c 100644 --- a/pkg/info/infopb/rpc.proto +++ b/pkg/info/infopb/rpc.proto @@ -31,21 +31,24 @@ message InfoRequest {} message InfoResponse { repeated ZLabelSet label_sets = 1 [(gogoproto.nullable) = false]; string ComponentType = 2; - + // StoreInfo holds the metadata related to Store API if exposed by the component otherwise it will be null. - StoreInfo store = 3; + StoreInfo store = 3; // RulesInfo holds the metadata related to Rules API if exposed by the component otherwise it will be null. RulesInfo rules = 4; - + // MetricMetadataInfo holds the metadata related to Metadata API if exposed by the component otherwise it will be null. MetricMetadataInfo metric_metadata = 5; - + // TargetsInfo holds the metadata related to Targets API if exposed by the component otherwise it will be null. TargetsInfo targets = 6; - + // ExemplarsInfo holds the metadata related to Exemplars API if exposed by the component otherwise it will be null. ExemplarsInfo exemplars = 7; + + // QueryAPIInfo holds the metadata related to Query API if exposed by the component, otherwise it will be null. + QueryAPIInfo query = 8; } // StoreInfo holds the metadata related to Store API exposed by the component. @@ -70,4 +73,8 @@ message TargetsInfo { message ExemplarsInfo { int64 min_time = 1; int64 max_time = 2; -} \ No newline at end of file +} + +// QueryInfo holds the metadata related to Query API exposed by the component. +message QueryAPIInfo { +} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 36eca1e7c49..048c7c14369 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/thanos-io/thanos/pkg/api/query/querypb" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -118,6 +120,7 @@ func (es *GRPCEndpointSpec) fillExpectedAPIs(componentType component.Component, Targets: &infopb.TargetsInfo{}, MetricMetadata: &infopb.MetricMetadataInfo{}, Exemplars: &infopb.ExemplarsInfo{}, + Query: &infopb.QueryAPIInfo{}, } } case component.Receive: @@ -371,6 +374,20 @@ func (e *EndpointSet) GetStoreClients() []store.Client { return stores } +// GetQueryAPIClients returns a list of all active query API clients. +func (e *EndpointSet) GetQueryAPIClients() []querypb.QueryClient { + e.endpointsMtx.RLock() + defer e.endpointsMtx.RUnlock() + + stores := make([]querypb.QueryClient, 0, len(e.endpoints)) + for _, er := range e.endpoints { + if er.HasQueryAPI() { + stores = append(stores, er.clients.query) + } + } + return stores +} + // GetRulesClients returns a list of all active rules clients. func (e *EndpointSet) GetRulesClients() []rulespb.RulesClient { e.endpointsMtx.RLock() @@ -648,6 +665,10 @@ func (er *endpointRef) Update(metadata *endpointMetadata) { clients.exemplar = exemplarspb.NewExemplarsClient(er.cc) } + if metadata.Query != nil { + clients.query = querypb.NewQueryClient(er.cc) + } + er.clients = clients er.metadata = metadata } @@ -670,6 +691,13 @@ func (er *endpointRef) HasStoreAPI() bool { return er.clients != nil && er.clients.store != nil } +func (er *endpointRef) HasQueryAPI() bool { + er.mtx.RLock() + defer er.mtx.RUnlock() + + return er.clients != nil && er.clients.query != nil +} + func (er *endpointRef) HasRulesAPI() bool { er.mtx.RLock() defer er.mtx.RUnlock() @@ -768,6 +796,10 @@ func (er *endpointRef) apisPresent() []string { apisPresent = append(apisPresent, "MetricMetadataAPI") } + if er.HasQueryAPI() { + apisPresent = append(apisPresent, "QueryAPI") + } + return apisPresent } @@ -777,6 +809,7 @@ type endpointClients struct { metricMetadata metadatapb.MetadataClient exemplar exemplarspb.ExemplarsClient target targetspb.TargetsClient + query querypb.QueryClient info infopb.InfoClient } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 08e59f858d4..181e576e9fe 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -51,6 +51,7 @@ var ( Rules: &infopb.RulesInfo{}, MetricMetadata: &infopb.MetricMetadataInfo{}, Targets: &infopb.TargetsInfo{}, + Query: &infopb.QueryAPIInfo{}, } ruleInfo = &infopb.InfoResponse{ ComponentType: component.Rule.String(),