Skip to content

Commit

Permalink
event sink crud operation api (#9155)
Browse files Browse the repository at this point in the history
* network sink rpc/api plumbing

state store methods and restore

upsert sink test

get sink

delete sink

event sink list and tests

go generate new msg types

validate sink on upsert

* go generate
  • Loading branch information
drewbailey authored Oct 23, 2020
1 parent d002a8f commit fbb199d
Show file tree
Hide file tree
Showing 15 changed files with 915 additions and 72 deletions.
97 changes: 97 additions & 0 deletions command/agent/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,103 @@ import (
"golang.org/x/sync/errgroup"
)

func (s *HTTPServer) EventSinksRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != http.MethodGet {
return nil, CodedError(405, ErrInvalidMethod)
}

args := structs.EventSinkListRequest{}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var out structs.EventSinkListResponse
if err := s.agent.RPC("Event.ListSinks", &args, &out); err != nil {
return nil, err
}

if out.Sinks == nil {
out.Sinks = make([]*structs.EventSink, 0)
}
setMeta(resp, &out.QueryMeta)
return out.Sinks, nil
}

func (s *HTTPServer) EventSinkSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
name := strings.TrimPrefix(req.URL.Path, "/v1/event/sink/")
if len(name) == 0 {
return nil, CodedError(400, "Missing Policy Name")
}
switch req.Method {
case http.MethodGet:
return s.eventSinkGet(resp, req, name)
case http.MethodPost, http.MethodPut:
return s.eventSinkUpdate(resp, req, name)
case http.MethodDelete:
return s.eventSinkDelete(resp, req, name)
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}

func (s *HTTPServer) eventSinkGet(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) {
args := structs.EventSinkSpecificRequest{
ID: sink,
}
if s.parse(resp, req, &args.Region, &args.QueryOptions) {
return nil, nil
}

var out structs.EventSinkResponse
if err := s.agent.RPC("Event.GetSink", &args, &out); err != nil {
return nil, err
}
setMeta(resp, &out.QueryMeta)
if out.Sink == nil {
return nil, CodedError(404, "event sink not found")
}
return out.Sink, nil
}

func (s *HTTPServer) eventSinkUpdate(resp http.ResponseWriter, req *http.Request, sinkName string) (interface{}, error) {
var sink structs.EventSink
if err := decodeBody(req, &sink); err != nil {
return nil, CodedError(500, err.Error())
}

if sink.ID != sinkName {
return nil, CodedError(400, "Event sink name does not match request path")
}

args := structs.EventSinkUpsertRequest{
Sink: &sink,
}
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.GenericResponse
if err := s.agent.RPC("Event.UpsertSink", &args, &out); err != nil {
return nil, err
}

setIndex(resp, out.Index)
return nil, nil
}

func (s *HTTPServer) eventSinkDelete(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) {

args := structs.EventSinkDeleteRequest{
IDs: []string{sink},
}
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.GenericResponse
if err := s.agent.RPC("Event.DeleteSink", &args, &out); err != nil {
return nil, err
}
setIndex(resp, out.Index)
return nil, nil
}

func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
query := req.URL.Query()

Expand Down
90 changes: 90 additions & 0 deletions command/agent/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"

"github.com/hashicorp/nomad/testutil"
Expand All @@ -22,6 +23,95 @@ type testEvent struct {
ID string
}

func TestHTTP_EventSinkList(t *testing.T) {
t.Parallel()

httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()
s2 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))
require.NoError(t, s.Agent.server.State().UpsertEventSink(1001, s2))

req, err := http.NewRequest("GET", "/v1/event/sinks", nil)
require.NoError(t, err)

respW := httptest.NewRecorder()
obj, err := s.Server.EventSinksRequest(respW, req)
require.NoError(t, err)

require.Equal(t, "1001", respW.HeaderMap.Get("X-Nomad-Index"))

n := obj.([]*structs.EventSink)
require.Len(t, n, 2)
})
}

func TestHTTP_EventSinkGet(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))

req, err := http.NewRequest("GET", "/v1/event/sink/"+s1.ID, nil)
require.NoError(t, err)

respW := httptest.NewRecorder()
obj, err := s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)

require.Equal(t, "1000", respW.HeaderMap.Get("X-Nomad-Index"))

n := obj.(*structs.EventSink)
require.Equal(t, s1, n)
})
}

func TestHTTP_EventSinkUpsert(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()

buf := encodeReq(s1)

req, err := http.NewRequest("POST", "/v1/event/sink/"+s1.ID, buf)
require.NoError(t, err)

respW := httptest.NewRecorder()
_, err = s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)

require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"))

state := s.Agent.server.State()
out, err := state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Equal(t, s1.Address, out.Address)
require.Equal(t, s1.ID, out.ID)
})
}

func TestHTTP_EventSinkDelete(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
s1 := mock.EventSink()

require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1))

req, err := http.NewRequest("DELETE", "/v1/event/sink/"+s1.ID, nil)
require.NoError(t, err)

respW := httptest.NewRecorder()
_, err = s.Server.EventSinkSpecificRequest(respW, req)
require.NoError(t, err)

require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index"))

state := s.Agent.server.State()
out, err := state.EventSinkByID(nil, s1.ID)
require.NoError(t, err)
require.Nil(t, out)
})
}

func TestEventStream(t *testing.T) {
t.Parallel()

Expand Down
2 changes: 2 additions & 0 deletions command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))

s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream))
s.mux.HandleFunc("/v1/event/sinks", s.wrap(s.EventSinksRequest))
s.mux.HandleFunc("/v1/event/sink/", s.wrap(s.EventSinkSpecificRequest))

s.mux.HandleFunc("/v1/namespaces", s.wrap(s.NamespacesRequest))
s.mux.HandleFunc("/v1/namespace", s.wrap(s.NamespaceCreateRequest))
Expand Down
2 changes: 2 additions & 0 deletions helper/raftutil/msgtypes.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

147 changes: 147 additions & 0 deletions nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"io/ioutil"
"time"

metrics "github.com/armon/go-metrics"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -22,6 +25,150 @@ func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
}

// ListSinks is used to list the event sinks registered in Nomad
func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.EventSinkListResponse) error {
if done, err := e.srv.forward("Event.ListSinks", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "list_sinks"}, time.Now())

if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowOperatorRead() {
return structs.ErrPermissionDenied
}

opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
iter, err := state.EventSinks(ws)
if err != nil {
return err
}

var sinks []*structs.EventSink
for {
raw := iter.Next()
if raw == nil {
break
}

sink := raw.(*structs.EventSink)
sinks = append(sinks, sink)
}
reply.Sinks = sinks

index, err := state.Index("event_sink")
if err != nil {
return err
}

// Ensure we never set the index to zero, otherwise a blocking query cannot be used.
// We floor the index at one, since realistically the first write must have a higher index.
if index == 0 {
index = 1
}

reply.Index = index
return nil
},
}

return e.srv.blockingRPC(&opts)
}

// UpsertSink is used to create or update an event sink
func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Event.UpsertSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "upsert_sink"}, time.Now())

if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}

if err := args.Sink.Validate(); err != nil {
return err
}

// Update via Raft
_, index, err := e.srv.raftApply(structs.EventSinkUpsertRequestType, args)
if err != nil {
return err
}

reply.Index = index
return nil
}

// GetSink returns the requested event sink
func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.EventSinkResponse) error {
if done, err := e.srv.forward("Event.GetSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "get_sink"}, time.Now())

if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowOperatorRead() {
return structs.ErrPermissionDenied
}

opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
run: func(ws memdb.WatchSet, state *state.StateStore) error {
s, err := state.EventSinkByID(ws, args.ID)
if err != nil {
return nil
}

reply.Sink = s

index, err := state.Index("event_sink")
if err != nil {
return err
}

if index == 0 {
index = 1
}

reply.Index = index
return nil
},
}

return e.srv.blockingRPC(&opts)
}

// DeleteSink deletes an event sink
func (e *Event) DeleteSink(args *structs.EventSinkDeleteRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Event.DeleteSink", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "event", "delete_sink"}, time.Now())

if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.IsManagement() {
return structs.ErrPermissionDenied
}

// Update via Raft
_, index, err := e.srv.raftApply(structs.EventSinkDeleteRequestType, args)
if err != nil {
return err
}

reply.Index = index
return nil
}

func (e *Event) stream(conn io.ReadWriteCloser) {
defer conn.Close()

Expand Down
Loading

0 comments on commit fbb199d

Please sign in to comment.