Skip to content

Commit

Permalink
Add topic endpoints (#1060)
Browse files Browse the repository at this point in the history
  • Loading branch information
cw9 authored Oct 10, 2018
1 parent dcfd851 commit 668d176
Show file tree
Hide file tree
Showing 14 changed files with 1,368 additions and 4 deletions.
6 changes: 3 additions & 3 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import:
version: acc762bfdd42ecb192d34e48fa7ca1fd7ee088ac

- package: github.com/m3db/m3msg
version: 4851e2719e06b15f1fc247e1d00339192963990e
version: bb2c675f58f01d487fc0a64aa6c864bc1cc541c9

- package: github.com/m3db/bitset
version: 07973db6b78acb62ac207d0538055e874b49d90d
Expand Down
20 changes: 20 additions & 0 deletions scripts/development/m3_stack/start_m3.sh
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,26 @@ echo "Validating M3Coordinator topology"
[ "$(curl -sSf localhost:7201/api/v1/services/m3coordinator/placement | jq .placement.instances.coordinator01.id)" == '"coordinator01"' ]
echo "Done validating topology"

# Do this after placement for m3coordinator is created.
echo "Initializing m3msg topic for ingestion"
curl -vvvsSf -X POST localhost:7201/api/v1/topic/init -d '{
"numberOfShards": 64
}'

echo "Adding m3coordinator as a consumer to the topic"
curl -vvvsSf -X POST localhost:7201/api/v1/topic -d '{
"consumerService": {
"serviceId": {
"name": "m3coordinator",
"environment": "default_env",
"zone": "embedded"
},
"consumptionType": "SHARED",
"messageTtlNanos": "600000000000"
}
}'
# msgs will be discarded after 600000000000ns = 10mins

echo "Prometheus available at localhost:9090"
echo "Grafana available at localhost:3000"
echo "Run ./stop.sh to shutdown nodes when done"
114 changes: 114 additions & 0 deletions src/query/api/v1/handler/topic/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package topic

import (
"net/http"

"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/api/v1/handler"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/net/http"
clusterclient "github.com/m3db/m3cluster/client"
"github.com/m3db/m3msg/topic"

"go.uber.org/zap"
)

const (
// AddURL is the url for the topic add handler (with the POST method).
AddURL = handler.RoutePrefixV1 + "/topic"

// AddHTTPMethod is the HTTP method used with this resource.
AddHTTPMethod = http.MethodPost
)

// AddHandler is the handler for topic adds.
type AddHandler Handler

// NewAddHandler returns a new instance of AddHandler.
func NewAddHandler(client clusterclient.Client, cfg config.Configuration) *AddHandler {
return &AddHandler{client: client, cfg: cfg, serviceFn: Service}
}

func (h *AddHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var (
ctx = r.Context()
logger = logging.WithContext(ctx)
req admin.TopicAddRequest
)
rErr := parseRequest(r, &req)
if rErr != nil {
logger.Error("unable to parse request", zap.Any("error", rErr))
xhttp.Error(w, rErr.Inner(), rErr.Code())
return
}

service, err := h.serviceFn(h.client)
if err != nil {
logger.Error("unable to get service", zap.Any("error", err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

t, err := service.Get(topicName(r.Header))
if err != nil {
logger.Error("unable to get topic", zap.Any("error", err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

cs, err := topic.NewConsumerServiceFromProto(req.ConsumerService)
if err != nil {
logger.Error("unable to parse consumer service", zap.Any("error", err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

t, err = t.AddConsumerService(cs)
if err != nil {
logger.Error("unable to add consumer service", zap.Any("error", err))
xhttp.Error(w, err, http.StatusBadRequest)
return
}

t, err = service.CheckAndSet(t, t.Version())
if err != nil {
logger.Error("unable to persist consumer service", zap.Any("error", err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

topicProto, err := topic.ToProto(t)
if err != nil {
logger.Error("unable to get topic protobuf", zap.Any("error", err))
xhttp.Error(w, err, http.StatusInternalServerError)
return
}

resp := &admin.TopicGetResponse{
Topic: topicProto,
Version: uint32(t.Version()),
}

xhttp.WriteProtoMsgJSONResponse(w, resp, logger)
}
101 changes: 101 additions & 0 deletions src/query/api/v1/handler/topic/add_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package topic

import (
"bytes"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/generated/proto/admin"
"github.com/m3db/m3msg/generated/proto/topicpb"
"github.com/m3db/m3msg/topic"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)

func TestTopicAddHandler(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockService := setupTest(t, ctrl)
handler := NewAddHandler(nil, config.Configuration{})
handler.serviceFn = testServiceFn(mockService)

t1 := topic.NewTopic().SetName(DefaultTopicName).SetNumberOfShards(256)

addProto := admin.TopicAddRequest{
ConsumerService: &topicpb.ConsumerService{
ConsumptionType: topicpb.ConsumptionType_SHARED,
ServiceId: &topicpb.ServiceID{
Environment: "env1",
Zone: "zone1",
Name: "name1",
},
MessageTtlNanos: int64(5 * time.Minute),
},
}
w := httptest.NewRecorder()
b := bytes.NewBuffer(nil)
require.NoError(t, jsonMarshaler.Marshal(b, &addProto))
cs, err := topic.NewConsumerServiceFromProto(addProto.ConsumerService)
require.NoError(t, err)
t2, err := t1.AddConsumerService(cs)
require.NoError(t, err)
mockService.
EXPECT().
Get(gomock.Any()).
Return(t1, nil)
mockService.EXPECT().CheckAndSet(gomock.Any(), gomock.Any()).Return(t2.SetVersion(3), nil)
req := httptest.NewRequest("POST", "/topic", b)
require.NotNil(t, req)
handler.ServeHTTP(w, req)
resp := w.Result()
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)

var respProto admin.TopicGetResponse
require.NoError(t, jsonUnmarshaler.Unmarshal(bytes.NewBuffer(body), &respProto))

validateEqualTopicProto(t, topicpb.Topic{
Name: DefaultTopicName,
NumberOfShards: 256,
ConsumerServices: []*topicpb.ConsumerService{
&topicpb.ConsumerService{
ConsumptionType: topicpb.ConsumptionType_SHARED,
ServiceId: &topicpb.ServiceID{
Environment: "env1",
Zone: "zone1",
Name: "name1",
},
MessageTtlNanos: int64(5 * time.Minute),
},
},
}, *respProto.Topic)

require.Equal(t, uint32(3), respProto.Version)
}
88 changes: 88 additions & 0 deletions src/query/api/v1/handler/topic/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package topic

import (
"net/http"
"strings"

"github.com/m3db/m3/src/cmd/services/m3query/config"
"github.com/m3db/m3/src/query/util/logging"
"github.com/m3db/m3/src/x/net/http"
clusterclient "github.com/m3db/m3cluster/client"
"github.com/m3db/m3msg/topic"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
)

const (
// DefaultTopicName is the default topic name
DefaultTopicName = "aggregated_metrics"
// HeaderTopicName is the header used to specify the topic name.
HeaderTopicName = "topic-name"
)

type serviceFn func(clusterClient clusterclient.Client) (topic.Service, error)

// Handler represents a generic handler for topic endpoints.
type Handler struct {
// This is used by other topic Handlers
// nolint: structcheck
client clusterclient.Client
cfg config.Configuration

serviceFn serviceFn
}

// Service gets a topic service from m3cluster client
func Service(clusterClient clusterclient.Client) (topic.Service, error) {
return topic.NewService(
topic.NewServiceOptions().
SetConfigService(clusterClient),
)
}

// RegisterRoutes registers the topic routes
func RegisterRoutes(r *mux.Router, client clusterclient.Client, cfg config.Configuration) {
logged := logging.WithResponseTimeLogging

r.HandleFunc(InitURL, logged(NewInitHandler(client, cfg)).ServeHTTP).Methods(InitHTTPMethod)
r.HandleFunc(GetURL, logged(NewGetHandler(client, cfg)).ServeHTTP).Methods(GetHTTPMethod)
r.HandleFunc(AddURL, logged(NewAddHandler(client, cfg)).ServeHTTP).Methods(AddHTTPMethod)
}

func topicName(headers http.Header) string {
if v := strings.TrimSpace(headers.Get(HeaderTopicName)); v != "" {
return v
}
return DefaultTopicName
}

func parseRequest(r *http.Request, m proto.Message) *xhttp.ParseError {
defer r.Body.Close()

if err := jsonpb.Unmarshal(r.Body, m); err != nil {
return xhttp.NewParseError(err, http.StatusBadRequest)
}
return nil
}
Loading

0 comments on commit 668d176

Please sign in to comment.