diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index b04a0757ba8..97d0d978ed3 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -25,6 +25,7 @@ func (e *Event) register() { e.srv.streamingRpcs.Register("Event.Stream", e.stream) } +// ListSinks is used to list the event sinks registered in each namespace. func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.EventSinkListResponse) error { if done, err := e.srv.forward("Event.ListSinks", args, args, reply); done { return err @@ -77,6 +78,7 @@ func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.Eve return e.srv.blockingRPC(&opts) } +// UpsertSink is used to create or update an event sink func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs.GenericResponse) error { if done, err := e.srv.forward("Event.UpsertSink", args, args, reply); done { return err @@ -89,7 +91,9 @@ func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs. return structs.ErrPermissionDenied } - // TODO(drew) validate sink values + if err := args.Sink.Validate(); err != nil { + return err + } // Update via Raft _, index, err := e.srv.raftApply(structs.EventSinkUpsertRequestType, args) @@ -101,6 +105,7 @@ func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs. return nil } +// GetSink returns the requested event sink func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.EventSinkResponse) error { if done, err := e.srv.forward("Event.GetSink", args, args, reply); done { return err @@ -141,6 +146,7 @@ func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.E return e.srv.blockingRPC(&opts) } +// DeleteSink deletes an event sink func (e *Event) DeleteSink(args *structs.EventSinkDeleteRequest, reply *structs.GenericResponse) error { if done, err := e.srv.forward("Event.DeleteSink", args, args, reply); done { return err diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index 5202d03c0c4..9c36dc95ba4 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -543,6 +543,34 @@ func TestEvent_UpsertSink(t *testing.T) { require.EqualValues(t, sink, out) } +func TestEvent_UpsertSink_Invalid(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + sink := &structs.EventSink{ + Type: structs.SinkWebhook, + } + + req := &structs.EventSinkUpsertRequest{ + Sink: sink, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Event.UpsertSink", req, &resp) + require.Error(t, err) + + require.Contains(t, err.Error(), "Missing sink ID") + require.Contains(t, err.Error(), "Sink must be in a namespace") + require.Contains(t, err.Error(), "Webhook sink requires a valid Address") + +} + func TestEvent_GetSink(t *testing.T) { t.Parallel() diff --git a/nomad/structs/event.go b/nomad/structs/event.go index 389bac9602a..e15e899b5ab 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -1,5 +1,13 @@ package structs +import ( + "fmt" + "net/url" + "strings" + + multierror "github.com/hashicorp/go-multierror" +) + // EventStreamRequest is used to stream events from a servers EventBroker type EventStreamRequest struct { Topics map[Topic][]string @@ -116,3 +124,32 @@ type EventSink struct { CreateIndex uint64 ModifyIndex uint64 } + +func (e *EventSink) Validate() error { + var mErr multierror.Error + + if e.ID == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Missing sink ID")) + } else if strings.Contains(e.ID, " ") { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink ID contains a space")) + } else if strings.Contains(e.ID, "\000") { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink ID contains a null character")) + } + + if e.Namespace == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink must be in a namespace")) + } + + switch e.Type { + case SinkWebhook: + if e.Address == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Webhook sink requires a valid Address")) + } else if _, err := url.Parse(e.Address); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Webhook sink Address must be a valid url: %w", err)) + } + default: + mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink type invalid")) + } + + return mErr.ErrorOrNil() +}