Skip to content

Commit

Permalink
Merge branch 'master' into rhall-leak-version
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanhall07 authored Nov 16, 2021
2 parents 1ebbf21 + d2d4306 commit 012e473
Show file tree
Hide file tree
Showing 10 changed files with 310 additions and 63 deletions.
126 changes: 115 additions & 11 deletions src/integration/resources/coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"io/ioutil"
"net"
"net/http"
"path"
"strings"
"time"

Expand Down Expand Up @@ -317,7 +318,22 @@ func (c *CoordinatorClient) WaitForClusterReady() error {
logger.Error("failed checking cluster readiness", zap.Error(err))
return err
}
resp.Body.Close()
defer resp.Body.Close()

if resp.StatusCode != 200 {
err = errors.New("non-200 status code received")

body, rerr := ioutil.ReadAll(resp.Body)
if rerr != nil {
logger.Warn("failed parse response body", zap.Error(rerr))
body = []byte("")
}

logger.Error("failed to check cluster readiness", zap.Error(err),
zap.String("responseBody", string(body)),
)
return err
}

logger.Info("cluster ready to receive reads and writes")

Expand Down Expand Up @@ -715,8 +731,8 @@ func (c *CoordinatorClient) query(
}

// InstantQuery runs an instant query with provided headers
func (c *CoordinatorClient) InstantQuery(req QueryRequest, headers map[string][]string) (model.Vector, error) {
queryStr := fmt.Sprintf("%s?query=%s", route.QueryURL, req.QueryExpr)
func (c *CoordinatorClient) InstantQuery(req QueryRequest, headers Headers) (model.Vector, error) {
queryStr := fmt.Sprintf("%s?query=%s", route.QueryURL, req.Query)
if req.Time != nil {
queryStr = fmt.Sprintf("%s&time=%d", queryStr, req.Time.Unix())
}
Expand Down Expand Up @@ -745,21 +761,21 @@ type vectorResult struct {
}

// RangeQuery runs a range query with provided headers
func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers map[string][]string) (model.Matrix, error) {
if req.StartTime.IsZero() {
req.StartTime = time.Now()
func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers Headers) (model.Matrix, error) {
if req.Start.IsZero() {
req.Start = time.Now()
}
if req.EndTime.IsZero() {
req.EndTime = time.Now()
if req.End.IsZero() {
req.End = time.Now()
}
if req.Step == 0 {
req.Step = 15 * time.Second // default step is 15 seconds.
}
queryStr := fmt.Sprintf(
"%s?query=%s&start=%d&end=%d&step=%f",
route.QueryRangeURL, req.QueryExpr,
req.StartTime.Unix(),
req.EndTime.Unix(),
route.QueryRangeURL, req.Query,
req.Start.Unix(),
req.End.Unix(),
req.Step.Seconds(),
)

Expand All @@ -776,6 +792,84 @@ func (c *CoordinatorClient) RangeQuery(req RangeQueryRequest, headers map[string
return parsedResp.Data.Result, nil
}

// LabelNames return matching label names based on the request.
func (c *CoordinatorClient) LabelNames(
req LabelNamesRequest,
headers Headers,
) (model.LabelNames, error) {
urlPathAndQuery := fmt.Sprintf("%s?%s", route.LabelNamesURL, req.String())
resp, err := c.runQuery(urlPathAndQuery, headers)
if err != nil {
return nil, err
}

var parsedResp labelResponse
if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil {
return nil, err
}

labelNames := make(model.LabelNames, 0, len(parsedResp.Data))
for _, label := range parsedResp.Data {
labelNames = append(labelNames, model.LabelName(label))
}

return labelNames, nil
}

// LabelValues return matching label values based on the request.
func (c *CoordinatorClient) LabelValues(
req LabelValuesRequest,
headers Headers,
) (model.LabelValues, error) {
urlPathAndQuery := fmt.Sprintf("%s?%s",
path.Join(route.Prefix, "label", req.LabelName, "values"),
req.String())
resp, err := c.runQuery(urlPathAndQuery, headers)
if err != nil {
return nil, err
}

var parsedResp labelResponse
if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil {
return nil, err
}

labelValues := make(model.LabelValues, 0, len(parsedResp.Data))
for _, label := range parsedResp.Data {
labelValues = append(labelValues, model.LabelValue(label))
}

return labelValues, nil
}

// Series returns matching series based on the request.
func (c *CoordinatorClient) Series(
req SeriesRequest,
headers Headers,
) ([]model.Metric, error) {
urlPathAndQuery := fmt.Sprintf("%s?%s", route.SeriesMatchURL, req.String())
resp, err := c.runQuery(urlPathAndQuery, headers)
if err != nil {
return nil, err
}

var parsedResp seriesResponse
if err := json.Unmarshal([]byte(resp), &parsedResp); err != nil {
return nil, err
}

series := make([]model.Metric, 0, len(parsedResp.Data))
for _, labels := range parsedResp.Data {
labelSet := make(model.LabelSet)
for name, val := range labels {
labelSet[model.LabelName(name)] = model.LabelValue(val)
}
series = append(series, model.Metric(labelSet))
}

return series, nil
}

type jsonRangeQueryResponse struct {
Status string
Data matrixResult
Expand All @@ -786,6 +880,16 @@ type matrixResult struct {
Result model.Matrix
}

type labelResponse struct {
Status string
Data []string
}

type seriesResponse struct {
Status string
Data []map[string]string
}

func (c *CoordinatorClient) runQuery(
query string, headers map[string][]string,
) (string, error) {
Expand Down
41 changes: 38 additions & 3 deletions src/integration/resources/docker/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ func (c *coordinator) ApplyKVUpdate(update string) error {
}

func (c *coordinator) RunQuery(
verifier resources.ResponseVerifier, query string, headers map[string][]string,
verifier resources.ResponseVerifier,
query string,
headers resources.Headers,
) error {
if c.resource.closed {
return errClosed
Expand All @@ -233,7 +235,7 @@ func (c *coordinator) RunQuery(

func (c *coordinator) InstantQuery(
req resources.QueryRequest,
headers map[string][]string,
headers resources.Headers,
) (model.Vector, error) {
if c.resource.closed {
return nil, errClosed
Expand All @@ -244,14 +246,47 @@ func (c *coordinator) InstantQuery(
// RangeQuery runs a range query with provided headers
func (c *coordinator) RangeQuery(
req resources.RangeQueryRequest,
headers map[string][]string,
headers resources.Headers,
) (model.Matrix, error) {
if c.resource.closed {
return nil, errClosed
}
return c.client.RangeQuery(req, headers)
}

// LabelNames return matching label names based on the request.
func (c *coordinator) LabelNames(
req resources.LabelNamesRequest,
headers resources.Headers,
) (model.LabelNames, error) {
if c.resource.closed {
return nil, errClosed
}
return c.client.LabelNames(req, headers)
}

// LabelValues returns matching label values based on the request.
func (c *coordinator) LabelValues(
req resources.LabelValuesRequest,
headers resources.Headers,
) (model.LabelValues, error) {
if c.resource.closed {
return nil, errClosed
}
return c.client.LabelValues(req, headers)
}

// Series returns matching series based on the request.
func (c *coordinator) Series(
req resources.SeriesRequest,
headers resources.Headers,
) ([]model.Metric, error) {
if c.resource.closed {
return nil, errClosed
}
return c.client.Series(req, headers)
}

func (c *coordinator) Close() error {
if c.resource.closed {
return errClosed
Expand Down
10 changes: 5 additions & 5 deletions src/integration/resources/inprocess/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,11 @@ func testAggMetrics(t *testing.T, coord resources.Coordinator) {
return coord.WriteProm("cpu", map[string]string{"host": "host1"}, samples)
}))

queryHeaders := map[string][]string{"M3-Metrics-Type": {"aggregated"}, "M3-Storage-Policy": {"10s:6h"}}
queryHeaders := resources.Headers{"M3-Metrics-Type": {"aggregated"}, "M3-Storage-Policy": {"10s:6h"}}

// Instant Query
require.NoError(t, resources.Retry(func() error {
result, err := coord.InstantQuery(resources.QueryRequest{QueryExpr: "cpu"}, queryHeaders)
result, err := coord.InstantQuery(resources.QueryRequest{Query: "cpu"}, queryHeaders)
if err != nil {
return err
}
Expand All @@ -312,9 +312,9 @@ func testAggMetrics(t *testing.T, coord resources.Coordinator) {
require.NoError(t, resources.Retry(func() error {
result, err := coord.RangeQuery(
resources.RangeQueryRequest{
QueryExpr: "cpu",
StartTime: time.Now().Add(-30 * time.Second),
EndTime: time.Now(),
Query: "cpu",
Start: time.Now().Add(-30 * time.Second),
End: time.Now(),
Step: 1 * time.Second,
},
queryHeaders,
Expand Down
30 changes: 27 additions & 3 deletions src/integration/resources/inprocess/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,27 +418,51 @@ func (c *Coordinator) WriteProm(name string, tags map[string]string, samples []p
func (c *Coordinator) RunQuery(
verifier resources.ResponseVerifier,
query string,
headers map[string][]string,
headers resources.Headers,
) error {
return c.client.RunQuery(verifier, query, headers)
}

// InstantQuery runs an instant query with provided headers
func (c *Coordinator) InstantQuery(
req resources.QueryRequest,
headers map[string][]string,
headers resources.Headers,
) (model.Vector, error) {
return c.client.InstantQuery(req, headers)
}

// RangeQuery runs a range query with provided headers
func (c *Coordinator) RangeQuery(
req resources.RangeQueryRequest,
headers map[string][]string,
headers resources.Headers,
) (model.Matrix, error) {
return c.client.RangeQuery(req, headers)
}

// LabelNames return matching label names based on the request.
func (c *Coordinator) LabelNames(
req resources.LabelNamesRequest,
headers resources.Headers,
) (model.LabelNames, error) {
return c.client.LabelNames(req, headers)
}

// LabelValues returns matching label values based on the request.
func (c *Coordinator) LabelValues(
req resources.LabelValuesRequest,
headers resources.Headers,
) (model.LabelValues, error) {
return c.client.LabelValues(req, headers)
}

// Series returns matching series based on the request.
func (c *Coordinator) Series(
req resources.SeriesRequest,
headers resources.Headers,
) ([]model.Metric, error) {
return c.client.Series(req, headers)
}

func updateCoordinatorConfig(
cfg config.Configuration,
opts CoordinatorOptions,
Expand Down
Loading

0 comments on commit 012e473

Please sign in to comment.