Skip to content

Commit

Permalink
validate sink on upsert
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbailey committed Oct 23, 2020
1 parent 0cd2b1f commit 1f3ccc9
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 1 deletion.
8 changes: 7 additions & 1 deletion nomad/event_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (e *Event) register() {
e.srv.streamingRpcs.Register("Event.Stream", e.stream)
}

// ListSinks is used to list the event sinks registered in each namespace.
func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.EventSinkListResponse) error {
if done, err := e.srv.forward("Event.ListSinks", args, args, reply); done {
return err
Expand Down Expand Up @@ -77,6 +78,7 @@ func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.Eve
return e.srv.blockingRPC(&opts)
}

// UpsertSink is used to create or update an event sink
func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Event.UpsertSink", args, args, reply); done {
return err
Expand All @@ -89,7 +91,9 @@ func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs.
return structs.ErrPermissionDenied
}

// TODO(drew) validate sink values
if err := args.Sink.Validate(); err != nil {
return err
}

// Update via Raft
_, index, err := e.srv.raftApply(structs.EventSinkUpsertRequestType, args)
Expand All @@ -101,6 +105,7 @@ func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs.
return nil
}

// GetSink returns the requested event sink
func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.EventSinkResponse) error {
if done, err := e.srv.forward("Event.GetSink", args, args, reply); done {
return err
Expand Down Expand Up @@ -141,6 +146,7 @@ func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.E
return e.srv.blockingRPC(&opts)
}

// DeleteSink deletes an event sink
func (e *Event) DeleteSink(args *structs.EventSinkDeleteRequest, reply *structs.GenericResponse) error {
if done, err := e.srv.forward("Event.DeleteSink", args, args, reply); done {
return err
Expand Down
28 changes: 28 additions & 0 deletions nomad/event_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,34 @@ func TestEvent_UpsertSink(t *testing.T) {
require.EqualValues(t, sink, out)
}

func TestEvent_UpsertSink_Invalid(t *testing.T) {
t.Parallel()

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()

codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

sink := &structs.EventSink{
Type: structs.SinkWebhook,
}

req := &structs.EventSinkUpsertRequest{
Sink: sink,
WriteRequest: structs.WriteRequest{Region: "global"},
}

var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "Event.UpsertSink", req, &resp)
require.Error(t, err)

require.Contains(t, err.Error(), "Missing sink ID")
require.Contains(t, err.Error(), "Sink must be in a namespace")
require.Contains(t, err.Error(), "Webhook sink requires a valid Address")

}

func TestEvent_GetSink(t *testing.T) {
t.Parallel()

Expand Down
37 changes: 37 additions & 0 deletions nomad/structs/event.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
package structs

import (
"fmt"
"net/url"
"strings"

multierror "github.com/hashicorp/go-multierror"
)

// EventStreamRequest is used to stream events from a servers EventBroker
type EventStreamRequest struct {
Topics map[Topic][]string
Expand Down Expand Up @@ -116,3 +124,32 @@ type EventSink struct {
CreateIndex uint64
ModifyIndex uint64
}

func (e *EventSink) Validate() error {
var mErr multierror.Error

if e.ID == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Missing sink ID"))
} else if strings.Contains(e.ID, " ") {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink ID contains a space"))
} else if strings.Contains(e.ID, "\000") {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink ID contains a null character"))
}

if e.Namespace == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink must be in a namespace"))
}

switch e.Type {
case SinkWebhook:
if e.Address == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Webhook sink requires a valid Address"))
} else if _, err := url.Parse(e.Address); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Webhook sink Address must be a valid url: %w", err))
}
default:
mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink type invalid"))
}

return mErr.ErrorOrNil()
}

0 comments on commit 1f3ccc9

Please sign in to comment.