From 74860be710b737e37fad42d1a6bf460b4d99cc1a Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 20 Oct 2020 21:16:25 -0700 Subject: [PATCH] core: open source namespaces --- command/agent/http.go | 4 + command/agent/http_oss.go | 4 - command/agent/namespace_endpoint.go | 119 ++++ command/agent/namespace_endpoint_test.go | 172 +++++ nomad/fsm.go | 141 ++++- nomad/namespace_endpoint.go | 371 +++++++++++ nomad/namespace_endpoint_test.go | 772 +++++++++++++++++++++++ nomad/server.go | 4 + nomad/state/schema.go | 30 + nomad/state/state_store.go | 231 ++++++- nomad/state/state_store_oss.go | 16 +- nomad/structs/structs.go | 207 ++++-- 12 files changed, 1992 insertions(+), 79 deletions(-) create mode 100644 command/agent/namespace_endpoint.go create mode 100644 command/agent/namespace_endpoint_test.go create mode 100644 nomad/namespace_endpoint.go create mode 100644 nomad/namespace_endpoint_test.go diff --git a/command/agent/http.go b/command/agent/http.go index 582cae0a2de..6dee1a4f58f 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -328,6 +328,10 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/event/stream", s.wrap(s.EventStream)) + s.mux.HandleFunc("/v1/namespaces", s.wrap(s.NamespacesRequest)) + s.mux.HandleFunc("/v1/namespace", s.wrap(s.NamespaceCreateRequest)) + s.mux.HandleFunc("/v1/namespace/", s.wrap(s.NamespaceSpecificRequest)) + if uiEnabled { s.mux.Handle("/ui/", http.StripPrefix("/ui/", s.handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()})))) } else { diff --git a/command/agent/http_oss.go b/command/agent/http_oss.go index 61de2934742..49056510b1b 100644 --- a/command/agent/http_oss.go +++ b/command/agent/http_oss.go @@ -8,10 +8,6 @@ import ( // registerEnterpriseHandlers is a no-op for the oss release func (s *HTTPServer) registerEnterpriseHandlers() { - s.mux.HandleFunc("/v1/namespaces", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/namespace", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/namespace/", s.wrap(s.entOnly)) - s.mux.HandleFunc("/v1/sentinel/policies", s.wrap(s.entOnly)) s.mux.HandleFunc("/v1/sentinel/policy/", s.wrap(s.entOnly)) diff --git a/command/agent/namespace_endpoint.go b/command/agent/namespace_endpoint.go new file mode 100644 index 00000000000..64f9a20ab0a --- /dev/null +++ b/command/agent/namespace_endpoint.go @@ -0,0 +1,119 @@ +package agent + +import ( + "net/http" + "strings" + + "github.com/hashicorp/nomad/nomad/structs" +) + +func (s *HTTPServer) NamespacesRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "GET" { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.NamespaceListRequest{} + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.NamespaceListResponse + if err := s.agent.RPC("Namespace.ListNamespaces", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Namespaces == nil { + out.Namespaces = make([]*structs.Namespace, 0) + } + return out.Namespaces, nil +} + +func (s *HTTPServer) NamespaceSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + name := strings.TrimPrefix(req.URL.Path, "/v1/namespace/") + if len(name) == 0 { + return nil, CodedError(400, "Missing Namespace Name") + } + switch req.Method { + case "GET": + return s.namespaceQuery(resp, req, name) + case "PUT", "POST": + return s.namespaceUpdate(resp, req, name) + case "DELETE": + return s.namespaceDelete(resp, req, name) + default: + return nil, CodedError(405, ErrInvalidMethod) + } +} + +func (s *HTTPServer) NamespaceCreateRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + + return s.namespaceUpdate(resp, req, "") +} + +func (s *HTTPServer) namespaceQuery(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + args := structs.NamespaceSpecificRequest{ + Name: namespaceName, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.SingleNamespaceResponse + if err := s.agent.RPC("Namespace.GetNamespace", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.Namespace == nil { + return nil, CodedError(404, "Namespace not found") + } + return out.Namespace, nil +} + +func (s *HTTPServer) namespaceUpdate(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + // Parse the namespace + var namespace structs.Namespace + if err := decodeBody(req, &namespace); err != nil { + return nil, CodedError(500, err.Error()) + } + + // Ensure the namespace name matches + if namespaceName != "" && namespace.Name != namespaceName { + return nil, CodedError(400, "Namespace name does not match request path") + } + + // Format the request + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{&namespace}, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.GenericResponse + if err := s.agent.RPC("Namespace.UpsertNamespaces", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return nil, nil +} + +func (s *HTTPServer) namespaceDelete(resp http.ResponseWriter, req *http.Request, + namespaceName string) (interface{}, error) { + + args := structs.NamespaceDeleteRequest{ + Namespaces: []string{namespaceName}, + } + s.parseWriteRequest(req, &args.WriteRequest) + + var out structs.GenericResponse + if err := s.agent.RPC("Namespace.DeleteNamespaces", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return nil, nil +} diff --git a/command/agent/namespace_endpoint_test.go b/command/agent/namespace_endpoint_test.go new file mode 100644 index 00000000000..9e9a0fd0132 --- /dev/null +++ b/command/agent/namespace_endpoint_test.go @@ -0,0 +1,172 @@ +// +build ent + +package agent + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/assert" +) + +func TestHTTP_NamespaceList(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + ns2 := mock.Namespace() + ns3 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2, ns3}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/namespaces", nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespacesRequest(respW, req) + assert.Nil(err) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + assert.Equal("true", respW.HeaderMap.Get("X-Nomad-KnownLeader")) + assert.NotZero(respW.HeaderMap.Get("X-Nomad-LastContact")) + + // Check the output (the 3 we register + default) + assert.Len(obj.([]*structs.Namespace), 4) + }) +} + +func TestHTTP_NamespaceQuery(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("GET", "/v1/namespace/"+ns1.Name, nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + assert.Equal("true", respW.HeaderMap.Get("X-Nomad-KnownLeader")) + assert.NotZero(respW.HeaderMap.Get("X-Nomad-LastContact")) + + // Check the output + assert.Equal(ns1.Name, obj.(*structs.Namespace).Name) + }) +} + +func TestHTTP_NamespaceCreate(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Make the HTTP request + ns1 := mock.Namespace() + buf := encodeReq(ns1) + req, err := http.NewRequest("PUT", "/v1/namespace", buf) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceCreateRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + ns1.CreateIndex, ns1.ModifyIndex = out.CreateIndex, out.ModifyIndex + assert.Equal(ns1.Name, out.Name) + assert.Equal(ns1, out) + }) +} + +func TestHTTP_NamespaceUpdate(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Make the HTTP request + ns1 := mock.Namespace() + buf := encodeReq(ns1) + req, err := http.NewRequest("PUT", "/v1/namespace/"+ns1.Name, buf) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + ns1.CreateIndex, ns1.ModifyIndex = out.CreateIndex, out.ModifyIndex + assert.Equal(ns1.Name, out.Name) + assert.Equal(ns1, out) + }) +} + +func TestHTTP_NamespaceDelete(t *testing.T) { + assert := assert.New(t) + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + ns1 := mock.Namespace() + args := structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(s.Agent.RPC("Namespace.UpsertNamespaces", &args, &resp)) + + // Make the HTTP request + req, err := http.NewRequest("DELETE", "/v1/namespace/"+ns1.Name, nil) + assert.Nil(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.NamespaceSpecificRequest(respW, req) + assert.Nil(err) + assert.Nil(obj) + + // Check for the index + assert.NotZero(respW.HeaderMap.Get("X-Nomad-Index")) + + // Check policy was created + state := s.Agent.server.State() + out, err := state.NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + }) +} diff --git a/nomad/fsm.go b/nomad/fsm.go index efa86768d85..f144f0d4952 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -33,27 +33,30 @@ const ( type SnapshotType byte const ( - NodeSnapshot SnapshotType = iota - JobSnapshot - IndexSnapshot - EvalSnapshot - AllocSnapshot - TimeTableSnapshot - PeriodicLaunchSnapshot - JobSummarySnapshot - VaultAccessorSnapshot - JobVersionSnapshot - DeploymentSnapshot - ACLPolicySnapshot - ACLTokenSnapshot - SchedulerConfigSnapshot - ClusterMetadataSnapshot - ServiceIdentityTokenAccessorSnapshot - ScalingPolicySnapshot - CSIPluginSnapshot - CSIVolumeSnapshot - ScalingEventsSnapshot - EventSnapshot + NodeSnapshot SnapshotType = 0 + JobSnapshot SnapshotType = 1 + IndexSnapshot SnapshotType = 2 + EvalSnapshot SnapshotType = 3 + AllocSnapshot SnapshotType = 4 + TimeTableSnapshot SnapshotType = 5 + PeriodicLaunchSnapshot SnapshotType = 6 + JobSummarySnapshot SnapshotType = 7 + VaultAccessorSnapshot SnapshotType = 8 + JobVersionSnapshot SnapshotType = 9 + DeploymentSnapshot SnapshotType = 10 + ACLPolicySnapshot SnapshotType = 11 + ACLTokenSnapshot SnapshotType = 12 + SchedulerConfigSnapshot SnapshotType = 13 + ClusterMetadataSnapshot SnapshotType = 14 + ServiceIdentityTokenAccessorSnapshot SnapshotType = 15 + ScalingPolicySnapshot SnapshotType = 16 + CSIPluginSnapshot SnapshotType = 17 + CSIVolumeSnapshot SnapshotType = 18 + ScalingEventsSnapshot SnapshotType = 19 + EventSnapshot SnapshotType = 20 + + // Namespace appliers were moved from enterprise and therefore start at 64 + NamespaceSnapshot SnapshotType = 64 ) // LogApplier is the definition of a function that can apply a Raft log @@ -295,6 +298,10 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyCSIVolumeBatchClaim(buf[1:], log.Index) case structs.CSIPluginDeleteRequestType: return n.applyCSIPluginDelete(buf[1:], log.Index) + case structs.NamespaceUpsertRequestType: + return n.applyNamespaceUpsert(buf[1:], log.Index) + case structs.NamespaceDeleteRequestType: + return n.applyNamespaceDelete(buf[1:], log.Index) } // Check enterprise only message types. @@ -1257,6 +1264,58 @@ func (n *nomadFSM) applyCSIPluginDelete(buf []byte, index uint64) interface{} { return nil } +// applyNamespaceUpsert is used to upsert a set of namespaces +func (n *nomadFSM) applyNamespaceUpsert(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_namespace_upsert"}, time.Now()) + var req structs.NamespaceUpsertRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + var trigger []string + for _, ns := range req.Namespaces { + old, err := n.state.NamespaceByName(nil, ns.Name) + if err != nil { + n.logger.Error("namespace lookup failed", "error", err) + return err + } + + // If we are changing the quota on a namespace trigger evals for the + // older quota. + if old != nil && old.Quota != "" && old.Quota != ns.Quota { + trigger = append(trigger, old.Quota) + } + } + + if err := n.state.UpsertNamespaces(index, req.Namespaces); err != nil { + n.logger.Error("UpsertNamespaces failed", "error", err) + return err + } + + // Send the unblocks + for _, quota := range trigger { + n.blockedEvals.UnblockQuota(quota, index) + } + + return nil +} + +// applyNamespaceDelete is used to delete a set of namespaces +func (n *nomadFSM) applyNamespaceDelete(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_namespace_delete"}, time.Now()) + var req structs.NamespaceDeleteRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := n.state.DeleteNamespaces(index, req.Namespaces); err != nil { + n.logger.Error("DeleteNamespaces failed", "error", err) + return err + } + + return nil +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() @@ -1539,6 +1598,15 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err := restore.EventRestore(event); err != nil { return err } + case NamespaceSnapshot: + namespace := new(structs.Namespace) + if err := dec.Decode(namespace); err != nil { + return err + } + if err := restore.NamespaceRestore(namespace); err != nil { + return err + } + default: // Check if this is an enterprise only object being restored restorer, ok := n.enterpriseRestorers[snapType] @@ -1878,6 +1946,9 @@ func (s *nomadSnapshot) Persist(sink raft.SnapshotSink) error { sink.Cancel() return err } + if err := s.persistNamespaces(sink, encoder); err != nil { + return err + } if err := s.persistEnterpriseTables(sink, encoder); err != nil { sink.Cancel() return err @@ -2243,6 +2314,34 @@ func (s *nomadSnapshot) persistACLTokens(sink raft.SnapshotSink, return nil } +// persistNamespaces persists all the namespaces. +func (s *nomadSnapshot) persistNamespaces(sink raft.SnapshotSink, encoder *codec.Encoder) error { + // Get all the jobs + ws := memdb.NewWatchSet() + namespaces, err := s.snap.Namespaces(ws) + if err != nil { + return err + } + + for { + // Get the next item + raw := namespaces.Next() + if raw == nil { + break + } + + // Prepare the request struct + namespace := raw.(*structs.Namespace) + + // Write out a namespace registration + sink.Write([]byte{byte(NamespaceSnapshot)}) + if err := encoder.Encode(namespace); err != nil { + return err + } + } + return nil +} + func (s *nomadSnapshot) persistSchedulerConfig(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get scheduler config diff --git a/nomad/namespace_endpoint.go b/nomad/namespace_endpoint.go new file mode 100644 index 00000000000..7701801a15a --- /dev/null +++ b/nomad/namespace_endpoint.go @@ -0,0 +1,371 @@ +package nomad + +import ( + "fmt" + "time" + + metrics "github.com/armon/go-metrics" + memdb "github.com/hashicorp/go-memdb" + multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Namespace endpoint is used for manipulating namespaces +type Namespace struct { + srv *Server +} + +// UpsertNamespaces is used to upsert a set of namespaces +func (n *Namespace) UpsertNamespaces(args *structs.NamespaceUpsertRequest, + reply *structs.GenericResponse) error { + args.Region = n.srv.config.AuthoritativeRegion + if done, err := n.srv.forward("Namespace.UpsertNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "upsert_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Validate there is at least one namespace + if len(args.Namespaces) == 0 { + return fmt.Errorf("must specify at least one namespace") + } + + // Validate the namespaces and set the hash + for _, ns := range args.Namespaces { + if err := ns.Validate(); err != nil { + return fmt.Errorf("Invalid namespace %q: %v", ns.Name, err) + } + + ns.SetHash() + } + + // Update via Raft + out, index, err := n.srv.raftApply(structs.NamespaceUpsertRequestType, args) + if err != nil { + return err + } + + // Check if there was an error when applying. + if err, ok := out.(error); ok && err != nil { + return err + } + + // Update the index + reply.Index = index + return nil +} + +// DeleteNamespaces is used to delete a namespace +func (n *Namespace) DeleteNamespaces(args *structs.NamespaceDeleteRequest, reply *structs.GenericResponse) error { + args.Region = n.srv.config.AuthoritativeRegion + if done, err := n.srv.forward("Namespace.DeleteNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "delete_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Validate at least one namespace + if len(args.Namespaces) == 0 { + return fmt.Errorf("must specify at least one namespace to delete") + } + + for _, ns := range args.Namespaces { + if ns == structs.DefaultNamespace { + return fmt.Errorf("can not delete default namespace") + } + } + + // Check that the deleting namespaces do not have non-terminal jobs in both + // this region and all federated regions + var mErr multierror.Error + for _, ns := range args.Namespaces { + nonTerminal, err := n.nonTerminalNamespaces(args.AuthToken, ns) + if err != nil { + multierror.Append(&mErr, err) + } else if len(nonTerminal) != 0 { + multierror.Append(&mErr, fmt.Errorf("namespace %q has non-terminal jobs in regions: %v", ns, nonTerminal)) + } + } + + if err := mErr.ErrorOrNil(); err != nil { + return err + } + + // Update via Raft + out, index, err := n.srv.raftApply(structs.NamespaceDeleteRequestType, args) + if err != nil { + return err + } + + // Check if there was an error when applying. + if err, ok := out.(error); ok && err != nil { + return err + } + + // Update the index + reply.Index = index + return nil +} + +// nonTerminalNamespaces returns whether the set of regions in which the +// namespaces contains non-terminal jobs, checking all federated regions +// including this one. +func (n *Namespace) nonTerminalNamespaces(authToken, namespace string) ([]string, error) { + regions := n.srv.Regions() + thisRegion := n.srv.Region() + terminal := make([]string, 0, len(regions)) + + // Check if this region is terminal + localTerminal, err := n.namespaceTerminalLocally(namespace) + if err != nil { + return nil, err + } + if !localTerminal { + terminal = append(terminal, thisRegion) + } + + for _, region := range regions { + if region == thisRegion { + continue + } + + remoteTerminal, err := n.namespaceTerminalInRegion(authToken, namespace, region) + if err != nil { + return nil, err + } + if !remoteTerminal { + terminal = append(terminal, region) + } + } + + return terminal, nil +} + +// namespaceTerminalLocally returns if the namespace contains only terminal jobs +// in the local region . +func (n *Namespace) namespaceTerminalLocally(namespace string) (bool, error) { + snap, err := n.srv.fsm.State().Snapshot() + if err != nil { + return false, err + } + + iter, err := snap.JobsByNamespace(nil, namespace) + if err != nil { + return false, err + } + + for { + raw := iter.Next() + if raw == nil { + break + } + + job := raw.(*structs.Job) + if job.Status != structs.JobStatusDead { + return false, nil + } + } + + return true, nil +} + +// namespaceTerminalInRegion returns if the namespace contains only terminal +// jobs in the given region . +func (n *Namespace) namespaceTerminalInRegion(authToken, namespace, region string) (bool, error) { + req := &structs.JobListRequest{ + QueryOptions: structs.QueryOptions{ + Region: region, + Namespace: namespace, + AllowStale: false, + AuthToken: authToken, + }, + } + + var resp structs.JobListResponse + done, err := n.srv.forward("Job.List", req, req, &resp) + if !done { + return false, fmt.Errorf("unexpectedly did not forward Job.List to region %q", region) + } else if err != nil { + return false, err + } + + for _, job := range resp.Jobs { + if job.Status != structs.JobStatusDead { + return false, nil + } + } + + return true, nil +} + +// ListNamespaces is used to list the namespaces +func (n *Namespace) ListNamespaces(args *structs.NamespaceListRequest, reply *structs.NamespaceListResponse) error { + if done, err := n.srv.forward("Namespace.ListNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "list_namespace"}, time.Now()) + + // Resolve token to acl to filter namespace list + aclObj, err := n.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Iterate over all the namespaces + var err error + var iter memdb.ResultIterator + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = s.NamespacesByNamePrefix(ws, prefix) + } else { + iter, err = s.Namespaces(ws) + } + if err != nil { + return err + } + + reply.Namespaces = nil + for { + raw := iter.Next() + if raw == nil { + break + } + ns := raw.(*structs.Namespace) + + // Only return namespaces allowed by acl + if aclObj == nil || aclObj.AllowNamespace(ns.Name) { + reply.Namespaces = append(reply.Namespaces, ns) + } + } + + // Use the last index that affected the namespace table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + return nil + }} + return n.srv.blockingRPC(&opts) +} + +// GetNamespace is used to get a specific namespace +func (n *Namespace) GetNamespace(args *structs.NamespaceSpecificRequest, reply *structs.SingleNamespaceResponse) error { + if done, err := n.srv.forward("Namespace.GetNamespace", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "get_namespace"}, time.Now()) + + // Check capabilities for the given namespace permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNamespace(args.Name) { + return structs.ErrPermissionDenied + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Look for the namespace + out, err := s.NamespaceByName(ws, args.Name) + if err != nil { + return err + } + + // Setup the output + reply.Namespace = out + if out != nil { + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the namespace table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + } + return nil + }} + return n.srv.blockingRPC(&opts) +} + +// GetNamespaces is used to get a set of namespaces +func (n *Namespace) GetNamespaces(args *structs.NamespaceSetRequest, reply *structs.NamespaceSetResponse) error { + if done, err := n.srv.forward("Namespace.GetNamespaces", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "namespace", "get_namespaces"}, time.Now()) + + // Check management permissions + if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, s *state.StateStore) error { + // Setup the output + reply.Namespaces = make(map[string]*structs.Namespace, len(args.Namespaces)) + + // Look for the namespace + for _, namespace := range args.Namespaces { + out, err := s.NamespaceByName(ws, namespace) + if err != nil { + return err + } + if out != nil { + reply.Namespaces[namespace] = out + } + } + + // Use the last index that affected the policy table + index, err := s.Index(state.TableNamespaces) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query cannot be used. + // We floor the index at one, since realistically the first write must have a higher index. + if index == 0 { + index = 1 + } + reply.Index = index + return nil + }} + return n.srv.blockingRPC(&opts) +} diff --git a/nomad/namespace_endpoint_test.go b/nomad/namespace_endpoint_test.go new file mode 100644 index 00000000000..eec1c50bd00 --- /dev/null +++ b/nomad/namespace_endpoint_test.go @@ -0,0 +1,772 @@ +package nomad + +import ( + "fmt" + "testing" + "time" + + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/assert" +) + +func TestNamespaceEndpoint_GetNamespace(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns}) + + // Lookup the namespace + get := &structs.NamespaceSpecificRequest{ + Name: ns.Name, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns, resp.Namespace) + + // Lookup non-existing namespace + get.Name = uuid.Generate() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Nil(resp.Namespace) +} + +func TestNamespaceEndpoint_GetNamespace_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + validToken := mock.CreatePolicyAndToken(t, state, 1002, "test-valid", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})) + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(ns2.Name, "", []string{acl.NamespaceCapabilityReadJob})) + + get := &structs.NamespaceSpecificRequest{ + Name: ns1.Name, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespace without a token and expect failure + { + var resp structs.SingleNamespaceResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with an invalid token + get.AuthToken = invalidToken.SecretID + { + var resp structs.SingleNamespaceResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + } + + // Try with a valid token + get.AuthToken = validToken.SecretID + { + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns1, resp.Namespace) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.SingleNamespaceResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Equal(ns1, resp.Namespace) + } +} + +func TestNamespaceEndpoint_GetNamespace_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespaces + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // First create an namespace + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(100, []*structs.Namespace{ns1})) + }) + + // Upsert the namespace we are watching later + time.AfterFunc(200*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns2})) + }) + + // Lookup the namespace + req := &structs.NamespaceSpecificRequest{ + Name: ns2.Name, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + var resp structs.SingleNamespaceResponse + start := time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", req, &resp)) + assert.EqualValues(200, resp.Index) + assert.NotNil(resp.Namespace) + assert.Equal(ns2.Name, resp.Namespace.Name) + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + + // Namespace delete triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns2.Name})) + }) + + req.QueryOptions.MinQueryIndex = 250 + var resp2 structs.SingleNamespaceResponse + start = time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespace", req, &resp2)) + assert.EqualValues(300, resp2.Index) + assert.Nil(resp2.Namespace) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } +} + +func TestNamespaceEndpoint_GetNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Lookup the namespace + get := &structs.NamespaceSetRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.NamespaceSetResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + assert.Contains(resp.Namespaces, ns1.Name) + assert.Contains(resp.Namespaces, ns2.Name) +} + +func TestNamespaceEndpoint_GetNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + state.UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + validToken := mock.CreatePolicyAndToken(t, state, 1002, "test-valid", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob})) + + // Lookup the namespace + get := &structs.NamespaceSetRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespaces without a token and expect a failure + { + var resp structs.NamespaceSetResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + } + + // Try with an non-management token + get.AuthToken = validToken.SecretID + { + var resp structs.NamespaceSetResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.NamespaceSetResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + assert.Contains(resp.Namespaces, ns1.Name) + assert.Contains(resp.Namespaces, ns2.Name) + } +} + +func TestNamespaceEndpoint_GetNamespaces_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespaces + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // First create an namespace + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(100, []*structs.Namespace{ns1})) + }) + + // Upsert the namespace we are watching later + time.AfterFunc(200*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns2})) + }) + + // Lookup the namespace + req := &structs.NamespaceSetRequest{ + Namespaces: []string{ns2.Name}, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + var resp structs.NamespaceSetResponse + start := time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", req, &resp)) + assert.EqualValues(200, resp.Index) + assert.Len(resp.Namespaces, 1) + assert.Contains(resp.Namespaces, ns2.Name) + + if elapsed := time.Since(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + + // Namespace delete triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns2.Name})) + }) + + req.QueryOptions.MinQueryIndex = 250 + var resp2 structs.NamespaceSetResponse + start = time.Now() + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.GetNamespaces", req, &resp2)) + assert.EqualValues(300, resp2.Index) + assert.Empty(resp2.Namespaces) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } +} + +func TestNamespaceEndpoint_List(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + ns1.Name = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9" + ns2.Name = "aaaabbbb-3350-4b4b-d185-0e1992ed43e9" + assert.Nil(s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2})) + + // Lookup the namespaces + get := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 3) + + // Lookup the namespaces by prefix + get = &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + Prefix: "aaaabb", + }, + } + var resp2 structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp2)) + assert.EqualValues(1000, resp2.Index) + assert.Len(resp2.Namespaces, 1) +} + +func TestNamespaceEndpoint_List_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + + ns1.Name = "aaaaaaaa-3350-4b4b-d185-0e1992ed43e9" + ns2.Name = "bbbbbbbb-3350-4b4b-d185-0e1992ed43e9" + assert.Nil(s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2})) + + validDefToken := mock.CreatePolicyAndToken(t, state, 1001, "test-def-valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS})) + validMultiToken := mock.CreatePolicyAndToken(t, state, 1002, "test-multi-valid", fmt.Sprintf("%s\n%s", + mock.NamespacePolicy(ns1.Name, "", []string{acl.NamespaceCapabilityReadJob}), + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob}))) + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy("invalid-namespace", "", []string{acl.NamespaceCapabilityReadJob})) + + get := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Lookup the namespaces without a token and expect a failure + { + var resp structs.NamespaceListResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp) + assert.Nil(err) + assert.Len(resp.Namespaces, 0) + } + + // Try with an invalid token + get.AuthToken = invalidToken.SecretID + { + var resp structs.NamespaceListResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp) + assert.Nil(err) + assert.Len(resp.Namespaces, 0) + } + + // Try with a valid token for one + get.AuthToken = validDefToken.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 1) + } + + // Try with a valid token for two + get.AuthToken = validMultiToken.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 2) + } + + // Try with a root token + get.AuthToken = root.SecretID + { + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", get, &resp)) + assert.EqualValues(1000, resp.Index) + assert.Len(resp.Namespaces, 3) + } +} + +func TestNamespaceEndpoint_List_Blocking(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the namespace + ns := mock.Namespace() + + // Upsert namespace triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.UpsertNamespaces(200, []*structs.Namespace{ns})) + }) + + req := &structs.NamespaceListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 150, + }, + } + start := time.Now() + var resp structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", req, &resp)) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + assert.EqualValues(200, resp.Index) + assert.Len(resp.Namespaces, 2) + + // Namespace deletion triggers watches + time.AfterFunc(100*time.Millisecond, func() { + assert.Nil(state.DeleteNamespaces(300, []string{ns.Name})) + }) + + req.MinQueryIndex = 200 + start = time.Now() + var resp2 structs.NamespaceListResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.ListNamespaces", req, &resp2)) + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } + assert.EqualValues(300, resp2.Index) + assert.Len(resp2.Namespaces, 1) +} + +func TestNamespaceEndpoint_DeleteNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Lookup the namespaces + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) +} + +func TestNamespaceEndpoint_DeleteNamespaces_NonTerminal_Local(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create a job in one + j := mock.Job() + j.Namespace = ns1.Name + assert.Nil(s1.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1001, j)) + + // Lookup the namespaces + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + if assert.NotNil(err) { + assert.Contains(err.Error(), "has non-terminal jobs") + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_NonTerminal_Federated_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { + c.Region = "region1" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + }) + defer cleanupS1() + s2, _, cleanupS2 := TestACLServer(t, func(c *Config) { + c.Region = "region2" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + c.ReplicationBackoff = 20 * time.Millisecond + c.ReplicationToken = root.SecretID + }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + codec := rpcClient(t, s1) + + // Create the register request + ns1 := mock.Namespace() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1}) + + testutil.WaitForResult(func() (bool, error) { + state := s2.State() + out, err := state.NamespaceByName(nil, ns1.Name) + return out != nil, err + }, func(err error) { + t.Fatalf("should replicate namespace") + }) + + // Create a job in the namespace on the non-authority + j := mock.Job() + j.Namespace = ns1.Name + assert.Nil(s2.fsm.State().UpsertJob(structs.MsgTypeTestSetup, 1001, j)) + + // Delete the namespaces without the correct permissions + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name}, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + if assert.NotNil(err) { + assert.EqualError(err, structs.ErrPermissionDenied.Error()) + } + + // Try with a auth token + req.AuthToken = root.SecretID + var resp2 structs.GenericResponse + err = msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp2) + if assert.NotNil(err) { + assert.Contains(err.Error(), "has non-terminal jobs") + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + s1.fsm.State().UpsertNamespaces(1000, []*structs.Namespace{ns1, ns2}) + + // Create the policy and tokens + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{ns1.Name, ns2.Name}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Delete namespaces without a token and expect failure + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not delete the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } + + // Try with an invalid token + req.AuthToken = invalidToken.SecretID + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not delete the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } + + // Try with a root token + req.AuthToken = root.SecretID + { + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we deleted the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } +} + +func TestNamespaceEndpoint_DeleteNamespaces_Default(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Delete the default namespace + req := &structs.NamespaceDeleteRequest{ + Namespaces: []string{structs.DefaultNamespace}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.NotNil(msgpackrpc.CallWithCodec(codec, "Namespace.DeleteNamespaces", req, &resp)) +} + +func TestNamespaceEndpoint_UpsertNamespaces(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + ns1 := mock.Namespace() + ns2 := mock.Namespace() + + // Lookup the namespaces + req := &structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we created the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) +} + +func TestNamespaceEndpoint_UpsertNamespaces_ACL(t *testing.T) { + assert := assert.New(t) + t.Parallel() + s1, root, cleanupS1 := TestACLServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + ns1 := mock.Namespace() + ns2 := mock.Namespace() + state := s1.fsm.State() + + // Create the policy and tokens + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadJob})) + + // Create the register request + req := &structs.NamespaceUpsertRequest{ + Namespaces: []*structs.Namespace{ns1, ns2}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Upsert the namespace without a token and expect failure + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not create the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } + + // Try with an invalid token + req.AuthToken = invalidToken.SecretID + { + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp) + assert.NotNil(err) + assert.Equal(err.Error(), structs.ErrPermissionDenied.Error()) + + // Check we did not create the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.Nil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.Nil(out) + } + + // Try with a root token + req.AuthToken = root.SecretID + { + var resp structs.GenericResponse + assert.Nil(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", req, &resp)) + assert.NotEqual(uint64(0), resp.Index) + + // Check we created the namespaces + out, err := s1.fsm.State().NamespaceByName(nil, ns1.Name) + assert.Nil(err) + assert.NotNil(out) + + out, err = s1.fsm.State().NamespaceByName(nil, ns2.Name) + assert.Nil(err) + assert.NotNil(out) + } +} diff --git a/nomad/server.go b/nomad/server.go index 44de55471b9..5feb8d328db 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -276,6 +276,7 @@ type endpoints struct { Scaling *Scaling Enterprise *EnterpriseEndpoints Event *Event + Namespace *Namespace // Client endpoints ClientStats *ClientStats @@ -1149,6 +1150,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Status = &Status{srv: s, logger: s.logger.Named("status")} s.staticEndpoints.System = &System{srv: s, logger: s.logger.Named("system")} s.staticEndpoints.Search = &Search{srv: s, logger: s.logger.Named("search")} + s.staticEndpoints.Namespace = &Namespace{srv: s} s.staticEndpoints.Enterprise = NewEnterpriseEndpoints(s) // Client endpoints @@ -1166,6 +1168,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Event = &Event{srv: s} s.staticEndpoints.Event.register() + } // Register the static handlers @@ -1190,6 +1193,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { server.Register(s.staticEndpoints.ClientCSI) server.Register(s.staticEndpoints.FileSystem) server.Register(s.staticEndpoints.Agent) + server.Register(s.staticEndpoints.Namespace) // Create new dynamic endpoints and add them to the RPC server. node := &Node{srv: s, ctx: ctx, logger: s.logger.Named("client")} diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 7f73bb4fcd6..ce3d05a3fb5 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -9,6 +9,10 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + TableNamespaces = "namespaces" +) + var ( schemaFactories SchemaFactories factoriesLock sync.Mutex @@ -53,6 +57,7 @@ func init() { scalingPolicyTableSchema, scalingEventTableSchema, eventTableSchema, + namespaceTableSchema, }...) } @@ -917,3 +922,28 @@ func eventTableSchema() *memdb.TableSchema { }, } } + +// namespaceTableSchema returns the MemDB schema for the namespace table. +func namespaceTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: TableNamespaces, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Name", + }, + }, + "quota": { + Name: "quota", + AllowMissing: true, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "Quota", + }, + }, + }, + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 0bdb4978244..9526c73d4c3 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -109,8 +109,8 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { s.db = NewChangeTrackerDB(db, nil, noOpProcessChanges, 0) } - // Initialize the state store with required enterprise objects - if err := s.enterpriseInit(); err != nil { + // Initialize the state store with the default namespace. + if err := s.namespaceInit(); err != nil { return nil, fmt.Errorf("enterprise state store initialization failed: %v", err) } @@ -148,6 +148,25 @@ func (s *StateStore) deleteEvent(events *structs.Events) error { return nil } +// namespaceInit ensures the default namespace exists. +func (s *StateStore) namespaceInit() error { + // Create the default namespace. This is safe to do every time we create the + // state store. There are two main cases, a brand new cluster in which case + // each server will have the same default namespace object, or a new cluster + // in which case if the default namespace has been modified, it will be + // overridden by the restore code path. + defaultNs := &structs.Namespace{ + Name: structs.DefaultNamespace, + Description: structs.DefaultNamespaceDescription, + } + + if err := s.UpsertNamespaces(1, []*structs.Namespace{defaultNs}); err != nil { + return fmt.Errorf("inserting default namespace failed: %v", err) + } + + return nil +} + // Config returns the state store configuration. func (s *StateStore) Config() *StateStoreConfig { return s.config @@ -5485,6 +5504,206 @@ func (s *StateStore) UpsertScalingPoliciesTxn(index uint64, scalingPolicies []*s return nil } +// NamespaceByName is used to lookup a namespace by name +func (s *StateStore) NamespaceByName(ws memdb.WatchSet, name string) (*structs.Namespace, error) { + txn := s.db.ReadTxn() + return s.namespaceByNameImpl(ws, txn, name) +} + +// namespaceByNameImpl is used to lookup a namespace by name +func (s *StateStore) namespaceByNameImpl(ws memdb.WatchSet, txn *txn, name string) (*structs.Namespace, error) { + watchCh, existing, err := txn.FirstWatch(TableNamespaces, "id", name) + if err != nil { + return nil, fmt.Errorf("namespace lookup failed: %v", err) + } + ws.Add(watchCh) + + if existing != nil { + return existing.(*structs.Namespace), nil + } + return nil, nil +} + +// namespaceExists returns whether a namespace exists +func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { + if namespace == structs.DefaultNamespace { + return true, nil + } + + existing, err := txn.First(TableNamespaces, "id", namespace) + if err != nil { + return false, fmt.Errorf("namespace lookup failed: %v", err) + } + + return existing != nil, nil +} + +// NamespacesByNamePrefix is used to lookup namespaces by prefix +func (s *StateStore) NamespacesByNamePrefix(ws memdb.WatchSet, namePrefix string) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + iter, err := txn.Get(TableNamespaces, "id_prefix", namePrefix) + if err != nil { + return nil, fmt.Errorf("namespaces lookup failed: %v", err) + } + ws.Add(iter.WatchCh()) + + return iter, nil +} + +// Namespaces returns an iterator over all the namespaces +func (s *StateStore) Namespaces(ws memdb.WatchSet) (memdb.ResultIterator, error) { + txn := s.db.ReadTxn() + + // Walk the entire namespace table + iter, err := txn.Get(TableNamespaces, "id") + if err != nil { + return nil, err + } + ws.Add(iter.WatchCh()) + return iter, nil +} + +func (s *StateStore) NamespaceNames() ([]string, error) { + it, err := s.Namespaces(nil) + if err != nil { + return nil, err + } + + nses := []string{} + for { + next := it.Next() + if next == nil { + break + } + ns := next.(*structs.Namespace) + nses = append(nses, ns.Name) + } + + return nses, nil +} + +// UpsertNamespace is used to register or update a set of namespaces +func (s *StateStore) UpsertNamespaces(index uint64, namespaces []*structs.Namespace) error { + txn := s.db.WriteTxn(index) + defer txn.Abort() + + for _, ns := range namespaces { + if err := s.upsertNamespaceImpl(index, txn, ns); err != nil { + return err + } + } + + if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Commit() + return nil +} + +// upsertNamespaceImpl is used to upsert a namespace +func (s *StateStore) upsertNamespaceImpl(index uint64, txn *txn, namespace *structs.Namespace) error { + // Ensure the namespace hash is non-nil. This should be done outside the state store + // for performance reasons, but we check here for defense in depth. + ns := namespace + if len(ns.Hash) == 0 { + ns.SetHash() + } + + // Check if the namespace already exists + existing, err := txn.First(TableNamespaces, "id", ns.Name) + if err != nil { + return fmt.Errorf("namespace lookup failed: %v", err) + } + + // Setup the indexes correctly and determine which quotas need to be + // reconciled + var oldQuota string + if existing != nil { + exist := existing.(*structs.Namespace) + ns.CreateIndex = exist.CreateIndex + ns.ModifyIndex = index + + // Grab the old quota on the namespace + oldQuota = exist.Quota + } else { + ns.CreateIndex = index + ns.ModifyIndex = index + } + + // Validate that the quota on the new namespace exists + if ns.Quota != "" { + exists, err := s.quotaSpecExists(txn, ns.Quota) + if err != nil { + return fmt.Errorf("looking up namespace quota %q failed: %v", ns.Quota, err) + } else if !exists { + return fmt.Errorf("namespace %q using non-existent quota %q", ns.Name, ns.Quota) + } + } + + // Insert the namespace + if err := txn.Insert(TableNamespaces, ns); err != nil { + return fmt.Errorf("namespace insert failed: %v", err) + } + + // Reconcile changed quotas + return s.quotaReconcile(index, txn, ns.Quota, oldQuota) +} + +// DeleteNamespaces is used to remove a set of namespaces +func (s *StateStore) DeleteNamespaces(index uint64, names []string) error { + txn := s.db.WriteTxn(index) + defer txn.Abort() + + for _, name := range names { + // Lookup the namespace + existing, err := txn.First(TableNamespaces, "id", name) + if err != nil { + return fmt.Errorf("namespace lookup failed: %v", err) + } + if existing == nil { + return fmt.Errorf("namespace not found") + } + + ns := existing.(*structs.Namespace) + if ns.Name == structs.DefaultNamespace { + return fmt.Errorf("default namespace can not be deleted") + } + + // Ensure that the namespace doesn't have any non-terminal jobs + iter, err := s.jobsByNamespaceImpl(nil, name, txn) + if err != nil { + return err + } + + for { + raw := iter.Next() + if raw == nil { + break + } + job := raw.(*structs.Job) + + if job.Status != structs.JobStatusDead { + return fmt.Errorf("namespace %q contains at least one non-terminal job %q. "+ + "All jobs must be terminal in namespace before it can be deleted", name, job.ID) + } + } + + // Delete the namespace + if err := txn.Delete(TableNamespaces, existing); err != nil { + return fmt.Errorf("namespace deletion failed: %v", err) + } + } + + if err := txn.Insert("index", &IndexEntry{TableNamespaces, index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Commit() + return nil +} + func (s *StateStore) DeleteScalingPolicies(index uint64, ids []string) error { txn := s.db.WriteTxn(index) defer txn.Abort() @@ -5969,3 +6188,11 @@ func (r *StateRestore) ScalingEventsRestore(jobEvents *structs.JobScalingEvents) } return nil } + +// NamespaceRestore is used to restore a namespace +func (r *StateRestore) NamespaceRestore(ns *structs.Namespace) error { + if err := r.txn.Insert(TableNamespaces, ns); err != nil { + return fmt.Errorf("namespace insert failed: %v", err) + } + return nil +} diff --git a/nomad/state/state_store_oss.go b/nomad/state/state_store_oss.go index 487f8421378..c161caf815e 100644 --- a/nomad/state/state_store_oss.go +++ b/nomad/state/state_store_oss.go @@ -6,15 +6,13 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) -// enterpriseInit is used to initialize the state store with enterprise -// objects. -func (s *StateStore) enterpriseInit() error { - return nil +// quotaSpecExists on returns whether the quota exists +func (s *StateStore) quotaSpecExists(txn *txn, name string) (bool, error) { + return false, nil } -// namespaceExists returns whether a namespace exists -func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { - return namespace == structs.DefaultNamespace, nil +func (s *StateStore) quotaReconcile(index uint64, txn *txn, newQuota, oldQuota string) error { + return nil } // updateEntWithAlloc is used to update Nomad Enterprise objects when an allocation is @@ -22,7 +20,3 @@ func (s *StateStore) namespaceExists(txn *txn, namespace string) (bool, error) { func (s *StateStore) updateEntWithAlloc(index uint64, new, existing *structs.Allocation, txn *txn) error { return nil } - -func (s *StateStore) NamespaceNames() ([]string, error) { - return []string{structs.DefaultNamespace}, nil -} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 44f9b1f9c41..9a54bae3cba 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -56,47 +56,51 @@ type MessageType uint8 // note: new raft message types need to be added to the end of this // list of contents const ( - NodeRegisterRequestType MessageType = iota - NodeDeregisterRequestType - NodeUpdateStatusRequestType - NodeUpdateDrainRequestType - JobRegisterRequestType - JobDeregisterRequestType - EvalUpdateRequestType - EvalDeleteRequestType - AllocUpdateRequestType - AllocClientUpdateRequestType - ReconcileJobSummariesRequestType - VaultAccessorRegisterRequestType - VaultAccessorDeregisterRequestType - ApplyPlanResultsRequestType - DeploymentStatusUpdateRequestType - DeploymentPromoteRequestType - DeploymentAllocHealthRequestType - DeploymentDeleteRequestType - JobStabilityRequestType - ACLPolicyUpsertRequestType - ACLPolicyDeleteRequestType - ACLTokenUpsertRequestType - ACLTokenDeleteRequestType - ACLTokenBootstrapRequestType - AutopilotRequestType - UpsertNodeEventsType - JobBatchDeregisterRequestType - AllocUpdateDesiredTransitionRequestType - NodeUpdateEligibilityRequestType - BatchNodeUpdateDrainRequestType - SchedulerConfigRequestType - NodeBatchDeregisterRequestType - ClusterMetadataRequestType - ServiceIdentityAccessorRegisterRequestType - ServiceIdentityAccessorDeregisterRequestType - CSIVolumeRegisterRequestType - CSIVolumeDeregisterRequestType - CSIVolumeClaimRequestType - ScalingEventRegisterRequestType - CSIVolumeClaimBatchRequestType - CSIPluginDeleteRequestType + NodeRegisterRequestType MessageType = 0 + NodeDeregisterRequestType MessageType = 1 + NodeUpdateStatusRequestType MessageType = 2 + NodeUpdateDrainRequestType MessageType = 3 + JobRegisterRequestType MessageType = 4 + JobDeregisterRequestType MessageType = 5 + EvalUpdateRequestType MessageType = 6 + EvalDeleteRequestType MessageType = 7 + AllocUpdateRequestType MessageType = 8 + AllocClientUpdateRequestType MessageType = 9 + ReconcileJobSummariesRequestType MessageType = 10 + VaultAccessorRegisterRequestType MessageType = 11 + VaultAccessorDeregisterRequestType MessageType = 12 + ApplyPlanResultsRequestType MessageType = 13 + DeploymentStatusUpdateRequestType MessageType = 14 + DeploymentPromoteRequestType MessageType = 15 + DeploymentAllocHealthRequestType MessageType = 16 + DeploymentDeleteRequestType MessageType = 17 + JobStabilityRequestType MessageType = 18 + ACLPolicyUpsertRequestType MessageType = 19 + ACLPolicyDeleteRequestType MessageType = 20 + ACLTokenUpsertRequestType MessageType = 21 + ACLTokenDeleteRequestType MessageType = 22 + ACLTokenBootstrapRequestType MessageType = 23 + AutopilotRequestType MessageType = 24 + UpsertNodeEventsType MessageType = 25 + JobBatchDeregisterRequestType MessageType = 26 + AllocUpdateDesiredTransitionRequestType MessageType = 27 + NodeUpdateEligibilityRequestType MessageType = 28 + BatchNodeUpdateDrainRequestType MessageType = 29 + SchedulerConfigRequestType MessageType = 30 + NodeBatchDeregisterRequestType MessageType = 31 + ClusterMetadataRequestType MessageType = 32 + ServiceIdentityAccessorRegisterRequestType MessageType = 33 + ServiceIdentityAccessorDeregisterRequestType MessageType = 34 + CSIVolumeRegisterRequestType MessageType = 35 + CSIVolumeDeregisterRequestType MessageType = 36 + CSIVolumeClaimRequestType MessageType = 37 + ScalingEventRegisterRequestType MessageType = 38 + CSIVolumeClaimBatchRequestType MessageType = 39 + CSIPluginDeleteRequestType MessageType = 40 + + // Namespace types were moved from enterprise and therefore start at 64 + NamespaceUpsertRequestType MessageType = 64 + NamespaceDeleteRequestType MessageType = 65 ) const ( @@ -148,6 +152,9 @@ const ( // to indicate that endpoints must search in all namespaces AllNamespacesSentinel = "*" + // maxNamespaceDescriptionLength limits a namespace description length + maxNamespaceDescriptionLength = 256 + // JitterFraction is a the limit to the amount of jitter we apply // to a user specified MaxQueryTime. We divide the specified time by // the fraction. So 16 == 6.25% limit of jitter. This jitter is also @@ -173,6 +180,11 @@ const ( DefaultBlockingRPCQueryTime = 300 * time.Second ) +var ( + // validNamespaceName is used to validate a namespace name + validNamespaceName = regexp.MustCompile("^[a-zA-Z0-9-]{1,128}$") +) + // Context defines the scope in which a search for Nomad object operates, and // is also used to query the matching index value for this context type Context string @@ -4698,6 +4710,119 @@ type MultiregionRegion struct { Meta map[string]string } +// Namespace allows logically grouping jobs and their associated objects. +type Namespace struct { + // Name is the name of the namespace + Name string + + // Description is a human readable description of the namespace + Description string + + // Quota is the quota specification that the namespace should account + // against. + Quota string + + // Hash is the hash of the namespace which is used to efficiently replicate + // cross-regions. + Hash []byte + + // Raft Indexes + CreateIndex uint64 + ModifyIndex uint64 +} + +func (n *Namespace) Validate() error { + var mErr multierror.Error + + // Validate the name and description + if !validNamespaceName.MatchString(n.Name) { + err := fmt.Errorf("invalid name %q. Must match regex %s", n.Name, validNamespaceName) + mErr.Errors = append(mErr.Errors, err) + } + if len(n.Description) > maxNamespaceDescriptionLength { + err := fmt.Errorf("description longer than %d", maxNamespaceDescriptionLength) + mErr.Errors = append(mErr.Errors, err) + } + + return mErr.ErrorOrNil() +} + +// SetHash is used to compute and set the hash of the namespace +func (n *Namespace) SetHash() []byte { + // Initialize a 256bit Blake2 hash (32 bytes) + hash, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + + // Write all the user set fields + hash.Write([]byte(n.Name)) + hash.Write([]byte(n.Description)) + hash.Write([]byte(n.Quota)) + + // Finalize the hash + hashVal := hash.Sum(nil) + + // Set and return the hash + n.Hash = hashVal + return hashVal +} + +func (n *Namespace) Copy() *Namespace { + nc := new(Namespace) + *nc = *n + nc.Hash = make([]byte, len(n.Hash)) + copy(nc.Hash, n.Hash) + return nc +} + +// NamespaceListRequest is used to request a list of namespaces +type NamespaceListRequest struct { + QueryOptions +} + +// NamespaceListResponse is used for a list request +type NamespaceListResponse struct { + Namespaces []*Namespace + QueryMeta +} + +// NamespaceSpecificRequest is used to query a specific namespace +type NamespaceSpecificRequest struct { + Name string + QueryOptions +} + +// SingleNamespaceResponse is used to return a single namespace +type SingleNamespaceResponse struct { + Namespace *Namespace + QueryMeta +} + +// NamespaceSetRequest is used to query a set of namespaces +type NamespaceSetRequest struct { + Namespaces []string + QueryOptions +} + +// NamespaceSetResponse is used to return a set of namespaces +type NamespaceSetResponse struct { + Namespaces map[string]*Namespace // Keyed by namespace Name + QueryMeta +} + +// NamespaceDeleteRequest is used to delete a set of namespaces +type NamespaceDeleteRequest struct { + Namespaces []string + WriteRequest +} + +// NamespaceUpsertRequest is used to upsert a set of namespaces +type NamespaceUpsertRequest struct { + Namespaces []*Namespace + WriteRequest +} + const ( // PeriodicSpecCron is used for a cron spec. PeriodicSpecCron = "cron"