Skip to content

Commit

Permalink
[query] Noop storage backend to serve admin APIs (#2236)
Browse files Browse the repository at this point in the history
- Some use cases require the coordinator for serving m3admin APIs but
  have no need to communicate with M3DB or serve query data.
- Currently need a live M3DB cluster in order to serve m3admin APIs.
- This PR adds a `noop-etcd` storage backend for serving m3admin APIs.

Example of querying:
```
$ curl -v "localhost:7201/api/v1/query_range?start=$(date '+%s')&end=$(date '+%s')&step=10&query=foo"
*   Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 7201 (#0)
> GET /api/v1/query_range?start=1585603982&end=1585603982&step=10&query=foo HTTP/1.1
> Host: localhost:7201
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 500 Internal Server Error
< Access-Control-Allow-Headers: accept, content-type, authorization
< Access-Control-Allow-Methods: POST, GET, OPTIONS, PUT, DELETE
< Access-Control-Allow-Origin: *
< Date: Mon, 30 Mar 2020 21:33:02 GMT
< Content-Length: 48
< Content-Type: text/plain; charset=utf-8
<
{"error":"operation not valid for noop client"}
```
  • Loading branch information
schallert authored Mar 31, 2020
1 parent 341e817 commit 1e794f2
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: "3.5"
services:
coordinator01:
expose:
- "7201"
ports:
- "0.0.0.0:7201:7201"
networks:
- backend
image: "m3coordinator_integration:${REVISION}"
volumes:
- "./m3coordinator.yml:/etc/m3coordinator/m3coordinator.yml"
etcd01:
expose:
- "2379-2380"
ports:
- "0.0.0.0:2379-2380:2379-2380"
networks:
- backend
image: quay.io/coreos/etcd:v3.4.3
command:
- "etcd"
- "--name"
- "etcd01"
- "--listen-peer-urls"
- "http://0.0.0.0:2380"
- "--listen-client-urls"
- "http://0.0.0.0:2379"
- "--advertise-client-urls"
- "http://etcd01:2379"
- "--initial-cluster-token"
- "etcd-cluster-1"
- "--initial-advertise-peer-urls"
- "http://etcd01:2380"
- "--initial-cluster"
- "etcd01=http://etcd01:2380"
- "--initial-cluster-state"
- "new"
- "--data-dir"
- "/var/lib/etcd"
networks:
backend:
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
listenAddress:
value: "0.0.0.0:7201"

logging:
level: info

metrics:
scope:
prefix: "coordinator"
prometheus:
handlerPath: /metrics
listenAddress: 0.0.0.0:7203 # until https://github.com/m3db/m3/issues/682 is resolved
sanitization: prometheus
samplingRate: 1.0
extended: none

backend: noop-etcd
clusterManagement:
etcd:
env: default_env
zone: embedded
service: m3db
cacheDir: /var/lib/m3kv
etcdClusters:
- zone: embedded
endpoints:
- etcd01:2379

tagOptions:
idScheme: quoted
51 changes: 51 additions & 0 deletions scripts/docker-integration-tests/coordinator_noop/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/usr/bin/env bash

set -xe

source $GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/common.sh
REVISION=$(git rev-parse HEAD)
SCRIPT_PATH=$GOPATH/src/github.com/m3db/m3/scripts/docker-integration-tests/coordinator_noop
COMPOSE_FILE=$SCRIPT_PATH/docker-compose.yml
export REVISION

echo "Run coordinator with no etcd"
docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes coordinator01
docker-compose -f ${COMPOSE_FILE} up -d --renew-anon-volumes etcd01

function defer {
docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes
}
trap defer EXIT

I=0
RES=""
while [[ "$I" -le 5 ]]; do
if curl -vvvsSf -X POST localhost:7201/api/v1/services/m3coordinator/placement/init -d '{
"instances": [
{
"id": "m3coordinator01",
"zone": "embedded",
"endpoint": "m3coordinator01:7507",
"hostname": "m3coordinator01",
"port": 7507
}
]
}'; then
break
fi
# Need some time for coordinators to come up.
sleep 2
I=$((I+1))
done

if ! curl -vvvsSf localhost:7201/api/v1/services/m3coordinator/placement; then
echo "could not fetch existing placement"
exit 1
fi

QUERY_EXP='{"error":"operation not valid for noop client"}'
RES=$(curl "localhost:7201/api/v1/query_range?start=$(date '+%s')&end=$(date '+%s')&step=10&query=foo")
if [[ "$RES" != "$QUERY_EXP" ]]; then
echo "Expected resp '$QUERY_EXP', GOT '$RES'"
exit 1
fi
1 change: 1 addition & 0 deletions scripts/docker-integration-tests/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ TESTS=(
scripts/docker-integration-tests/repair_and_replication/test.sh
scripts/docker-integration-tests/multi_cluster_write/test.sh
scripts/docker-integration-tests/coordinator_config_rules/test.sh
scripts/docker-integration-tests/coordinator_noop/test.sh
)

# Some systems, including our default Buildkite hosts, don't come with netcat
Expand Down
5 changes: 5 additions & 0 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ const (
GRPCStorageType BackendStorageType = "grpc"
// M3DBStorageType is for m3db backend.
M3DBStorageType BackendStorageType = "m3db"
// NoopEtcdStorageType is for a noop backend which returns empty results for
// any query and blackholes any writes, but requires that a valid etcd cluster
// is defined and can be connected to. Primarily used for standalone
// coordinators used only to serve m3admin APIs.
NoopEtcdStorageType BackendStorageType = "noop-etcd"

defaultCarbonIngesterListenAddress = "0.0.0.0:7204"
errNoIDGenerationScheme = "error: a recent breaking change means that an ID " +
Expand Down
25 changes: 23 additions & 2 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ func Run(runOpts RunOptions) {
tsdbOpts = runOpts.ApplyCustomTSDBOptions(tsdbOpts)
}

if cfg.Backend == config.GRPCStorageType {
switch cfg.Backend {
case config.GRPCStorageType:
// For grpc backend, we need to setup only the grpc client and a storage
// accompanying that client.
poolWrapper := pools.NewPoolsWrapper(
Expand All @@ -281,7 +282,24 @@ func Run(runOpts RunOptions) {
backendStorage = fanout.NewStorage(remotes, r, w, c,
instrumentOptions)
logger.Info("setup grpc backend")
} else {

case config.NoopEtcdStorageType:
backendStorage = storage.NewNoopStorage()
mgmt := cfg.ClusterManagement

if mgmt == nil || len(mgmt.Etcd.ETCDClusters) == 0 {
logger.Fatal("must specify cluster management config and at least one etcd cluster")
}

opts := mgmt.Etcd.NewOptions()
clusterClient, err = etcdclient.NewConfigServiceClient(opts)
if err != nil {
logger.Fatal("error constructing etcd client", zap.Error(err))
}
logger.Info("setup noop storage backend with etcd")

// Empty backend defaults to M3DB.
case "":
// For m3db backend, we need to make connections to the m3db cluster
// which generates a session and use the storage with the session.
m3dbClusters, m3dbPoolWrapper, err = initClusters(cfg,
Expand All @@ -300,6 +318,9 @@ func Run(runOpts RunOptions) {
logger.Fatal("unable to setup m3db backend", zap.Error(err))
}
defer cleanup()

default:
logger.Fatal("unrecognized backend", zap.String("backend", string(cfg.Backend)))
}

chainedEnforcer, chainedEnforceCloser, err := newConfiguredChainedEnforcer(&cfg,
Expand Down
88 changes: 88 additions & 0 deletions src/query/storage/noop_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package storage

import (
"context"
"errors"

"github.com/m3db/m3/src/query/block"
)

var noopClientError = errors.New("operation not valid for noop client")

// NewNoopStorage returns a fake implementation of Storage that rejects all
// writes and returns errors for all queries.
func NewNoopStorage() Storage {
return noopStorage{}
}

type noopStorage struct{}

func (noopStorage) Fetch(ctx context.Context, query *FetchQuery, options *FetchOptions) (*FetchResult, error) {
return nil, noopClientError
}

func (noopStorage) FetchProm(ctx context.Context, query *FetchQuery, options *FetchOptions) (PromResult, error) {
return PromResult{}, noopClientError
}

// FetchBlocks fetches timeseries as blocks based on a query.
func (noopStorage) FetchBlocks(ctx context.Context, query *FetchQuery, options *FetchOptions) (block.Result, error) {
return block.Result{}, noopClientError
}

// SearchSeries returns series IDs matching the current query.
func (noopStorage) SearchSeries(ctx context.Context, query *FetchQuery, options *FetchOptions) (*SearchResults, error) {
return nil, noopClientError
}

// CompleteTags returns autocompleted tag results.
func (noopStorage) CompleteTags(ctx context.Context, query *CompleteTagsQuery, options *FetchOptions) (*CompleteTagsResult, error) {
return nil, noopClientError
}

// Write writes a batched set of datapoints to storage based on the provided
// query.
func (noopStorage) Write(ctx context.Context, query *WriteQuery) error {
return noopClientError
}

// Type identifies the type of the underlying
func (noopStorage) Type() Type {
return TypeLocalDC
}

// Close is used to close the underlying storage and free up resources.
func (noopStorage) Close() error {
return noopClientError
}

// ErrorBehavior dictates what fanout storage should do when this storage
// encounters an error.
func (noopStorage) ErrorBehavior() ErrorBehavior {
return BehaviorWarn
}

// Name gives the plaintext name for this storage, used for logging purposes.
func (noopStorage) Name() string {
return "noopStorage"
}

0 comments on commit 1e794f2

Please sign in to comment.