From 9a5b945d7f5b315b765953bbe7c4f346fcec4b68 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Wed, 6 Feb 2019 17:26:05 +0100 Subject: [PATCH] Improve /database/create API (#1350) --- docs/how_to/single_node.md | 31 +-- docs/integrations/prometheus.md | 2 +- .../namespace_configuration.md | 130 +++++----- scripts/docker-integration-tests/common.sh | 98 ++------ src/query/api/v1/handler/database/common.go | 6 +- src/query/api/v1/handler/database/create.go | 226 ++++++++++++++---- .../api/v1/handler/database/create_test.go | 221 +++++++++++++++-- src/query/api/v1/handler/placement/get.go | 82 +++++-- src/x/net/http/convert.go | 11 +- 9 files changed, 573 insertions(+), 234 deletions(-) diff --git a/docs/how_to/single_node.md b/docs/how_to/single_node.md index 768d5c3c1f..53cb07f22d 100644 --- a/docs/how_to/single_node.md +++ b/docs/how_to/single_node.md @@ -15,14 +15,12 @@ directory on your host for durability: ``` docker pull quay.io/m3/m3dbnode:latest -docker run -p 7201:7201 -p 7203:7203 -p 9003:9003 --name m3db -v $(pwd)/m3db_data:/var/lib/m3db -v $GOPATH/src/github.com/m3db/m3/src/dbnode/config/m3dbnode-local-etcd.yml:/etc/m3dbnode/m3dbnode.yml quay.io/m3/m3dbnode:latest +docker run -p 7201:7201 -p 7203:7203 -p 9003:9003 --name m3db -v $(pwd)/m3db_data:/var/lib/m3db -v :/etc/m3dbnode/m3dbnode.yml quay.io/m3/m3dbnode:latest ``` -**Note:** If you don't have `M3` setup in your `$GOPATH`, you can find the [m3dbnode-local-etcd.yml file here](https://github.com/m3db/m3/blob/master/src/dbnode/config/m3dbnode-local-etcd.yml). +**Note:** For the single node case, we recommend that you start with this [sample config file](https://github.com/m3db/m3/blob/master/src/dbnode/config/m3dbnode-local-etcd.yml). If you inspect the file, you'll see that all the configuration is namespaced by `coordinator` or `db`. That's because this setup runs `M3DB` and `M3Coordinator` as one application. While this is convenient for testing and development, you'll want to run clustered `M3DB` with a separate `M3Coordinator` in production. You can read more about that [here.](cluster_hard_way.md). -**Note:** This setup runs `M3DB` and `M3Coordinator` as one application and should only be used for testing/development purposes. If you want to run a clustered `M3DB` with a separate `M3Coordinator` process (which is our recommended production setup), please [see here](cluster_hard_way.md). - -Next, create an initial namespace for your metrics in the database: +Next, create an initial namespace for your metrics in the database using the cURL below. Keep in mind that the provided `namespaceName` must match the namespace in the `local` section of the `M3Coordinator` YAML configuration, and if you choose to [add any additional namespaces](../operational_guide/namespace_configuration.md) you'll need to add them to the `local` section of `M3Coordinator`'s YAML config as well. ```json curl -X POST http://localhost:7201/api/v1/database/create -d '{ @@ -32,24 +30,9 @@ curl -X POST http://localhost:7201/api/v1/database/create -d '{ }' ``` -**Note:** If you want to create more than one namespace, you should follow the [instructions here](../operational_guide/namespace_configuration.md) and also add the namespace you created to the `local` section of the `m3dbnode-local-etcd.yml` file used in the `docker run` command above with the appropriate aggregation options specified - for more information on our aggregation functionality, check out our [M3Query documentation](query.md). For example: - - - -```json -local: - namespaces: - - namespace: default - type: unaggregated - retention: 48h - - namespace: - type: aggregated - retention: - resolution: -``` +**Note**: The `api/v1/database/create` endpoint is abstraction over two concepts in M3DB called [placements](../operational_guide/placement.md) and [namespaces](../operational_guide/namespace_configuration.md). If a placement doesn't exist, it will create one based on the `type` argument, otherwise if the placement already exists, it just creates the specified namespace. For now it's enough to just understand that it creates M3DB namespaces (tables), but if you're going to run a clustered M3 setup in production, make sure you familiarize yourself with the links above. -Shortly after, you should see your node complete bootstrapping! Don't worry if you see warnings or -errors related to a local cache file, such as `[W] could not load cache from file +Shortly after, you should see your node complete bootstrapping! Don't worry if you see warnings or errors related to a local cache file, such as `[W] could not load cache from file /var/lib/m3kv/m3db_embedded.json`. Those are expected for a local instance and in general any warn-level errors (prefixed with `[W]`) should not block bootstrapping. @@ -131,6 +114,4 @@ curl -sSf -X POST http://localhost:9003/query -d '{ } ``` -## Integrations - -[Prometheus as a long term storage remote read/write endpoint](../integrations/prometheus.md). +Now that you've got the M3 stack up and running, take a look at the rest of our documentation to see how you can integrate with [Prometheus](../integrations/prometheus.md) and [Graphite](../integrations/graphite.md) diff --git a/docs/integrations/prometheus.md b/docs/integrations/prometheus.md index b9f89b9f51..c9a14a7ac6 100644 --- a/docs/integrations/prometheus.md +++ b/docs/integrations/prometheus.md @@ -8,7 +8,7 @@ To write to a remote M3DB cluster the simplest configuration is to run `m3coordi Start by downloading the [config template](https://github.com/m3db/m3/blob/master/src/query/config/m3coordinator-cluster-template.yml). Update the `namespaces` and the `client` section for a new cluster to match your cluster's configuration. -You'll need to specify the static IPs or hostnames of your M3DB seed nodes, and the name and retention values of the namespace you set up. You can leave the namespace storage metrics type as `unaggregated` since it's required by default to have a cluster that receives all Prometheus metrics unaggregated. In the future you might also want to aggregate and downsample metrics for longer retention, and you can come back and update the config once you've setup those clusters. +You'll need to specify the static IPs or hostnames of your M3DB seed nodes, and the name and retention values of the namespace you set up. You can leave the namespace storage metrics type as `unaggregated` since it's required by default to have a cluster that receives all Prometheus metrics unaggregated. In the future you might also want to aggregate and downsample metrics for longer retention, and you can come back and update the config once you've setup those clusters. You can read more about our aggregation functionality [here](../how_to/query.md). It should look something like: diff --git a/docs/operational_guide/namespace_configuration.md b/docs/operational_guide/namespace_configuration.md index e7092f1ed9..ba0ee109b4 100644 --- a/docs/operational_guide/namespace_configuration.md +++ b/docs/operational_guide/namespace_configuration.md @@ -2,7 +2,81 @@ ## Introduction -Namespaces in M3DB are analogous to tables in other databases. Each namespace has a unique name as well as distinct configuration with regards to data retention and blocksize. For more information about namespaces, read our [storage engine documentation](../architecture/engine.md). +Namespaces in M3DB are analogous to tables in other databases. Each namespace has a unique name as well as distinct configuration with regards to data retention and blocksize. For more information about namespaces and the technical details of their implementation, read our [storage engine documentation](../m3db/architecture/engine.md). + +## Namespace Operations + +The operations below include sample cURLs, but you can always review the API documentation by navigating to + +`http://:/api/v1/openapi` or our [online API documentation](https://m3db.io/openapi/). + +### Adding a Namespace + +#### Recommended (Easy way) + +The recommended way to add a namespace to M3DB is to use our `api/v1/database/namespace` endpoint. This API abstracts over a lot of the complexity of configuring a namespace and requires only two pieces of configuration to be provided: the name of the namespace, as well as its retention. + +For example, the following cURL: + +```bash +curl -X POST :api/v1/database/namespace/create -d '{ + "namespaceName": "default_unaggregated", + "retentionTime": "24h" +}' +``` + +will create a namespace called `default_unaggregated` with a retention of `24 hours`. All of the other namespace options will either use reasonable default values or be calculated based on the provided `retentionTime`. + +Adding a namespace does not require restarting M3DB, but will require modifying the M3Coordinator configuration to include the new namespace, and then restarting it. + +If you feel the need to configure the namespace options yourself (for performance or other reasons), read the `Advanced` section below. + +#### Advanced (Hard Way) + +The "advanced" API allows you to configure every aspect of the namespace that you're adding which can sometimes be helpful for development, debugging, and tuning clusters for maximum performance. +Adding a namespace is a simple as using the `POST` `api/v1/namespace` API on an M3Coordinator instance. + +``` +curl -X POST :api/v1/namespace -d '{ + "name": "default_unaggregated", + "options": { + "bootstrapEnabled": true, + "flushEnabled": true, + "writesToCommitLog": true, + "cleanupEnabled": true, + "snapshotEnabled": true, + "repairEnabled": false, + "retentionOptions": { + "retentionPeriodDuration": "2d", + "blockSizeDuration": "2h", + "bufferFutureDuration": "10m", + "bufferPastDuration": "10m", + "blockDataExpiry": true, + "blockDataExpiryAfterNotAccessPeriodDuration": "5m" + }, + "indexOptions": { + "enabled": true, + "blockSizeDuration": "4h" + } + } +}' +``` + +Adding a namespace does not require restarting M3DB, but will require modifying the M3Coordinator configuration to include the new namespace, and then restarting it. + +### Deleting a Namespace + +Deleting a namespace is a simple as using the `DELETE` `/api/v1/namespace` API on an M3Coordinator instance. + +`curl -X DELETE :/api/v1/namespace/` + +Note that deleting a namespace will not have any effect on the M3DB nodes until they are all restarted. In addition, the namespace will need to be removed from the M3Coordinator configuration and then the M3Coordinator node will need to be restarted. + +### Modifying a Namespace + +There is currently no atomic namespace modification endpoint. Instead, you will need to delete a namespace and then add it back again with the same name, but modified settings. Review the individual namespace settings above to determine whether or not a given setting is safe to modify. For example, it is never safe to modify the blockSize of a namespace. + +Also, be very careful not to restart the M3DB nodes after deleting the namespace, but before adding it back. If you do this, the M3DB nodes may detect the existing data files on disk and delete them since they are not configured to retain that namespace. ## Namespace Attributes @@ -44,7 +118,7 @@ Can be modified without creating a new namespace: `yes` #### blockSize -This is the most important value to consider when tuning the performance of an M3DB namespace. Read the [storage engine documentation](../architecture/engine.md) for more details, but the basic idea is that larger blockSizes will use more memory, but achieve higher compression. Similarly, smaller blockSizes will use less memory, but have worse compression. +This is the most important value to consider when tuning the performance of an M3DB namespace. Read the [storage engine documentation](../../m3db/architecture/engine.md) for more details, but the basic idea is that larger blockSizes will use more memory, but achieve higher compression. Similarly, smaller blockSizes will use less memory, but have worse compression. Can be modified without creating a new namespace: `no` @@ -77,55 +151,3 @@ Can be modified without creating a new namespace: `yes` ### Index Options TODO - -## Namespace Operations - -The operations below include sample CURLs, but you can always review the API documentation by navigating to - -`http://:/api/v1/openapi` or our [online API documentation](https://m3db.io/openapi/). - -### Adding a Namespace - -Adding a namespace is a simple as using the `POST` `api/v1/namespace` API on an M3Coordinator instance. - -``` -curl -X POST :api/v1/namespace -d '{ - "name": "default_unaggregated", - "options": { - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "snapshotEnabled": true, - "repairEnabled": false, - "retentionOptions": { - "retentionPeriodDuration": "2d", - "blockSizeDuration": "2h", - "bufferFutureDuration": "10m", - "bufferPastDuration": "10m", - "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodDuration": "5m" - }, - "indexOptions": { - "enabled": true, - "blockSizeDuration": "4h" - } - } -}' -``` - -Adding a namespace does not require restarting M3DB, but will require modifying the M3Coordinator configuration to include the new namespace, and then restarting it. - -### Deleting a Namespace - -Deleting a namespace is a simple as using the `DELETE` `/api/v1/namespace` API on an M3Coordinator instance. - -`curl -X DELETE :/api/v1/namespace/` - -Note that deleting a namespace will not have any effect on the M3DB nodes until they are all restarted. In addition, the namespace will need to be removed from the M3Coordinator configuration and then the M3Coordinator node will need to be restarted. - -### Modifying a Namespace - -There is currently no atomic namespace modification endpoint. Instead, you will need to delete a namespace and then add it back again with the same name, but modified settings. Review the individual namespace settings above to determine whether or not a given setting is safe to modify. For example,it is never safe to modify the blockSize of a namespace. - -Also, be very careful not to restart the M3DB nodes after deleting the namespace, but before adding it back. If you do this, the M3DB nodes may detect the existing data files on disk and delete them since they are not configured to retain that namespace. \ No newline at end of file diff --git a/scripts/docker-integration-tests/common.sh b/scripts/docker-integration-tests/common.sh index 3c4d133bad..254f08c18b 100644 --- a/scripts/docker-integration-tests/common.sh +++ b/scripts/docker-integration-tests/common.sh @@ -46,90 +46,44 @@ function wait_for_db_init { ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq ".namespaces | length")" == "0" ]' - echo "Adding namespace" - curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ - "name": "agg", - "options": { - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "snapshotEnabled": true, - "repairEnabled": false, - "retentionOptions": { - "retentionPeriodDuration": "48h", - "blockSizeDuration": "2h", - "bufferFutureDuration": "10m", - "bufferPastDuration": "10m", - "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodDuration": "5m" - }, - "indexOptions": { - "enabled": true, - "blockSizeDuration": "2h" + echo "Adding placement and agg namespace" + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/database/create -d '{ + "type": "cluster", + "namespaceName": "agg", + "retentionTime": "24h", + "replicationFactor": 1, + "hosts": [ + { + "id": "m3db_local", + "isolation_group": "rack-a", + "zone": "embedded", + "weight": 1024, + "address": "dbnode01", + "port": 9000 } - } + ] }' - echo "Wait until namespace is init'd" + echo "Wait until placement is init'd" ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' - - curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/namespace -d '{ - "name": "unagg", - "options": { - "bootstrapEnabled": true, - "flushEnabled": true, - "writesToCommitLog": true, - "cleanupEnabled": true, - "snapshotEnabled": true, - "repairEnabled": false, - "retentionOptions": { - "retentionPeriodDuration": "48h", - "blockSizeDuration": "2h", - "bufferFutureDuration": "10m", - "bufferPastDuration": "10m", - "blockDataExpiry": true, - "blockDataExpiryAfterNotAccessPeriodDuration": "5m" - }, - "indexOptions": { - "enabled": true, - "blockSizeDuration": "2h" - } - } - }' + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' - echo "Sleep until namespace is init'd" + echo "Wait until agg namespace is init'd" ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.agg.indexOptions.enabled)" == true ]' - echo "Placement initialization" - curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/placement/init -d '{ - "num_shards": 64, - "replication_factor": 1, - "instances": [ - { - "id": "m3db_local", - "isolation_group": "rack-a", - "zone": "embedded", - "weight": 1024, - "endpoint": "dbnode01:9000", - "hostname": "dbnode01", - "port": 9000 - } - ] + echo "Adding unagg namespace" + curl -vvvsSf -X POST 0.0.0.0:7201/api/v1/database/namespace/create -d '{ + "namespaceName": "unagg", + "retentionTime": "24h" }' - echo "Sleep until placement is init'd" + echo "Wait until unagg namespace is init'd" ATTEMPTS=4 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | jq .placement.instances.m3db_local.id)" == \"m3db_local\" ]' + '[ "$(curl -sSf 0.0.0.0:7201/api/v1/namespace | jq .registry.namespaces.unagg.indexOptions.enabled)" == true ]' - echo "Sleep until bootstrapped" + echo "Wait until bootstrapped" ATTEMPTS=10 TIMEOUT=2 retry_with_backoff \ '[ "$(curl -sSf 0.0.0.0:9002/health | jq .bootstrapped)" == true ]' - - echo "Waiting until shards are marked as available" - ATTEMPTS=10 TIMEOUT=1 retry_with_backoff \ - '[ "$(curl -sSf 0.0.0.0:7201/api/v1/placement | grep -c INITIALIZING)" -eq 0 ]' } diff --git a/src/query/api/v1/handler/database/common.go b/src/query/api/v1/handler/database/common.go index d941d2a1fa..64814fe9e0 100644 --- a/src/query/api/v1/handler/database/common.go +++ b/src/query/api/v1/handler/database/common.go @@ -45,7 +45,11 @@ func RegisterRoutes( ) { logged := logging.WithResponseTimeLogging - r.HandleFunc(CreateURL, logged(NewCreateHandler(client, cfg, embeddedDbCfg)).ServeHTTP).Methods(CreateHTTPMethod) + // Register the same handler under two different endpoints. This just makes explaining things in + // our documentation easier so we can separate out concepts, but share the underlying code. + createHandler := logged(NewCreateHandler(client, cfg, embeddedDbCfg)).ServeHTTP + r.HandleFunc(CreateURL, createHandler).Methods(CreateHTTPMethod) + r.HandleFunc(CreateNamespaceURL, createHandler).Methods(CreateNamespaceHTTPMethod) r.HandleFunc(ConfigGetBootstrappersURL, logged(NewConfigGetBootstrappersHandler(client)).ServeHTTP).Methods(ConfigGetBootstrappersHTTPMethod) r.HandleFunc(ConfigSetBootstrappersURL, logged(NewConfigSetBootstrappersHandler(client)).ServeHTTP).Methods(ConfigSetBootstrappersHTTPMethod) diff --git a/src/query/api/v1/handler/database/create.go b/src/query/api/v1/handler/database/create.go index e0072abe8f..5587d38082 100644 --- a/src/query/api/v1/handler/database/create.go +++ b/src/query/api/v1/handler/database/create.go @@ -32,6 +32,7 @@ import ( clusterclient "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cluster/generated/proto/placementpb" + clusterplacement "github.com/m3db/m3/src/cluster/placement" dbconfig "github.com/m3db/m3/src/cmd/services/m3dbnode/config" "github.com/m3db/m3/src/cmd/services/m3query/config" dbnamespace "github.com/m3db/m3/src/dbnode/storage/namespace" @@ -39,7 +40,6 @@ import ( "github.com/m3db/m3/src/query/api/v1/handler/namespace" "github.com/m3db/m3/src/query/api/v1/handler/placement" "github.com/m3db/m3/src/query/generated/proto/admin" - "github.com/m3db/m3/src/query/util" "github.com/m3db/m3/src/query/util/logging" "github.com/m3db/m3/src/x/net/http" @@ -48,15 +48,28 @@ import ( ) const ( - // CreateURL is the url for the database create handler. + // CreateURL is the URL for the database create handler. CreateURL = handler.RoutePrefixV1 + "/database/create" - // CreateHTTPMethod is the HTTP method used with this resource. + // CreateNamespaceURL is the URL for the database namespace create handler. + CreateNamespaceURL = handler.RoutePrefixV1 + "/database/namespace/create" + + // CreateHTTPMethod is the HTTP method used with the create database resource. CreateHTTPMethod = http.MethodPost + // CreateNamespaceHTTPMethod is the HTTP method used with the create database namespace resource. + CreateNamespaceHTTPMethod = http.MethodPost + // DefaultLocalHostID is the default local host ID when creating a database. DefaultLocalHostID = "m3db_local" + // DefaultLocalIsolationGroup is the default isolation group when creating a + // local database. + DefaultLocalIsolationGroup = "local" + + // DefaultLocalZone is the default zone when creating a local database. + DefaultLocalZone = "embedded" + idealDatapointsPerBlock = 720 blockSizeFromExpectedSeriesScalar = idealDatapointsPerBlock * int64(time.Hour) shardMultiplier = 64 @@ -107,13 +120,18 @@ var ( errMissingEmbeddedDBPort = errors.New("unable to get port from embedded database listen address") errMissingEmbeddedDBConfig = errors.New("unable to find local embedded database config") errMissingHostID = errors.New("missing host ID") + + errClusteredPlacementAlreadyExists = errors.New("cannot use database create API to modify clustered placements after they are instantiated. Use the placement APIs directly to make placement changes, or remove the list of hosts from the request to add a namespace without modifying the placement") + errCantReplaceLocalPlacementWithClustered = errors.New("cannot replace existing local placement with a clustered placement. Use the placement APIs directly to make placement changes, or remove the `type` field from the request to add a namespace without modifying the existing local placement") ) type dbType string type createHandler struct { placementInitHandler *placement.InitHandler + placementGetHandler *placement.GetHandler namespaceAddHandler *namespace.AddHandler + namespaceGetHandler *namespace.GetHandler namespaceDeleteHandler *namespace.DeleteHandler embeddedDbCfg *dbconfig.DBConfiguration } @@ -124,52 +142,77 @@ func NewCreateHandler( cfg config.Configuration, embeddedDbCfg *dbconfig.DBConfiguration, ) http.Handler { + placementHandlerOptions := placement.HandlerOptions{ + ClusterClient: client, + Config: cfg, + } return &createHandler{ - placementInitHandler: placement.NewInitHandler( - placement.HandlerOptions{ClusterClient: client, Config: cfg}), + placementInitHandler: placement.NewInitHandler(placementHandlerOptions), + placementGetHandler: placement.NewGetHandler(placementHandlerOptions), namespaceAddHandler: namespace.NewAddHandler(client), + namespaceGetHandler: namespace.NewGetHandler(client), namespaceDeleteHandler: namespace.NewDeleteHandler(client), embeddedDbCfg: embeddedDbCfg, } } func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - logger := logging.WithContext(ctx) + var ( + ctx = r.Context() + logger = logging.WithContext(ctx) + ) - namespaceRequest, placementRequest, rErr := h.parseRequest(r) + currPlacement, _, err := h.placementGetHandler.Get(placement.M3DBServiceName, nil) + if err != nil { + logger.Error("unable to get placement", zap.Error(err)) + xhttp.Error(w, err, http.StatusInternalServerError) + return + } + + parsedReq, namespaceRequest, placementRequest, rErr := h.parseAndValidateRequest(r, currPlacement) if rErr != nil { logger.Error("unable to parse request", zap.Any("error", rErr)) xhttp.Error(w, rErr.Inner(), rErr.Code()) return } - nsRegistry, err := h.namespaceAddHandler.Add(namespaceRequest) + currPlacement, badRequest, err := h.maybeInitPlacement(currPlacement, parsedReq, placementRequest, r) if err != nil { - logger.Error("unable to add namespace", zap.Any("error", err)) - xhttp.Error(w, err, http.StatusInternalServerError) + logger.Error("unable to initialize placement", zap.Error(err)) + status := http.StatusBadRequest + if !badRequest { + status = http.StatusInternalServerError + } + xhttp.Error(w, err, status) return } - initPlacement, err := h.placementInitHandler.Init(placement.M3DBServiceName, r, placementRequest) + nsRegistry, err := h.namespaceGetHandler.Get() if err != nil { - // Attempt to delete the namespace that was just created to maintain idempotency. - nsDeleteErr := h.namespaceDeleteHandler.Delete(namespaceRequest.Name) - if nsDeleteErr != nil { - logger.Error( - "unable to delete namespace we just added", - zap.Any("originalError", err), - zap.Any("namespaceDeleteError", nsDeleteErr)) - xhttp.Error(w, err, http.StatusInternalServerError) - return - } + logger.Error("unable to retrieve existing namespaces", zap.Error(err)) + xhttp.Error(w, err, http.StatusInternalServerError) + return + } - logger.Error("unable to initialize placement", zap.Any("error", err)) + // TODO(rartoul): Add test for NS exists. + _, nsExists := nsRegistry.Namespaces[namespaceRequest.Name] + if nsExists { + err := fmt.Errorf( + "unable to create namespace: %s because it already exists", + namespaceRequest.Name) + logger.Error("unable to create namespace", zap.Error(err)) + xhttp.Error(w, err, http.StatusBadRequest) + return + } + + nsRegistry, err = h.namespaceAddHandler.Add(namespaceRequest) + if err != nil { + logger.Error("unable to add namespace", zap.Error(err)) xhttp.Error(w, err, http.StatusInternalServerError) return } - placementProto, err := initPlacement.Proto() + placementProto, err := currPlacement.Proto() if err != nil { logger.Error("unable to get placement protobuf", zap.Any("error", err)) xhttp.Error(w, err, http.StatusInternalServerError) @@ -188,43 +231,132 @@ func (h *createHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { xhttp.WriteProtoMsgJSONResponse(w, resp, logger) } -func (h *createHandler) parseRequest(r *http.Request) (*admin.NamespaceAddRequest, *admin.PlacementInitRequest, *xhttp.ParseError) { +func (h *createHandler) maybeInitPlacement( + currPlacement clusterplacement.Placement, + parsedReq *admin.DatabaseCreateRequest, + placementRequest *admin.PlacementInitRequest, + r *http.Request, +) (clusterplacement.Placement, bool, error) { + if currPlacement == nil { + // If we're here then there is no existing placement, so just create it. This is safe because in + // the case where a placement did not already exist, the parse function above validated that we + // have all the required information to create a placement. + newPlacement, err := h.placementInitHandler.Init(placement.M3DBServiceName, r, placementRequest) + if err != nil { + return nil, false, err + } + + return newPlacement, false, nil + } + + // NB(rartoul): Pardon the branchiness, making sure every permutation is "reasoned" through for + // the scenario where the placement already exists. + switch dbType(parsedReq.Type) { + case dbTypeCluster: + if placementRequest != nil { + // If the caller has specified a desired clustered placement and a placement already exists, + // throw an error because the create database API should not be used for modifying clustered + // placements. Instead, they should use the placement APIs. + return nil, true, errClusteredPlacementAlreadyExists + } + + if placementIsLocal(currPlacement) { + // If the caller has specified that they desire a clustered placement (without specifying hosts) + // and a local placement already exists then throw an error because we can't ignore their request + // and we also can't convert a local placement to a clustered one. + return nil, true, errCantReplaceLocalPlacementWithClustered + } + + // This is fine because we'll just assume they want to keep the same clustered placement + // that they already have because they didn't specify any hosts. + return currPlacement, false, nil + case dbTypeLocal: + if !placementIsLocal(currPlacement) { + // If the caller has specified that they desire a local placement and a clustered placement + // already exists then throw an error because we can't ignore their request and we also can't + // convert a clustered placement to a local one. + return nil, true, errCantReplaceLocalPlacementWithClustered + } + + // This is fine because we'll just assume they want to keep the same local placement + // that they already have. + return currPlacement, false, nil + case "": + // This is fine because we'll just assume they want to keep the same placement that they already + // have. + return currPlacement, false, nil + default: + // Invalid dbType. + return nil, true, fmt.Errorf("unknown database type: %s", parsedReq.Type) + } +} + +func (h *createHandler) parseAndValidateRequest( + r *http.Request, + existingPlacement clusterplacement.Placement, +) (*admin.DatabaseCreateRequest, *admin.NamespaceAddRequest, *admin.PlacementInitRequest, *xhttp.ParseError) { + requirePlacement := existingPlacement == nil + defer r.Body.Close() rBody, err := xhttp.DurationToNanosBytes(r.Body) if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + wrapped := fmt.Errorf("error converting duration to nano bytes: %s", err.Error()) + return nil, nil, nil, xhttp.NewParseError(wrapped, http.StatusBadRequest) } dbCreateReq := new(admin.DatabaseCreateRequest) if err := jsonpb.Unmarshal(bytes.NewReader(rBody), dbCreateReq); err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + return nil, nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) } - // Required fields - if util.HasEmptyString(dbCreateReq.NamespaceName, dbCreateReq.Type) { - return nil, nil, xhttp.NewParseError(errMissingRequiredField, http.StatusBadRequest) + if dbCreateReq.NamespaceName == "" { + err := fmt.Errorf("%s: namespaceName", errMissingRequiredField) + return nil, nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) } - if dbType(dbCreateReq.Type) == dbTypeCluster && len(dbCreateReq.Hosts) == 0 { - return nil, nil, xhttp.NewParseError(errMissingRequiredField, http.StatusBadRequest) + requestedDBType := dbType(dbCreateReq.Type) + if requirePlacement && + requestedDBType == dbTypeCluster && + len(dbCreateReq.Hosts) == 0 { + return nil, nil, nil, xhttp.NewParseError(errMissingRequiredField, http.StatusBadRequest) } - namespaceAddRequest, err := defaultedNamespaceAddRequest(dbCreateReq) + namespaceAddRequest, err := defaultedNamespaceAddRequest(dbCreateReq, existingPlacement) if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + return nil, nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) } - placementInitRequest, err := defaultedPlacementInitRequest(dbCreateReq, h.embeddedDbCfg) - if err != nil { - return nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + + var placementInitRequest *admin.PlacementInitRequest + if (requestedDBType == dbTypeCluster && len(dbCreateReq.Hosts) > 0) || + requestedDBType == dbTypeLocal { + placementInitRequest, err = defaultedPlacementInitRequest(dbCreateReq, h.embeddedDbCfg) + if err != nil { + return nil, nil, nil, xhttp.NewParseError(err, http.StatusBadRequest) + } } - return namespaceAddRequest, placementInitRequest, nil + return dbCreateReq, namespaceAddRequest, placementInitRequest, nil } -func defaultedNamespaceAddRequest(r *admin.DatabaseCreateRequest) (*admin.NamespaceAddRequest, error) { - opts := dbnamespace.NewOptions() +func defaultedNamespaceAddRequest( + r *admin.DatabaseCreateRequest, + existingPlacement clusterplacement.Placement, +) (*admin.NamespaceAddRequest, error) { + var ( + opts = dbnamespace.NewOptions() + dbType = dbType(r.Type) + ) + if dbType == "" && existingPlacement != nil { + // If they didn't provide a database type, infer it from the + // existing placement. + if placementIsLocal(existingPlacement) { + dbType = dbTypeLocal + } else { + dbType = dbTypeCluster + } + } - switch dbType(r.Type) { + switch dbType { case dbTypeLocal, dbTypeCluster: opts = opts.SetRepairEnabled(false) retentionOpts := opts.RetentionOptions() @@ -330,8 +462,8 @@ func defaultedPlacementInitRequest( instances = []*placementpb.Instance{ &placementpb.Instance{ Id: DefaultLocalHostID, - IsolationGroup: "local", - Zone: "embedded", + IsolationGroup: DefaultLocalIsolationGroup, + Zone: DefaultLocalZone, Weight: 1, Endpoint: fmt.Sprintf("127.0.0.1:%d", port), Hostname: "localhost", @@ -396,6 +528,14 @@ func defaultedPlacementInitRequest( }, nil } +func placementIsLocal(p clusterplacement.Placement) bool { + existingInstances := p.Instances() + return len(existingInstances) == 1 && + existingInstances[0].ID() == DefaultLocalHostID && + existingInstances[0].IsolationGroup() == DefaultLocalIsolationGroup && + existingInstances[0].Zone() == DefaultLocalZone +} + func portFromEmbeddedDBConfigListenAddress(address string) (int, error) { colonIdx := strings.LastIndex(address, ":") if colonIdx == -1 || colonIdx == len(address)-1 { diff --git a/src/query/api/v1/handler/database/create_test.go b/src/query/api/v1/handler/database/create_test.go index 2af61f8b47..c5afa72dbc 100644 --- a/src/query/api/v1/handler/database/create_test.go +++ b/src/query/api/v1/handler/database/create_test.go @@ -76,6 +76,18 @@ func SetupDatabaseTest( } func TestLocalType(t *testing.T) { + testLocalType(t, "local", false) +} + +func TestLocalTypePlacementAlreadyExists(t *testing.T) { + testLocalType(t, "local", true) +} + +func TestLocalTypePlacementAlreadyExistsNoTypeProvided(t *testing.T) { + testLocalType(t, "", true) +} + +func testLocalType(t *testing.T, providedType string, placementExists bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -83,17 +95,17 @@ func TestLocalType(t *testing.T) { createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() - jsonInput := ` + jsonInput := fmt.Sprintf(` { "namespaceName": "testNamespace", - "type": "local" + "type": "%s" } - ` + `, providedType) req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) require.NotNil(t, req) - mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) + mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) mockKV.EXPECT().CheckAndSet(namespace.M3DBNodeNamespacesKey, gomock.Any(), gomock.Not(nil)).Return(1, nil) placementProto := &placementpb.Placement{ @@ -111,7 +123,13 @@ func TestLocalType(t *testing.T) { } newPlacement, err := placement.NewPlacementFromProto(placementProto) require.NoError(t, err) - mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Any(), 64, 1).Return(newPlacement, nil) + + if placementExists { + mockPlacementService.EXPECT().Placement().Return(newPlacement, nil) + } else { + mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) + mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Any(), 64, 1).Return(newPlacement, nil) + } createHandler.ServeHTTP(w, req) @@ -178,6 +196,50 @@ func TestLocalType(t *testing.T) { xtest.Diff(mustPrettyJSON(t, expectedResponse), mustPrettyJSON(t, string(body)))) } +func TestLocalTypeClusteredPlacementAlreadyExists(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) + createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) + w := httptest.NewRecorder() + + jsonInput := ` + { + "namespaceName": "testNamespace", + "type": "local" + } + ` + + req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + require.NotNil(t, req) + + placementProto := &placementpb.Placement{ + Instances: map[string]*placementpb.Instance{ + "localhost": &placementpb.Instance{ + Id: "m3db_not_local", + IsolationGroup: "local", + Zone: "embedded", + Weight: 1, + Endpoint: "http://localhost:9000", + Hostname: "localhost", + Port: 9000, + }, + }, + } + newPlacement, err := placement.NewPlacementFromProto(placementProto) + require.NoError(t, err) + + mockPlacementService.EXPECT().Placement().Return(newPlacement, nil) + + createHandler.ServeHTTP(w, req) + + resp := w.Result() + _, err = ioutil.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + func TestLocalTypeWithNumShards(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -197,7 +259,7 @@ func TestLocalTypeWithNumShards(t *testing.T) { req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) require.NotNil(t, req) - mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) + mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) mockKV.EXPECT().CheckAndSet(namespace.M3DBNodeNamespacesKey, gomock.Any(), gomock.Not(nil)).Return(1, nil) placementProto := &placementpb.Placement{ @@ -215,6 +277,7 @@ func TestLocalTypeWithNumShards(t *testing.T) { } newPlacement, err := placement.NewPlacementFromProto(placementProto) require.NoError(t, err) + mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Any(), 51, 1).Return(newPlacement, nil) createHandler.ServeHTTP(w, req) @@ -300,7 +363,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) require.NotNil(t, req) - mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) + mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) mockKV.EXPECT().CheckAndSet(namespace.M3DBNodeNamespacesKey, gomock.Any(), gomock.Not(nil)).Return(1, nil) placementProto := &placementpb.Placement{ @@ -318,6 +381,7 @@ func TestLocalWithBlockSizeNanos(t *testing.T) { } newPlacement, err := placement.NewPlacementFromProto(placementProto) require.NoError(t, err) + mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Any(), 64, 1).Return(newPlacement, nil) createHandler.ServeHTTP(w, req) @@ -407,7 +471,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) require.NotNil(t, req) - mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) + mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) mockKV.EXPECT().CheckAndSet(namespace.M3DBNodeNamespacesKey, gomock.Any(), gomock.Not(nil)).Return(1, nil) placementProto := &placementpb.Placement{ @@ -425,6 +489,7 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { } newPlacement, err := placement.NewPlacementFromProto(placementProto) require.NoError(t, err) + mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Any(), 64, 1).Return(newPlacement, nil) createHandler.ServeHTTP(w, req) @@ -494,10 +559,18 @@ func TestLocalWithBlockSizeExpectedSeriesDatapointsPerHour(t *testing.T) { } func TestClusterTypeHosts(t *testing.T) { + testClusterTypeHosts(t, false) +} + +func TestClusterTypeHostsNotProvided(t *testing.T) { + testClusterTypeHosts(t, true) +} + +func TestClusterTypeHostsPlacementAlreadyExistsHostsProvided(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() @@ -512,7 +585,116 @@ func TestClusterTypeHosts(t *testing.T) { req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) require.NotNil(t, req) - mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) + placementProto := &placementpb.Placement{ + Instances: map[string]*placementpb.Instance{ + "host1": &placementpb.Instance{ + Id: "host1", + IsolationGroup: "cluster", + Zone: "embedded", + Weight: 1, + Endpoint: "http://host1:9000", + Hostname: "host1", + Port: 9000, + }, + "host2": &placementpb.Instance{ + Id: "host2", + IsolationGroup: "cluster", + Zone: "embedded", + Weight: 1, + Endpoint: "http://host2:9000", + Hostname: "host2", + Port: 9000, + }, + }, + } + newPlacement, err := placement.NewPlacementFromProto(placementProto) + require.NoError(t, err) + + mockPlacementService.EXPECT().Placement().Return(newPlacement, nil) + + createHandler.ServeHTTP(w, req) + + resp := w.Result() + _, err = ioutil.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func TestClusterTypeHostsPlacementAlreadyExistsExistingIsLocal(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) + createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) + w := httptest.NewRecorder() + + jsonInput := ` + { + "namespaceName": "testNamespace", + "type": "cluster" + } + ` + + req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + require.NotNil(t, req) + + placementProto := &placementpb.Placement{ + Instances: map[string]*placementpb.Instance{ + "localhost": &placementpb.Instance{ + Id: DefaultLocalHostID, + IsolationGroup: "local", + Zone: "embedded", + Weight: 1, + Endpoint: "http://localhost:9000", + Hostname: "localhost", + Port: 9000, + }, + }, + } + newPlacement, err := placement.NewPlacementFromProto(placementProto) + require.NoError(t, err) + + mockPlacementService.EXPECT().Placement().Return(newPlacement, nil) + + createHandler.ServeHTTP(w, req) + + resp := w.Result() + _, err = ioutil.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func testClusterTypeHosts(t *testing.T, placementExists bool) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient, mockKV, mockPlacementService := SetupDatabaseTest(t, ctrl) + createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) + w := httptest.NewRecorder() + + var jsonInput string + + if placementExists { + jsonInput = ` + { + "namespaceName": "testNamespace", + "type": "cluster" + } + ` + } else { + jsonInput = ` + { + "namespaceName": "testNamespace", + "type": "cluster", + "hosts": [{"id": "host1"}, {"id": "host2"}] + } + ` + } + + req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) + require.NotNil(t, req) + + mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) mockKV.EXPECT().CheckAndSet(namespace.M3DBNodeNamespacesKey, gomock.Any(), gomock.Not(nil)).Return(1, nil) placementProto := &placementpb.Placement{ @@ -539,7 +721,13 @@ func TestClusterTypeHosts(t *testing.T) { } newPlacement, err := placement.NewPlacementFromProto(placementProto) require.NoError(t, err) - mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Any(), 128, 3).Return(newPlacement, nil) + + if placementExists { + mockPlacementService.EXPECT().Placement().Return(newPlacement, nil) + } else { + mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) + mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Any(), 128, 3).Return(newPlacement, nil) + } createHandler.ServeHTTP(w, req) @@ -636,7 +824,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { req := httptest.NewRequest("POST", "/database/create", strings.NewReader(jsonInput)) require.NotNil(t, req) - mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound) + mockKV.EXPECT().Get(namespace.M3DBNodeNamespacesKey).Return(nil, kv.ErrNotFound).Times(2) mockKV.EXPECT().CheckAndSet(namespace.M3DBNodeNamespacesKey, gomock.Any(), gomock.Not(nil)).Return(1, nil) placementProto := &placementpb.Placement{ @@ -663,6 +851,7 @@ func TestClusterTypeHostsWithIsolationGroup(t *testing.T) { } newPlacement, err := placement.NewPlacementFromProto(placementProto) require.NoError(t, err) + mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) mockPlacementService.EXPECT().BuildInitialPlacement(gomock.Any(), 128, 3).Return(newPlacement, nil) createHandler.ServeHTTP(w, req) @@ -744,7 +933,9 @@ func TestClusterTypeMissingHostnames(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockClient, _, _ := SetupDatabaseTest(t, ctrl) + mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) + createHandler := NewCreateHandler(mockClient, config.Configuration{}, testDBCfg) w := httptest.NewRecorder() @@ -771,7 +962,9 @@ func TestBadType(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mockClient, _, _ := SetupDatabaseTest(t, ctrl) + mockClient, _, mockPlacementService := SetupDatabaseTest(t, ctrl) + mockPlacementService.EXPECT().Placement().Return(nil, kv.ErrNotFound) + createHandler := NewCreateHandler(mockClient, config.Configuration{}, nil) w := httptest.NewRecorder() diff --git a/src/query/api/v1/handler/placement/get.go b/src/query/api/v1/handler/placement/get.go index ea2ffbf9aa..055b03f1c0 100644 --- a/src/query/api/v1/handler/placement/get.go +++ b/src/query/api/v1/handler/placement/get.go @@ -21,11 +21,13 @@ package placement import ( + "errors" "net/http" "path" "strconv" "time" + "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cluster/placement" "github.com/m3db/m3/src/query/api/v1/handler" "github.com/m3db/m3/src/query/generated/proto/admin" @@ -56,6 +58,8 @@ var ( // M3CoordinatorGetURL is the url for the placement get handler (with the GET method) // for the M3Coordinator service. M3CoordinatorGetURL = path.Join(handler.RoutePrefixV1, M3CoordinatorServicePlacementPathName) + + errPlacementDoesNotExist = errors.New("placement does not exist") ) // GetHandler is the handler for placement gets. @@ -70,34 +74,20 @@ func (h *GetHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *htt var ( ctx = r.Context() logger = logging.WithContext(ctx) - opts = NewServiceOptions( - serviceName, r.Header, h.M3AggServiceOptions) ) - service, err := Service(h.ClusterClient, opts, h.nowFn(), nil) - if err != nil { - xhttp.Error(w, err, http.StatusInternalServerError) + placement, badRequest, err := h.Get(serviceName, r) + if err != nil && badRequest { + xhttp.Error(w, err, http.StatusBadRequest) return } - - var placement placement.Placement - var version int - status := http.StatusNotFound - if vs := r.FormValue("version"); vs != "" { - version, err = strconv.Atoi(vs) - if err == nil { - placement, err = service.PlacementForVersion(version) - } else { - status = http.StatusBadRequest - } - } else { - placement, err = service.Placement() - } - if err != nil { - xhttp.Error(w, err, status) + xhttp.Error(w, err, http.StatusNotFound) return } + if placement == nil { + xhttp.Error(w, errPlacementDoesNotExist, http.StatusNotFound) + } placementProto, err := placement.Proto() if err != nil { @@ -113,3 +103,53 @@ func (h *GetHandler) ServeHTTP(serviceName string, w http.ResponseWriter, r *htt xhttp.WriteProtoMsgJSONResponse(w, resp, logger) } + +// Get gets a placement. +func (h *GetHandler) Get( + serviceName string, + httpReq *http.Request, +) (placement placement.Placement, badRequest bool, err error) { + var headers http.Header + if httpReq != nil { + headers = httpReq.Header + } + + opts := NewServiceOptions( + serviceName, headers, h.M3AggServiceOptions) + + service, err := Service(h.ClusterClient, opts, h.nowFn(), nil) + if err != nil { + return nil, false, err + } + + var ( + version int + vs string + ) + if httpReq != nil { + vs = httpReq.FormValue("version") + } + + if vs != "" { + version, err = strconv.Atoi(vs) + if err == nil { + placement, err = service.PlacementForVersion(version) + } else { + badRequest = true + } + } else { + placement, err = service.Placement() + } + + if err == kv.ErrNotFound { + // TODO(rartoul): This should probably be handled at the service + // level but that would be a large refactor. + return nil, false, nil + } + + if err != nil { + return nil, badRequest, err + } + + return placement, badRequest, nil +} diff --git a/src/x/net/http/convert.go b/src/x/net/http/convert.go index a2b4988624..ca2220ea6b 100644 --- a/src/x/net/http/convert.go +++ b/src/x/net/http/convert.go @@ -23,6 +23,7 @@ package xhttp import ( "encoding/json" "errors" + "fmt" "io" "strings" "time" @@ -43,15 +44,19 @@ func DurationToNanosBytes(r io.Reader) ([]byte, error) { d := json.NewDecoder(r) d.UseNumber() if err := d.Decode(&dict); err != nil { - return nil, err + return nil, fmt.Errorf("error decoding JSON: %s", err.Error()) } ret, err := DurationToNanosMap(dict) if err != nil { - return nil, err + return nil, fmt.Errorf("error converting duration to nanos: %s", err.Error()) } - return json.Marshal(ret) + b, err := json.Marshal(ret) + if err != nil { + return nil, fmt.Errorf("error unmarshaling JSON: %s", err.Error()) + } + return b, nil } // DurationToNanosMap transforms keys with a Duration into Nanos