diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go index d28ca84e772..49cacbee7f2 100644 --- a/command/agent/event_endpoint.go +++ b/command/agent/event_endpoint.go @@ -17,6 +17,103 @@ import ( "golang.org/x/sync/errgroup" ) +func (s *HTTPServer) EventSinksRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != http.MethodGet { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.EventSinkListRequest{} + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.EventSinkListResponse + if err := s.agent.RPC("Event.ListSinks", &args, &out); err != nil { + return nil, err + } + + if out.Sinks == nil { + out.Sinks = make([]*structs.EventSink, 0) + } + setMeta(resp, &out.QueryMeta) + return out.Sinks, nil +} + +func (s *HTTPServer) EventSinkSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + name := strings.TrimPrefix(req.URL.Path, "/v1/event/sink/") + if len(name) == 0 { + return nil, CodedError(400, "Missing Policy Name") + } + switch req.Method { + case http.MethodGet: + return s.eventSinkGet(resp, req, name) + case http.MethodPost, http.MethodPut: + return s.eventSinkUpdate(resp, req, name) + case http.MethodDelete: + return s.eventSinkDelete(resp, req, name) + default: + return nil, CodedError(405, ErrInvalidMethod) + } +} + +func (s *HTTPServer) eventSinkGet(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) { + args := structs.EventSinkSpecificRequest{ + ID: sink, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.EventSinkResponse + if err := s.agent.RPC("Event.GetSink", &args, &out); err != nil { + return nil, err + } + setMeta(resp, &out.QueryMeta) + if out.Sink == nil { + return nil, CodedError(404, "event sink not found") + } + return out.Sink, nil +} + +func (s *HTTPServer) eventSinkUpdate(resp http.ResponseWriter, req *http.Request, sinkName string) (interface{}, error) { + var sink structs.EventSink + if err := decodeBody(req, &sink); err != nil { + return nil, CodedError(500, err.Error()) + } + + if sink.ID != sinkName { + return nil, CodedError(400, "Event sink name does not match request path") + } + + args := structs.EventSinkUpsertRequest{ + Sink: &sink, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.GenericResponse + if err := s.agent.RPC("Event.UpsertSink", &args, &out); err != nil { + return nil, err + } + + setIndex(resp, out.Index) + return nil, nil +} + +func (s *HTTPServer) eventSinkDelete(resp http.ResponseWriter, req *http.Request, sink string) (interface{}, error) { + + args := structs.EventSinkDeleteRequest{ + IDs: []string{sink}, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.GenericResponse + if err := s.agent.RPC("Event.DeleteSink", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return nil, nil +} + func (s *HTTPServer) EventStream(resp http.ResponseWriter, req *http.Request) (interface{}, error) { query := req.URL.Query() diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index c450a917988..1f80c40a675 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" @@ -22,6 +23,95 @@ type testEvent struct { ID string } +func TestHTTP_EventSinkList(t *testing.T) { + t.Parallel() + + httpTest(t, nil, func(s *TestAgent) { + s1 := mock.EventSink() + s2 := mock.EventSink() + + require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1)) + require.NoError(t, s.Agent.server.State().UpsertEventSink(1001, s2)) + + req, err := http.NewRequest("GET", "/v1/event/sinks", nil) + require.NoError(t, err) + + respW := httptest.NewRecorder() + obj, err := s.Server.EventSinksRequest(respW, req) + require.NoError(t, err) + + require.Equal(t, "1001", respW.HeaderMap.Get("X-Nomad-Index")) + + n := obj.([]*structs.EventSink) + require.Len(t, n, 2) + }) +} + +func TestHTTP_EventSinkGet(t *testing.T) { + httpTest(t, nil, func(s *TestAgent) { + s1 := mock.EventSink() + + require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1)) + + req, err := http.NewRequest("GET", "/v1/event/sink/"+s1.ID, nil) + require.NoError(t, err) + + respW := httptest.NewRecorder() + obj, err := s.Server.EventSinkSpecificRequest(respW, req) + require.NoError(t, err) + + require.Equal(t, "1000", respW.HeaderMap.Get("X-Nomad-Index")) + + n := obj.(*structs.EventSink) + require.Equal(t, s1, n) + }) +} + +func TestHTTP_EventSinkUpsert(t *testing.T) { + httpTest(t, nil, func(s *TestAgent) { + s1 := mock.EventSink() + + buf := encodeReq(s1) + + req, err := http.NewRequest("POST", "/v1/event/sink/"+s1.ID, buf) + require.NoError(t, err) + + respW := httptest.NewRecorder() + _, err = s.Server.EventSinkSpecificRequest(respW, req) + require.NoError(t, err) + + require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index")) + + state := s.Agent.server.State() + out, err := state.EventSinkByID(nil, s1.ID) + require.NoError(t, err) + require.Equal(t, s1.Address, out.Address) + require.Equal(t, s1.ID, out.ID) + }) +} + +func TestHTTP_EventSinkDelete(t *testing.T) { + httpTest(t, nil, func(s *TestAgent) { + s1 := mock.EventSink() + + require.NoError(t, s.Agent.server.State().UpsertEventSink(1000, s1)) + + req, err := http.NewRequest("DELETE", "/v1/event/sink/"+s1.ID, nil) + require.NoError(t, err) + + respW := httptest.NewRecorder() + _, err = s.Server.EventSinkSpecificRequest(respW, req) + require.NoError(t, err) + + require.NotEqual(t, "", respW.HeaderMap.Get("X-Nomad-Index")) + + state := s.Agent.server.State() + out, err := state.EventSinkByID(nil, s1.ID) + require.NoError(t, err) + require.Nil(t, out) + }) +} + func TestEventStream(t *testing.T) { t.Parallel() diff --git a/command/agent/http.go b/command/agent/http.go index 6dee1a4f58f..82c4b21ddd6 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -327,6 +327,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration)) s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream)) + s.mux.HandleFunc("/v1/event/sinks", s.wrap(s.EventSinksRequest)) + s.mux.HandleFunc("/v1/event/sink/", s.wrap(s.EventSinkSpecificRequest)) s.mux.HandleFunc("/v1/namespaces", s.wrap(s.NamespacesRequest)) s.mux.HandleFunc("/v1/namespace", s.wrap(s.NamespaceCreateRequest)) diff --git a/helper/raftutil/msgtypes.go b/helper/raftutil/msgtypes.go index 49daa8dc115..c485a4d7640 100644 --- a/helper/raftutil/msgtypes.go +++ b/helper/raftutil/msgtypes.go @@ -45,6 +45,8 @@ var msgTypeNames = map[structs.MessageType]string{ structs.ScalingEventRegisterRequestType: "ScalingEventRegisterRequestType", structs.CSIVolumeClaimBatchRequestType: "CSIVolumeClaimBatchRequestType", structs.CSIPluginDeleteRequestType: "CSIPluginDeleteRequestType", + structs.EventSinkUpsertRequestType: "EventSinkUpsertRequestType", + structs.EventSinkDeleteRequestType: "EventSinkDeleteRequestType", structs.NamespaceUpsertRequestType: "NamespaceUpsertRequestType", structs.NamespaceDeleteRequestType: "NamespaceDeleteRequestType", } diff --git a/nomad/event_endpoint.go b/nomad/event_endpoint.go index 3b980981496..9f8a7b24906 100644 --- a/nomad/event_endpoint.go +++ b/nomad/event_endpoint.go @@ -7,9 +7,12 @@ import ( "io/ioutil" "time" + metrics "github.com/armon/go-metrics" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) @@ -22,6 +25,150 @@ func (e *Event) register() { e.srv.streamingRpcs.Register("Event.Stream", e.stream) } +// ListSinks is used to list the event sinks registered in Nomad +func (e *Event) ListSinks(args *structs.EventSinkListRequest, reply *structs.EventSinkListResponse) error { + if done, err := e.srv.forward("Event.ListSinks", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "event", "list_sinks"}, time.Now()) + + if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowOperatorRead() { + return structs.ErrPermissionDenied + } + + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + iter, err := state.EventSinks(ws) + if err != nil { + return err + } + + var sinks []*structs.EventSink + for { + raw := iter.Next() + if raw == nil { + break + } + + sink := raw.(*structs.EventSink) + sinks = append(sinks, sink) + } + reply.Sinks = sinks + + index, err := state.Index("event_sink") + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + + reply.Index = index + return nil + }, + } + + return e.srv.blockingRPC(&opts) +} + +// UpsertSink is used to create or update an event sink +func (e *Event) UpsertSink(args *structs.EventSinkUpsertRequest, reply *structs.GenericResponse) error { + if done, err := e.srv.forward("Event.UpsertSink", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "event", "upsert_sink"}, time.Now()) + + if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + if err := args.Sink.Validate(); err != nil { + return err + } + + // Update via Raft + _, index, err := e.srv.raftApply(structs.EventSinkUpsertRequestType, args) + if err != nil { + return err + } + + reply.Index = index + return nil +} + +// GetSink returns the requested event sink +func (e *Event) GetSink(args *structs.EventSinkSpecificRequest, reply *structs.EventSinkResponse) error { + if done, err := e.srv.forward("Event.GetSink", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "event", "get_sink"}, time.Now()) + + if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowOperatorRead() { + return structs.ErrPermissionDenied + } + + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + s, err := state.EventSinkByID(ws, args.ID) + if err != nil { + return nil + } + + reply.Sink = s + + index, err := state.Index("event_sink") + if err != nil { + return err + } + + if index == 0 { + index = 1 + } + + reply.Index = index + return nil + }, + } + + return e.srv.blockingRPC(&opts) +} + +// DeleteSink deletes an event sink +func (e *Event) DeleteSink(args *structs.EventSinkDeleteRequest, reply *structs.GenericResponse) error { + if done, err := e.srv.forward("Event.DeleteSink", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "event", "delete_sink"}, time.Now()) + + if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Update via Raft + _, index, err := e.srv.raftApply(structs.EventSinkDeleteRequestType, args) + if err != nil { + return err + } + + reply.Index = index + return nil +} + func (e *Event) stream(conn io.ReadWriteCloser) { defer conn.Close() diff --git a/nomad/event_endpoint_test.go b/nomad/event_endpoint_test.go index 26757d37ca3..97d962867c5 100644 --- a/nomad/event_endpoint_test.go +++ b/nomad/event_endpoint_test.go @@ -10,7 +10,9 @@ import ( "time" "github.com/hashicorp/go-msgpack/codec" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" @@ -508,3 +510,151 @@ func TestEventStream_ACL(t *testing.T) { }) } } + +func TestEvent_UpsertSink(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + sink := mock.EventSink() + req := &structs.EventSinkUpsertRequest{ + Sink: sink, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + var resp structs.GenericResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.UpsertSink", req, &resp)) + require.NotEqual(t, 0, resp.Index) + + // Check for the sink in the FSM + + state := s1.fsm.State() + out, err := state.EventSinkByID(nil, sink.ID) + require.NoError(t, err) + + // set the index so we can compare values + sink.CreateIndex = resp.Index + sink.ModifyIndex = resp.Index + + require.EqualValues(t, sink, out) +} + +func TestEvent_UpsertSink_Invalid(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + sink := &structs.EventSink{ + Type: structs.SinkWebhook, + } + + req := &structs.EventSinkUpsertRequest{ + Sink: sink, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Event.UpsertSink", req, &resp) + require.Error(t, err) + + require.Contains(t, err.Error(), "Missing sink ID") + require.Contains(t, err.Error(), "Webhook sink requires a valid Address") +} + +func TestEvent_GetSink(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + sink := mock.EventSink() + + require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink)) + + get := &structs.EventSinkSpecificRequest{ + ID: sink.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + + var resp structs.EventSinkResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.GetSink", get, &resp)) + require.EqualValues(t, 1000, resp.Index) + require.Equal(t, sink.ID, resp.Sink.ID) + + // Query for a non-existent sink + get.ID = uuid.Generate() + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.GetSink", get, &resp)) + + require.EqualValues(t, 1000, resp.Index) + require.Nil(t, resp.Sink) +} + +func TestEvent_DeleteSink(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + sink := mock.EventSink() + + require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink)) + + get := &structs.EventSinkDeleteRequest{ + IDs: []string{sink.ID}, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + + var resp structs.GenericResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.DeleteSink", get, &resp)) + require.NotEqual(t, uint64(0), resp.Index) + + state := s1.fsm.State() + out, err := state.EventSinkByID(nil, sink.ID) + require.NoError(t, err) + require.Nil(t, out) +} + +func TestEvent_ListSinks(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + sink := mock.EventSink() + sink2 := mock.EventSink() + + require.NoError(t, s1.fsm.State().UpsertEventSink(1000, sink)) + require.NoError(t, s1.fsm.State().UpsertEventSink(1001, sink2)) + + get := &structs.EventSinkListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + + var resp structs.EventSinkListResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Event.ListSinks", get, &resp)) + require.Len(t, resp.Sinks, 2) + +} diff --git a/nomad/fsm.go b/nomad/fsm.go index f1d7f0f20cd..452d9ce7ccb 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -52,7 +52,7 @@ const ( CSIPluginSnapshot SnapshotType = 17 CSIVolumeSnapshot SnapshotType = 18 ScalingEventsSnapshot SnapshotType = 19 - + EventSinkSnapshot SnapshotType = 20 // Namespace appliers were moved from enterprise and therefore start at 64 NamespaceSnapshot SnapshotType = 64 ) @@ -293,6 +293,10 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyNamespaceUpsert(buf[1:], log.Index) case structs.NamespaceDeleteRequestType: return n.applyNamespaceDelete(buf[1:], log.Index) + case structs.EventSinkUpsertRequestType: + return n.applyUpsertEventSink(buf[1:], log.Index) + case structs.EventSinkDeleteRequestType: + return n.applyDeleteEventSink(buf[1:], log.Index) } // Check enterprise only message types. @@ -1301,6 +1305,35 @@ func (n *nomadFSM) applyNamespaceDelete(buf []byte, index uint64) interface{} { if err := n.state.DeleteNamespaces(index, req.Namespaces); err != nil { n.logger.Error("DeleteNamespaces failed", "error", err) + } + + return nil +} + +func (n *nomadFSM) applyUpsertEventSink(buf []byte, index uint64) interface{} { + var req structs.EventSinkUpsertRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_upsert_event_sink"}, time.Now()) + + if err := n.state.UpsertEventSink(index, req.Sink); err != nil { + n.logger.Error("UpsertEventSink failed", "error", err) + return err + } + + return nil +} + +func (n *nomadFSM) applyDeleteEventSink(buf []byte, index uint64) interface{} { + var req structs.EventSinkDeleteRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_delete_event_sink"}, time.Now()) + + if err := n.state.DeleteEventSinks(index, req.IDs); err != nil { + n.logger.Error("DeleteEventSink failed", "error", err) return err } @@ -1583,6 +1616,16 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { return err } + case EventSinkSnapshot: + sink := new(structs.EventSink) + if err := dec.Decode(sink); err != nil { + return err + } + + if err := restore.EventSinkRestore(sink); err != nil { + return err + } + default: // Check if this is an enterprise only object being restored restorer, ok := n.enterpriseRestorers[snapType] @@ -1900,6 +1943,10 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistEventSinks(sink, encoder); err != nil { + sink.Cancel() + return err + } return nil } @@ -2431,6 +2478,29 @@ func (s *nomadSnapshot) persistCSIVolumes(sink raft.SnapshotSink, return nil } +func (s *nomadSnapshot) persistEventSinks(sink raft.SnapshotSink, + encoder *codec.Encoder) error { + + sinks, err := s.snap.EventSinks(nil) + if err != nil { + return err + } + + for { + raw := sinks.Next() + if raw == nil { + break + } + + es := raw.(*structs.EventSink) + sink.Write([]byte{byte(EventSinkSnapshot)}) + if err := encoder.Encode(es); err != nil { + return err + } + } + return nil +} + // Release is a no-op, as we just need to GC the pointer // to the state store snapshot. There is nothing to explicitly // cleanup. diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 927abef3526..c93b1f58fca 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1516,3 +1516,11 @@ func Namespace() *structs.Namespace { ns.SetHash() return ns } + +func EventSink() *structs.EventSink { + return &structs.EventSink{ + ID: fmt.Sprintf("webhook-sink-%s", uuid.Generate()[0:8]), + Type: structs.SinkWebhook, + Address: "http://127.0.0.1/", + } +} diff --git a/nomad/server.go b/nomad/server.go index 05735fe4ab3..76189b4067a 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -1194,6 +1194,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { server.Register(s.staticEndpoints.FileSystem) server.Register(s.staticEndpoints.Agent) server.Register(s.staticEndpoints.Namespace) + server.Register(s.staticEndpoints.Event) // Create new dynamic endpoints and add them to the RPC server. node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 00ea753e026..923b4461713 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -57,6 +57,7 @@ func init() { scalingPolicyTableSchema, scalingEventTableSchema, namespaceTableSchema, + eventSinkTableSchema, }...) } @@ -930,3 +931,23 @@ func namespaceTableSchema() *memdb.TableSchema { }, } } + +func eventSinkTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "event_sink", + Indexes: map[string]*memdb.IndexSchema{ + // Primary index is used for event sink management and simple + // direct lookup. ID is required to be unique. + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + + // Sink ID is uniquely identifying + Indexer: &memdb.StringFieldIndex{ + Field: "ID", + }, + }, + }, + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 97a5761d3b1..be6516d56bb 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -5857,6 +5857,81 @@ func (s *StateStore) ScalingPolicyByTargetAndType(ws memdb.WatchSet, target map[ return nil, nil } +func (s *StateStore) EventSinks(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + // Walk the entire event sink table + iter, err := txn.Get("event_sink", "id") + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + + return iter, nil +} + +func (s *StateStore) EventSinkByID(ws memdb.WatchSet, id string) (*structs.EventSink, error) { + txn := s.db.ReadTxn() + return s.eventSinkByIDTxn(ws, id, txn) +} + +func (s *StateStore) eventSinkByIDTxn(ws memdb.WatchSet, id string, txn Txn) (*structs.EventSink, error) { + watchCh, existing, err := txn.FirstWatch("event_sink", "id", id) + if err != nil { + return nil, fmt.Errorf("event sink lookup failed: %w", err) + } + ws.Add(watchCh) + + if existing != nil { + return existing.(*structs.EventSink), nil + } + return nil, nil +} + +func (s *StateStore) UpsertEventSink(idx uint64, sink *structs.EventSink) error { + txn := s.db.WriteTxn(idx) + defer txn.Abort() + + existing, err := txn.First("event_sink", "id", sink.ID) + if err != nil { + return fmt.Errorf("event sink lookup failed: %w", err) + } + + if existing != nil { + sink.CreateIndex = existing.(*structs.EventSink).CreateIndex + sink.ModifyIndex = idx + } else { + sink.CreateIndex = idx + sink.ModifyIndex = idx + } + + // Insert the sink + if err := txn.Insert("event_sink", sink); err != nil { + return fmt.Errorf("event sink insert failed: %w", err) + } + if err := txn.Insert("index", &IndexEntry{"event_sink", idx}); err != nil { + return fmt.Errorf("index update failed: %w", err) + } + + return txn.Commit() +} + +func (s *StateStore) DeleteEventSinks(idx uint64, sinks []string) error { + txn := s.db.WriteTxn(idx) + defer txn.Abort() + + for _, id := range sinks { + if _, err := txn.DeleteAll("event_sink", "id", id); err != nil { + return fmt.Errorf("deleting event sink failed: %v", err) + } + } + if err := txn.Insert("index", &IndexEntry{"event_sink", idx}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return txn.Commit() +} + // StateSnapshot is used to provide a point-in-time snapshot type StateSnapshot struct { StateStore @@ -6118,3 +6193,10 @@ func (r *StateRestore) NamespaceRestore(ns *structs.Namespace) error { } return nil } + +func (r *StateRestore) EventSinkRestore(sink *structs.EventSink) error { + if err := r.txn.Insert("event_sink", sink); err != nil { + return fmt.Errorf("event sink insert failed: %v", err) + } + return nil +} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 6309d8741b6..d26725626ed 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -9553,6 +9553,98 @@ func TestStateStore_RestoreScalingEvents(t *testing.T) { require.EqualValues(jobScalingEvents.ScalingEvents, out) } +func TestStateStore_UpsertEventSink(t *testing.T) { + t.Parallel() + + state := testStateStore(t) + sink := &structs.EventSink{ + ID: "webhook-sink", + Type: structs.SinkWebhook, + } + + require.NoError(t, state.UpsertEventSink(100, sink)) + + out, err := state.EventSinkByID(nil, "webhook-sink") + require.NoError(t, err) + require.Equal(t, structs.SinkWebhook, out.Type) +} + +func TestStateStore_DeleteEventSinks(t *testing.T) { + t.Parallel() + + state := testStateStore(t) + s1 := mock.EventSink() + s2 := mock.EventSink() + s3 := mock.EventSink() + s4 := mock.EventSink() + + require.NoError(t, state.UpsertEventSink(100, s1)) + require.NoError(t, state.UpsertEventSink(101, s2)) + require.NoError(t, state.UpsertEventSink(102, s3)) + require.NoError(t, state.UpsertEventSink(103, s4)) + + require.NoError(t, state.DeleteEventSinks(1000, []string{s1.ID, s2.ID, s3.ID})) + + out, err := state.EventSinkByID(nil, s4.ID) + require.NoError(t, err) + require.NotNil(t, out) + + out, err = state.EventSinkByID(nil, s1.ID) + require.NoError(t, err) + require.Nil(t, out) + +} + +func TestStateStore_EventSinks(t *testing.T) { + t.Parallel() + + state := testStateStore(t) + s1 := mock.EventSink() + s2 := mock.EventSink() + s3 := mock.EventSink() + + require.NoError(t, state.UpsertEventSink(100, s1)) + require.NoError(t, state.UpsertEventSink(101, s2)) + require.NoError(t, state.UpsertEventSink(102, s3)) + + iter, err := state.EventSinks(nil) + require.NoError(t, err) + + var out []*structs.EventSink + for { + raw := iter.Next() + if raw == nil { + break + } + sink := raw.(*structs.EventSink) + out = append(out, sink) + } + require.Len(t, out, 3) + +} + +func TestStateStore_RestoreEventSink(t *testing.T) { + t.Parallel() + require := require.New(t) + + state := testStateStore(t) + eventSink := &structs.EventSink{ + ID: "eventsink", + } + + restore, err := state.Restore() + require.NoError(err) + + err = restore.EventSinkRestore(eventSink) + require.NoError(err) + require.NoError(restore.Commit()) + + out, err := state.EventSinkByID(nil, "eventsink") + require.NoError(err) + require.NotNil(out) + require.EqualValues(eventSink, out) +} + func TestStateStore_Abandon(t *testing.T) { t.Parallel() diff --git a/nomad/structs/event.go b/nomad/structs/event.go new file mode 100644 index 00000000000..1747dd9167d --- /dev/null +++ b/nomad/structs/event.go @@ -0,0 +1,150 @@ +package structs + +import ( + "fmt" + "net/url" + "strings" + + multierror "github.com/hashicorp/go-multierror" +) + +// EventStreamRequest is used to stream events from a servers EventBroker +type EventStreamRequest struct { + Topics map[Topic][]string + Index int + + QueryOptions +} + +type EventStreamWrapper struct { + Error *RpcError + Event *EventJson +} + +type Topic string + +const ( + TopicDeployment Topic = "Deployment" + TopicEval Topic = "Eval" + TopicAlloc Topic = "Alloc" + TopicJob Topic = "Job" + TopicNode Topic = "Node" + TopicAll Topic = "*" +) + +// Event represents a change in Nomads state. +type Event struct { + // Topic represeents the primary object for the event + Topic Topic + + // Type is a short string representing the reason for the event + Type string + + // Key is the primary identifier of the Event, The involved objects ID + Key string + + // Namespace is the namespace of the object, If the object is not namespace + // aware (Node) it is left blank + Namespace string + + // FilterKeys are a set of additional related keys that are used to include + // events during filtering. + FilterKeys []string + + // Index is the raft index that corresponds to the event + Index uint64 + + // Payload is the Event itself see state/events.go for a list of events + Payload interface{} +} + +// Events is a wrapper that contains a set of events for a given index. +type Events struct { + Index uint64 + Events []Event +} + +// EventJson is a wrapper for a JSON object +type EventJson struct { + Data []byte +} + +func (j *EventJson) Copy() *EventJson { + n := new(EventJson) + *n = *j + n.Data = make([]byte, len(j.Data)) + copy(n.Data, j.Data) + return n +} + +type EventSinkUpsertRequest struct { + Sink *EventSink + WriteRequest +} + +type EventSinkSpecificRequest struct { + ID string + QueryOptions +} + +type EventSinkResponse struct { + Sink *EventSink + QueryMeta +} + +type EventSinkDeleteRequest struct { + IDs []string + WriteRequest +} + +type EventSinkListRequest struct { + QueryOptions +} + +type EventSinkListResponse struct { + Sinks []*EventSink + QueryMeta +} + +type SinkType string + +const ( + SinkWebhook SinkType = "webhook" +) + +type EventSink struct { + ID string + Type SinkType + + Topics map[Topic][]string + + Address string + + CreateIndex uint64 + ModifyIndex uint64 +} + +func (e *EventSink) Validate() error { + var mErr multierror.Error + + if e.ID == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Missing sink ID")) + } else if strings.Contains(e.ID, " ") { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink ID contains a space")) + } else if strings.Contains(e.ID, "\000") { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink ID contains a null character")) + } + + switch e.Type { + case SinkWebhook: + if e.Address == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Webhook sink requires a valid Address")) + } else if _, err := url.Parse(e.Address); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Webhook sink Address must be a valid url: %w", err)) + } + default: + mErr.Errors = append(mErr.Errors, fmt.Errorf("Sink type invalid")) + } + + return mErr.ErrorOrNil() +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 9a54bae3cba..a9a4af3c71f 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -97,6 +97,8 @@ const ( ScalingEventRegisterRequestType MessageType = 38 CSIVolumeClaimBatchRequestType MessageType = 39 CSIPluginDeleteRequestType MessageType = 40 + EventSinkUpsertRequestType MessageType = 41 + EventSinkDeleteRequestType MessageType = 42 // Namespace types were moved from enterprise and therefore start at 64 NamespaceUpsertRequestType MessageType = 64 @@ -10914,19 +10916,6 @@ type ACLTokenUpsertResponse struct { WriteMeta } -// EventStreamRequest is used to stream events from a servers EventBroker -type EventStreamRequest struct { - Topics map[Topic][]string - Index int - - QueryOptions -} - -type EventStreamWrapper struct { - Error *RpcError - Event *EventJson -} - // RpcError is used for serializing errors with a potential error code type RpcError struct { Message string @@ -10943,59 +10932,3 @@ func NewRpcError(err error, code *int64) *RpcError { func (r *RpcError) Error() string { return r.Message } - -type Topic string - -const ( - TopicDeployment Topic = "Deployment" - TopicEval Topic = "Eval" - TopicAlloc Topic = "Alloc" - TopicJob Topic = "Job" - TopicNode Topic = "Node" - TopicAll Topic = "*" -) - -// Event represents a change in Nomads state. -type Event struct { - // Topic represeents the primary object for the event - Topic Topic - - // Type is a short string representing the reason for the event - Type string - - // Key is the primary identifier of the Event, The involved objects ID - Key string - - // Namespace is the namespace of the object, If the object is not namespace - // aware (Node) it is left blank - Namespace string - - // FilterKeys are a set of additional related keys that are used to include - // events during filtering. - FilterKeys []string - - // Index is the raft index that corresponds to the event - Index uint64 - - // Payload is the Event itself see state/events.go for a list of events - Payload interface{} -} - -// Events is a wrapper that contains a set of events for a given index. -type Events struct { - Index uint64 - Events []Event -} - -// EventJson is a wrapper for a JSON object -type EventJson struct { - Data []byte -} - -func (j *EventJson) Copy() *EventJson { - n := new(EventJson) - *n = *j - n.Data = make([]byte, len(j.Data)) - copy(n.Data, j.Data) - return n -} diff --git a/tools/go.sum b/tools/go.sum index 40791a2747c..3bd9dd029a6 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -5,8 +5,6 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/OpenPeeDeeP/depguard v1.0.1 h1:VlW4R6jmBIv3/u1JNlawEvJMM4J+dPORPaZasQee8Us= github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= -github.com/a8m/tree v0.0.0-20181222104329-6a0b80129de4 h1:mK1/QgFPU4osbhjJ26B1w738kjQHaGJcon8uCLMS8fk= -github.com/a8m/tree v0.0.0-20181222104329-6a0b80129de4/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg= github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8 h1:CkFIJJAKEbZbM2tKmCqt/v9ivgpikjPu5lnDsk8huLE= github.com/a8m/tree v0.0.0-20201019170308-9f4249a434f8/go.mod h1:FSdwKX97koS5efgm8WevNf7XS3PqtyFkKDDXrz778cg= github.com/agext/levenshtein v1.2.1 h1:QmvMAjj2aEICytGiWzmxoE0x2KZvE0fvmqMOfy2tjT8=