Skip to content

Commit

Permalink
Arnikola/fix tag funcs (#1985)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored Oct 15, 2019
1 parent b112fd5 commit b267303
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 6 deletions.
6 changes: 5 additions & 1 deletion scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
set -xe

source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh
source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/prometheus/test_correctness.sh
REVISION=$(git rev-parse HEAD)
COMPOSE_FILE=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/prometheus/docker-compose.yml
# quay.io/m3db/prometheus_remote_client_golang @ v0.4.3
Expand Down Expand Up @@ -179,10 +180,13 @@ function test_query_restrict_metrics_type {
retry_with_backoff prometheus_query_native
}
# Run all tests
echo "Running prometehus tests"
test_prometheus_remote_read
test_prometheus_remote_write_multi_namespaces
test_prometheus_remote_write_too_old_returns_400_status_code
test_prometheus_remote_write_restrict_metrics_type
test_query_limits_applied
test_query_restrict_metrics_type
echo "Running function correctness tests"
test_correctness
59 changes: 59 additions & 0 deletions scripts/docker-integration-tests/prometheus/test_correctness.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#!/usr/bin/env bash

set -ex
source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh

function write_metrics {
NUM=$1
echo "Writing $NUM metrics to [0.0.0.0:9003]"
# set +x
for (( i=0; i<$NUM; i++ ))
do
curl -X POST 0.0.0.0:9003/writetagged -d '{
"namespace": "unagg",
"id": "{__name__=\"'$METRIC_NAME'\",val=\"'$i'\"}",
"tags": [
{
"name": "__name__",
"value": "'$METRIC_NAME'"
},
{
"name": "val",
"value": "'$i'"
}
],
"datapoint": {
"timestamp":'"$t"',
"value": 1
}
}'
done
# set -x
}

function test_instantaneous {
QUERY=$1
EXPECTED_COUNT=$2
EXPECTED=$3
RESPONSE=$(curl -sSL "http://localhost:7201/api/v1/query?query=$QUERY")
echo $REPONSE | jq .data.result
ACTUAL_COUNT=$(echo $RESPONSE | jq '.data.result | length')
ACTUAL=$(echo $RESPONSE | jq .data.result[].metric.foo | tr -d "\n")
CONCAT=$(echo $EXPECTED | tr -d " ")
test $ACTUAL_COUNT = $EXPECTED_COUNT && test $ACTUAL = $CONCAT
}

function test_replace {
export t=$(date +%s)
METRIC_NAME="quail_$t"
write_metrics 5
sleep 1
query='label_replace('$METRIC_NAME',"foo","bar_$1","val","(.*)")'
test_instantaneous $query 5 "\"bar_0\" \"bar_1\" \"bar_2\" \"bar_3\" \"bar_4\""
query='label_replace('$METRIC_NAME',"foo","bar_$1","val","(.*)")-0'
test_instantaneous $query 5 "\"bar_0\" \"bar_1\" \"bar_2\" \"bar_3\" \"bar_4\""
}

function test_correctness {
test_replace
}
6 changes: 4 additions & 2 deletions scripts/docker-integration-tests/query_fanout/warning.sh
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ function test_label_values {
ACTUAL_HEADER=$(cat $HEADER_FILE | grep M3-Results-Limited | cut -d' ' -f2 | tr -d "\r\n")
test $ACTUAL_HEADER = $EXPECTED_HEADER
}

function write_carbon {
CLUSTER=$1
case $CLUSTER in
Expand Down Expand Up @@ -282,6 +282,7 @@ function test_fanout_warning_label_values {
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff test_label_values 1 max_fetch_series_limit_applied
}


function test_fanout_warning_graphite {
# Update write time as it will otherwise not be written correctly.
t=$(date +%s)
Expand Down Expand Up @@ -333,7 +334,7 @@ function test_fanout_warning_missing_zone {

ATTEMPTS=3 TIMEOUT=1 retry_with_backoff render_carbon 16 15 remote_store_cluster-c_fetch_warning
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff render_carbon 9 14 max_fetch_series_limit_applied,remote_store_cluster-c_fetch_warning

ATTEMPTS=3 TIMEOUT=1 retry_with_backoff find_carbon 16 remote_store_cluster-c_complete_tags_warning
ATTEMPTS=3 TIMEOUT=1 retry_with_backoff find_carbon 9 max_fetch_series_limit_applied,remote_store_cluster-c_complete_tags_warning
}
Expand All @@ -349,3 +350,4 @@ function test_fanout_warnings {
test_fanout_warning_graphite
test_fanout_warning_missing_zone
}

13 changes: 12 additions & 1 deletion src/query/executor/transform/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,18 @@ func ProcessSimpleBlock(
return err
}

// NB: The flow here is a little weird; this kicks off the next block's
// processing step after retrieving it, then attempts to close it. There is a
// trick here where some blocks (specifically lazy wrappers) that should not
// be closed, as they would free underlying data. The general story in block
// lifecycle should be revisited to remove quirks arising from these edge
// cases (something where blocks are responsible for calling their own
// downstreams would seem more intuative and allow finer grained lifecycle
// control).
err = controller.Process(queryCtx, nextBlock)
nextBlock.Close()
if nextBlock.Info().Type() != block.BlockLazy {
nextBlock.Close()
}

return err
}
28 changes: 26 additions & 2 deletions src/query/executor/transform/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,22 @@ func TestProcessSimpleBlock(t *testing.T) {
return ProcessSimpleBlock(tctx.Node, tctx.Controller, tctx.QueryCtx, tctx.Controller.ID, tctx.SourceBlock)
}

configureSuccessfulNode := func(tctx *testContext) {
configureNode := func(
tctx *testContext,
blockType block.BlockType,
closeExpected bool,
) {
tctx.Node.EXPECT().Params().Return(utils.StaticParams("foo"))
tctx.Node.EXPECT().ProcessBlock(gomock.Any(), gomock.Any(), gomock.Any()).Return(tctx.ResultBlock, nil)
tctx.ChildNode.EXPECT().Process(gomock.Any(), gomock.Any(), gomock.Any())
tctx.ResultBlock.EXPECT().Close()
tctx.ResultBlock.EXPECT().Info().Return(block.NewBlockInfo(blockType))
if closeExpected {
tctx.ResultBlock.EXPECT().Close()
}
}

configureSuccessfulNode := func(tctx *testContext) {
configureNode(tctx, block.BlockM3TSZCompressed, true)
}

t.Run("closes next block", func(t *testing.T) {
Expand All @@ -96,6 +107,19 @@ func TestProcessSimpleBlock(t *testing.T) {
require.NoError(t, doCall(tctx))
})

configureLazyNode := func(tctx *testContext) {
configureNode(tctx, block.BlockLazy, false)
}

t.Run("does not close lazy block", func(t *testing.T) {
tctx, closer := setup(t)
defer closer()

configureLazyNode(tctx)

require.NoError(t, doCall(tctx))
})

t.Run("errors on process error", func(t *testing.T) {
tctx, closer := setup(t)
defer closer()
Expand Down

0 comments on commit b267303

Please sign in to comment.