diff --git a/nomad/state/deployment_events_test.go b/nomad/state/deployment_events_test.go index 3bf9e9f6c0a..153f185ae85 100644 --- a/nomad/state/deployment_events_test.go +++ b/nomad/state/deployment_events_test.go @@ -83,7 +83,7 @@ func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, tim } maxAttempts-- if maxAttempts == 0 { - require.Failf(t, "reached max attempts waiting for desired event count", "count %d", len(got)) + require.Failf(t, "reached max attempts waiting for desired event count", "count %d got: %+v", len(got), got) } time.Sleep(10 * time.Millisecond) } diff --git a/nomad/state/events.go b/nomad/state/events.go index c338133c418..a78111e5975 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -41,6 +41,8 @@ var MsgTypeEvents = map[structs.MessageType]string{ structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration, structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration, structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration, + structs.HostVolumeRegisterRequestType: structs.TypeHostVolumeRegistered, + structs.HostVolumeDeleteRequestType: structs.TypeHostVolumeDeleted, } func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events { @@ -181,6 +183,24 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Service: before, }, }, true + case TableHostVolumes: + before, ok := change.Before.(*structs.HostVolume) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicHostVolume, + Key: before.ID, + FilterKeys: []string{ + before.ID, + before.Name, + before.PluginID, + }, + Namespace: before.Namespace, + Payload: &structs.HostVolumeEvent{ + Volume: before, + }, + }, true } return structs.Event{}, false } @@ -358,6 +378,24 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { Service: after, }, }, true + case TableHostVolumes: + after, ok := change.After.(*structs.HostVolume) + if !ok { + return structs.Event{}, false + } + return structs.Event{ + Topic: structs.TopicHostVolume, + Key: after.ID, + FilterKeys: []string{ + after.ID, + after.Name, + after.PluginID, + }, + Namespace: after.Namespace, + Payload: &structs.HostVolumeEvent{ + Volume: after, + }, + }, true } return structs.Event{}, false diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 8e15e27fb9d..9b52ae3bfc8 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -1215,6 +1215,49 @@ func Test_eventsFromChanges_ACLBindingRule(t *testing.T) { must.Eq(t, bindingRule, receivedDeleteChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule) } +func TestEvents_HostVolumes(t *testing.T) { + + ci.Parallel(t) + store := TestStateStoreCfg(t, TestStateStorePublisher(t)) + defer store.StopEventBroker() + + index, err := store.LatestIndex() + must.NoError(t, err) + + node := mock.Node() + index++ + must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node, NodeUpsertWithNodePool)) + + vol := mock.HostVolume() + vol.NodeID = node.ID + index++ + must.NoError(t, store.UpsertHostVolume(index, vol)) + + node = node.Copy() + node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{vol.Name: { + Name: vol.Name, + Path: "/var/nomad/alloc_mounts" + uuid.Generate(), + }} + index++ + must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node, NodeUpsertWithNodePool)) + + index++ + must.NoError(t, store.DeleteHostVolume(index, vol.Namespace, vol.ID)) + + events := WaitForEvents(t, store, 0, 5, 1*time.Second) + must.Len(t, 5, events) + must.Eq(t, "Node", events[0].Topic) + must.Eq(t, "NodeRegistration", events[0].Type) + must.Eq(t, "HostVolume", events[1].Topic) + must.Eq(t, "HostVolumeRegistered", events[1].Type) + must.Eq(t, "Node", events[2].Topic) + must.Eq(t, "NodeRegistration", events[2].Type) + must.Eq(t, "HostVolume", events[3].Topic) + must.Eq(t, "NodeRegistration", events[3].Type) + must.Eq(t, "HostVolume", events[4].Topic) + must.Eq(t, "HostVolumeDeleted", events[4].Type) +} + func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) { t.Helper() diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go index 7e55e6ced43..8cfe9010cf6 100644 --- a/nomad/state/state_store_host_volumes.go +++ b/nomad/state/state_store_host_volumes.go @@ -53,7 +53,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs // UpsertHostVolume upserts a host volume func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(structs.HostVolumeRegisterRequestType, index) defer txn.Abort() if exists, err := s.namespaceExists(txn, vol.Namespace); err != nil { @@ -117,7 +117,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err // DeleteHostVolume deletes a host volume func (s *StateStore) DeleteHostVolume(index uint64, ns string, id string) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(structs.HostVolumeDeleteRequestType, index) defer txn.Abort() obj, err := txn.First(TableHostVolumes, indexID, ns, id) diff --git a/nomad/stream/event_broker.go b/nomad/stream/event_broker.go index 41d52f064e4..05a4b8449c1 100644 --- a/nomad/stream/event_broker.go +++ b/nomad/stream/event_broker.go @@ -363,6 +363,10 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool { if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok { return false } + case structs.TopicHostVolume: + if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityHostVolumeRead); !ok { + return false + } case structs.TopicNode: if ok := aclObj.AllowNodeRead(); !ok { return false diff --git a/nomad/structs/event.go b/nomad/structs/event.go index 466421d38a5..2b5c81316a8 100644 --- a/nomad/structs/event.go +++ b/nomad/structs/event.go @@ -31,6 +31,7 @@ const ( TopicACLAuthMethod Topic = "ACLAuthMethod" TopicACLBindingRule Topic = "ACLBindingRule" TopicService Topic = "Service" + TopicHostVolume Topic = "HostVolume" TopicAll Topic = "*" TypeNodeRegistration = "NodeRegistration" @@ -63,6 +64,8 @@ const ( TypeACLBindingRuleDeleted = "ACLBindingRuleDeleted" TypeServiceRegistration = "ServiceRegistration" TypeServiceDeregistration = "ServiceDeregistration" + TypeHostVolumeRegistered = "HostVolumeRegistered" + TypeHostVolumeDeleted = "HostVolumeDeleted" ) // Event represents a change in Nomads state. @@ -188,3 +191,9 @@ type ACLAuthMethodEvent struct { type ACLBindingRuleEvent struct { ACLBindingRule *ACLBindingRule } + +// HostVolumeEvent holds a newly updated or deleted dynamic host volume to be +// used as an event in the event stream +type HostVolumeEvent struct { + Volume *HostVolume +} diff --git a/website/content/api-docs/events.mdx b/website/content/api-docs/events.mdx index 0cc34c76a73..f006e2982e3 100644 --- a/website/content/api-docs/events.mdx +++ b/website/content/api-docs/events.mdx @@ -28,19 +28,20 @@ the nature of this endpoint individual topics require specific policies. Note that if you do not include a `topic` parameter all topics will be included by default, requiring a management token. -| Topic | ACL Required | -| ------------ | -------------------- | -| `*` | `management` | -| `ACLToken` | `management` | -| `ACLPolicy` | `management` | -| `ACLRole` | `management` | -| `Job` | `namespace:read-job` | -| `Allocation` | `namespace:read-job` | -| `Deployment` | `namespace:read-job` | -| `Evaluation` | `namespace:read-job` | -| `Node` | `node:read` | -| `NodePool` | `management` | -| `Service` | `namespace:read-job` | +| Topic | ACL Required | +|--------------|------------------------------| +| `*` | `management` | +| `ACLPolicy` | `management` | +| `ACLRole` | `management` | +| `ACLToken` | `management` | +| `Allocation` | `namespace:read-job` | +| `Deployment` | `namespace:read-job` | +| `Evaluation` | `namespace:read-job` | +| `HostVolume` | `namespace:host-volume-read` | +| `Job` | `namespace:read-job` | +| `NodePool` | `management` | +| `Node` | `node:read` | +| `Service` | `namespace:read-job` | ### Parameters @@ -65,50 +66,54 @@ by default, requiring a management token. ### Event Topics -| Topic | Output | -| ---------- | ------------------------------- | -| ACLToken | ACLToken | -| ACLPolicy | ACLPolicy | -| ACLRoles | ACLRole | -| Allocation | Allocation (no job information) | -| Job | Job | -| Evaluation | Evaluation | -| Deployment | Deployment | -| Node | Node | -| NodeDrain | Node | -| NodePool | NodePool | -| Service | Service Registrations | +| Topic | Output | +|------------|----------------------------------------| +| ACLPolicy | ACLPolicy | +| ACLRoles | ACLRole | +| ACLToken | ACLToken | +| Allocation | Allocation (no job information) | +| Deployment | Deployment | +| Evaluation | Evaluation | +| HostVolume | HostVolume (dynamic host volumes only) | +| Job | Job | +| Node | Node | +| NodeDrain | Node | +| NodePool | NodePool | +| Service | Service Registrations | ### Event Types | Type | -| ----------------------------- | -| ACLTokenUpserted | -| ACLTokenDeleted | -| ACLPolicyUpserted | +|-------------------------------| | ACLPolicyDeleted | -| ACLRoleUpserted | +| ACLPolicyUpserted | | ACLRoleDeleted | +| ACLRoleUpserted | +| ACLTokenDeleted | +| ACLTokenUpserted | | AllocationCreated | -| AllocationUpdated | | AllocationUpdateDesiredStatus | -| DeploymentStatusUpdate | -| DeploymentPromotion | +| AllocationUpdated | | DeploymentAllocHealth | +| DeploymentPromotion | +| DeploymentStatusUpdate | | EvaluationUpdated | -| JobRegistered | -| JobDeregistered | +| HostVolumeDeleted | +| HostVolumeRegistered | | JobBatchDeregistered | -| NodeRegistration | +| JobDeregistered | +| JobRegistered | | NodeDeregistration | -| NodeEligibility | | NodeDrain | +| NodeEligibility | | NodeEvent | -| NodePoolUpserted | | NodePoolDeleted | +| NodePoolUpserted | +| NodeRegistration | | PlanResult | -| ServiceRegistration | | ServiceDeregistration | +| ServiceRegistration | + ### Sample Request