Skip to content

Commit

Permalink
[DBNode] - Add support for batching fetches across namespaces (#1987)
Browse files Browse the repository at this point in the history
  • Loading branch information
Richard Artoul authored Oct 8, 2019
1 parent a6c0d4e commit 8d81062
Show file tree
Hide file tree
Showing 30 changed files with 2,273 additions and 432 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
version: "3.5"
services:
dbnode01:
expose:
- "9000-9004"
- "2379-2380"
ports:
- "0.0.0.0:9000-9004:9000-9004"
- "0.0.0.0:2379-2380:2379-2380"
networks:
- backend
image: "m3dbnode_integration:${REVISION}"
coordinator01:
expose:
- "7201"
- "7203"
ports:
- "0.0.0.0:7201:7201"
- "0.0.0.0:7203:7203"
networks:
- backend
image: "m3coordinator_integration:${REVISION}"
volumes:
- "./:/etc/m3coordinator/"
networks:
backend:
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
listenAddress:
type: "config"
value: "0.0.0.0:7201"

logging:
level: info

metrics:
scope:
prefix: "coordinator"
prometheus:
handlerPath: /metrics
listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved
sanitization: prometheus
samplingRate: 1.0
extended: none

limits:
perQuery:
maxFetchedSeries: 100

clusters:
- namespaces:
- namespace: agg
type: aggregated
retention: 10h
resolution: 15s
- namespace: unagg
type: unaggregated
retention: 10m
client:
config:
service:
env: default_env
zone: embedded
service: m3db
cacheDir: /var/lib/m3kv
etcdClusters:
- zone: embedded
endpoints:
- dbnode01:2379
writeConsistencyLevel: majority
readConsistencyLevel: unstrict_majority
useV2BatchAPIs: true

tagOptions:
idScheme: quoted
188 changes: 188 additions & 0 deletions scripts/docker-integration-tests/simple_v2_batch_apis/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
#!/usr/bin/env bash

set -xe

source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh
REVISION=$(git rev-parse HEAD)
COMPOSE_FILE=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/prometheus/docker-compose.yml
# quay.io/m3db/prometheus_remote_client_golang @ v0.4.3
PROMREMOTECLI_IMAGE=quay.io/m3db/prometheus_remote_client_golang@sha256:fc56df819bff9a5a087484804acf3a584dd4a78c68900c31a28896ed66ca7e7b
JQ_IMAGE=realguess/jq:1.4@sha256:300c5d9fb1d74154248d155ce182e207cf6630acccbaadd0168e18b15bfaa786
METRIC_NAME_TEST_TOO_OLD=foo
METRIC_NAME_TEST_RESTRICT_WRITE=bar
export REVISION

echo "Pull containers required for test"
docker pull $PROMREMOTECLI_IMAGE
docker pull $JQ_IMAGE

echo "Run m3dbnode and m3coordinator containers"
docker-compose -f ${COMPOSE_FILE} up -d dbnode01
docker-compose -f ${COMPOSE_FILE} up -d coordinator01

function defer {
docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes
}
trap defer EXIT

setup_single_m3db_node

echo "Start Prometheus containers"
docker-compose -f ${COMPOSE_FILE} up -d prometheus01

function test_prometheus_remote_read {
# Ensure Prometheus can proxy a Prometheus query
echo "Wait until the remote write endpoint generates and allows for data to be queried"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=prometheus_remote_storage_succeeded_samples_total | jq -r .data.result[].value[1]) -gt 100 ]]'
}

function test_prometheus_remote_write_multi_namespaces {
# Make sure we're proxying writes to the unaggregated namespace
echo "Wait until data begins being written to remote storage for the unaggregated namespace"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=database_write_tagged_success\\{namespace=\"unagg\"\\} | jq -r .data.result[0].value[1]) -gt 0 ]]'

# Make sure we're proxying writes to the aggregated namespace
echo "Wait until data begins being written to remote storage for the aggregated namespace"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -sSf 0.0.0.0:9090/api/v1/query?query=database_write_tagged_success\\{namespace=\"agg\"\\} | jq -r .data.result[0].value[1]) -gt 0 ]]'
}

function prometheus_remote_write {
local metric_name=$1
local datapoint_timestamp=$2
local datapoint_value=$3
local expect_success=$4
local expect_success_err=$5
local expect_status=$6
local expect_status_err=$7
local metrics_type=$8
local metrics_storage_policy=$9

network=$(docker network ls --format '{{.ID}}' | tail -n 1)
out=$((docker run -it --rm --network $network \
$PROMREMOTECLI_IMAGE \
-u http://coordinator01:7201/api/v1/prom/remote/write \
-t __name__:${metric_name} \
-h "M3-Metrics-Type: ${metrics_type}" \
-h "M3-Storage-Policy: ${metrics_storage_policy}" \
-d ${datapoint_timestamp},${datapoint_value} | grep -v promremotecli_log) || true)
success=$(echo $out | grep -v promremotecli_log | docker run --rm -i $JQ_IMAGE jq .success)
status=$(echo $out | grep -v promremotecli_log | docker run --rm -i $JQ_IMAGE jq .statusCode)
if [[ "$success" != "$expect_success" ]]; then
echo $expect_success_err
return 1
fi
if [[ "$status" != "$expect_status" ]]; then
echo "${expect_status_err}: actual=${status}"
return 1
fi
echo "Returned success=${success}, status=${status} as expected"
return 0
}
function test_prometheus_remote_write_too_old_returns_400_status_code {
# Test writing too far into the past returns an HTTP 400 status code
echo "Test write into the past returns HTTP 400"
hour_ago=$(expr $(date +"%s") - 3600)
prometheus_remote_write \
$METRIC_NAME_TEST_TOO_OLD $hour_ago 3.142 \
false "Expected request to fail" \
400 "Expected request to return status code 400"
}
function test_prometheus_remote_write_restrict_metrics_type {
# Test we can specify metrics type
echo "Test write with unaggregated metrics type works as expected"
prometheus_remote_write \
$METRIC_NAME_TEST_RESTRICT_WRITE now 42.42 \
true "Expected request to succeed" \
200 "Expected request to return status code 200" \
unaggregated
echo "Test write with aggregated metrics type works as expected"
prometheus_remote_write \
$METRIC_NAME_TEST_RESTRICT_WRITE now 84.84 \
true "Expected request to succeed" \
200 "Expected request to return status code 200" \
aggregated 15s:10h
}
function test_query_limits_applied {
# Test the default series limit applied when directly querying
# coordinator (limit set to 100 in m3coordinator.yml)
echo "Test query limit with coordinator defaults"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s 0.0.0.0:7201/api/v1/query?query=\\{name!=\"\"\\} | jq -r ".data.result | length") -eq 100 ]]'
# Test the default series limit applied when directly querying
# coordinator (limit set by header)
echo "Test query limit with coordinator limit header"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \
'[[ $(curl -s -H "M3-Limit-Max-Series: 10" 0.0.0.0:7201/api/v1/query?query=\\{name!=\"\"\\} | jq -r ".data.result | length") -eq 10 ]]'
}
function prometheus_query_native {
local endpoint=${endpoint:-}
local query=${query:-}
local params=${params:-}
local metrics_type=${metrics_type:-}
local metrics_storage_policy=${metrics_storage_policy:-}
local jq_path=${jq_path:-}
local expected_value=${expected_value:-}
params_prefixed=""
if [[ "$params" != "" ]]; then
params_prefixed='&'"${params}"
fi
result=$(curl -s \
-H "M3-Metrics-Type: ${metrics_type}" \
-H "M3-Storage-Policy: ${metrics_storage_policy}" \
"0.0.0.0:7201/api/v1/${endpoint}?query=${query}${params_prefixed}" | jq -r "${jq_path}")
test "$result" = "$expected_value"
return $?
}
function test_query_restrict_metrics_type {
now=$(date +"%s")
hour_ago=$(expr $now - 3600)
step="30s"
params_instant=""
params_range="start=${hour_ago}"'&'"end=${now}"'&'"step=30s"
jq_path_instant=".data.result[0].value[1]"
jq_path_range=".data.result[0].values[][1]"
# Test restricting to unaggregated metrics
echo "Test query restrict to unaggregated metrics type (instant)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
endpoint=query query="$METRIC_NAME_TEST_RESTRICT_WRITE" params="$params_instant" \
metrics_type="unaggregated" jq_path="$jq_path_instant" expected_value="42.42" \
retry_with_backoff prometheus_query_native
echo "Test query restrict to unaggregated metrics type (range)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
endpoint=query_range query="$METRIC_NAME_TEST_RESTRICT_WRITE" params="$params_range" \
metrics_type="unaggregated" jq_path="$jq_path_range" expected_value="42.42" \
retry_with_backoff prometheus_query_native
# Test restricting to aggregated metrics
echo "Test query restrict to aggregated metrics type (instant)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
endpoint=query query="$METRIC_NAME_TEST_RESTRICT_WRITE" params="$params_instant" \
metrics_type="aggregated" metrics_storage_policy="15s:10h" jq_path="$jq_path_instant" expected_value="84.84" \
retry_with_backoff prometheus_query_native
echo "Test query restrict to aggregated metrics type (range)"
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
endpoint=query_range query="$METRIC_NAME_TEST_RESTRICT_WRITE" params="$params_range" \
metrics_type="aggregated" metrics_storage_policy="15s:10h" jq_path="$jq_path_range" expected_value="84.84" \
retry_with_backoff prometheus_query_native
}
# Run all tests
test_prometheus_remote_read
test_prometheus_remote_write_multi_namespaces
test_prometheus_remote_write_too_old_returns_400_status_code
test_prometheus_remote_write_restrict_metrics_type
test_query_limits_applied
test_query_restrict_metrics_type
27 changes: 24 additions & 3 deletions src/dbnode/client/fetch_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (

type fetchBatchOp struct {
checked.RefCount
request rpc.FetchBatchRawRequest
completionFns []completionFn
finalizer fetchBatchOpFinalizer
request rpc.FetchBatchRawRequest
requestV2Elements []rpc.FetchBatchRawV2RequestElement
completionFns []completionFn
finalizer fetchBatchOpFinalizer
}

func (f *fetchBatchOp) reset() {
Expand All @@ -46,20 +47,40 @@ func (f *fetchBatchOp) reset() {
f.completionFns[i] = nil
}
f.completionFns = f.completionFns[:0]

for i := range f.requestV2Elements {
f.requestV2Elements[i].NameSpace = 0
f.requestV2Elements[i].RangeStart = 0
f.requestV2Elements[i].RangeEnd = 0
f.requestV2Elements[i].ID = nil
f.requestV2Elements[i].RangeTimeType = 0
}
f.requestV2Elements = f.requestV2Elements[:0]

f.DecWrites()
}

func (f *fetchBatchOp) append(namespace, id []byte, completionFn completionFn) {
f.IncWrites()
f.request.NameSpace = namespace
f.request.Ids = append(f.request.Ids, id)
f.requestV2Elements = append(f.requestV2Elements, rpc.FetchBatchRawV2RequestElement{
// NameSpace filled in by the host queue later.
RangeStart: f.request.RangeStart,
RangeEnd: f.request.RangeEnd,
ID: id,
RangeTimeType: f.request.RangeTimeType,
})
f.completionFns = append(f.completionFns, completionFn)
f.DecWrites()
}

func (f *fetchBatchOp) Size() int {
f.IncReads()
value := len(f.request.Ids)
if value == 0 {
value = len(f.requestV2Elements)
}
f.DecReads()
return value
}
Expand Down
67 changes: 67 additions & 0 deletions src/dbnode/client/fetch_batch_element_array_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package client

import (
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/x/pool"
)

type fetchBatchRawV2RequestElementArrayPool interface {
// Init pool
Init()

// Get an array of WriteBatchV2RawRequestElement objects
Get() []*rpc.FetchBatchRawV2RequestElement

// Put an array of FetchBatchRawV2RequestElement objects
Put(w []*rpc.FetchBatchRawV2RequestElement)
}

type poolOfFetchBatchRawV2RequestElementArray struct {
pool pool.ObjectPool
capacity int
}

func newFetchBatchRawV2RequestElementArrayPool(
opts pool.ObjectPoolOptions, capacity int) fetchBatchRawV2RequestElementArrayPool {

p := pool.NewObjectPool(opts)
return &poolOfFetchBatchRawV2RequestElementArray{p, capacity}
}

func (p *poolOfFetchBatchRawV2RequestElementArray) Init() {
p.pool.Init(func() interface{} {
return make([]*rpc.FetchBatchRawV2RequestElement, 0, p.capacity)
})
}

func (p *poolOfFetchBatchRawV2RequestElementArray) Get() []*rpc.FetchBatchRawV2RequestElement {
return p.pool.Get().([]*rpc.FetchBatchRawV2RequestElement)
}

func (p *poolOfFetchBatchRawV2RequestElementArray) Put(w []*rpc.FetchBatchRawV2RequestElement) {
for i := range w {
w[i] = nil
}
w = w[:0]
p.pool.Put(w)
}
Loading

0 comments on commit 8d81062

Please sign in to comment.