diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 545c3f3201b..65ce87813db 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1048,6 +1048,9 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
 	if err := upsertCSIPluginsForNode(txn, node, index); err != nil {
 		return fmt.Errorf("csi plugin update failed: %v", err)
 	}
+	if err := upsertHostVolumeForNode(txn, node, index); err != nil {
+		return fmt.Errorf("dynamic host volumes update failed: %v", err)
+	}
 
 	return nil
 }
diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go
index 27013b05d90..dd5a68040f2 100644
--- a/nomad/state/state_store_host_volumes.go
+++ b/nomad/state/state_store_host_volumes.go
@@ -75,7 +75,7 @@ func (s *StateStore) UpsertHostVolumes(index uint64, volumes []*structs.HostVolu
 	}
 
 	// If the fingerprint is written from the node before the create RPC
-	// handler completes, we'll never update from the initial pending , so
+	// handler completes, we'll never update from the initial pending, so
 	// reconcile that here
 	node, err := s.NodeByID(nil, v.NodeID)
 	if err != nil {
@@ -190,3 +190,35 @@ func (s *StateStore) hostVolumesIter(ws memdb.WatchSet, index string, sort SortO
 	ws.Add(iter.WatchCh())
 	return iter, nil
 }
+
+// upsertHostVolumeForNode sets newly fingerprinted host volumes to ready state
+func upsertHostVolumeForNode(txn *txn, node *structs.Node, index uint64) error {
+	if len(node.HostVolumes) == 0 {
+		return nil
+	}
+	iter, err := txn.Get(TableHostVolumes, indexNodeID, node.ID)
+	if err != nil {
+		return err
+	}
+	for {
+		raw := iter.Next()
+		if raw == nil {
+			return nil
+		}
+		vol := raw.(*structs.HostVolume)
+		switch vol.State {
+		case structs.HostVolumeStateUnknown, structs.HostVolumeStatePending:
+			if _, ok := node.HostVolumes[vol.Name]; ok {
+				vol = vol.Copy()
+				vol.State = structs.HostVolumeStateReady
+				vol.ModifyIndex = index
+				err = txn.Insert(TableHostVolumes, vol)
+				if err != nil {
+					return fmt.Errorf("host volume insert: %w", err)
+				}
+			}
+		default:
+			// don't touch ready or soft-deleted volumes
+		}
+	}
+}
diff --git a/nomad/state/state_store_host_volumes_test.go b/nomad/state/state_store_host_volumes_test.go
index 11b8371152e..af4c77a729b 100644
--- a/nomad/state/state_store_host_volumes_test.go
+++ b/nomad/state/state_store_host_volumes_test.go
@@ -9,6 +9,7 @@ import (
 
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/shoenig/test/must"
@@ -163,3 +164,91 @@ func TestStateStore_HostVolumes_CRUD(t *testing.T) {
 	got = consumeIter(iter)
 	must.MapLen(t, 3, got, must.Sprint(`expected 3 volumes remain`))
 }
+
+func TestStateStore_UpdateHostVolumesFromFingerprint(t *testing.T) {
+	ci.Parallel(t)
+	store := testStateStore(t)
+	index, err := store.LatestIndex()
+	must.NoError(t, err)
+
+	node := mock.Node()
+	node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{
+		"static-vol": {Name: "static-vol", Path: "/srv/static"},
+		"dhv-zero":   {Name: "dhv-zero", Path: "/var/nomad/alloc_mounts" + uuid.Generate()},
+	}
+	index++
+	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
+		index, node, NodeUpsertWithNodePool))
+	otherNode := mock.Node()
+	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
+		index, otherNode, NodeUpsertWithNodePool))
+
+	ns := structs.DefaultNamespace
+
+	vols := []*structs.HostVolume{
+		mock.HostVolume(),
+		mock.HostVolume(),
+		mock.HostVolume(),
+		mock.HostVolume(),
+	}
+
+	// a volume that's been fingerprinted before we can write it to state
+	vols[0].Name = "dhv-zero"
+	vols[0].NodeID = node.ID
+
+	// a volume that will match the new fingerprint
+	vols[1].Name = "dhv-one"
+	vols[1].NodeID = node.ID
+
+	// a volume that matches the new fingerprint but on the wrong node
+	vols[2].Name = "dhv-one"
+	vols[2].NodeID = otherNode.ID
+
+	// a volume that won't be fingerprinted
+	vols[3].Name = "dhv-two"
+	vols[3].NodeID = node.ID
+
+	index++
+	oldIndex := index
+	must.NoError(t, store.UpsertHostVolumes(index, vols))
+
+	vol0, err := store.HostVolumeByID(nil, ns, vols[0].ID, false)
+	must.NoError(t, err)
+	must.Eq(t, structs.HostVolumeStateReady, vol0.State,
+		must.Sprint("previously-fingerprinted volume should be in ready state"))
+
+	// update the fingerprint
+
+	node = node.Copy()
+	node.HostVolumes["dhv-one"] = &structs.ClientHostVolumeConfig{
+		Name: "dhv-one",
+		Path: "/var/nomad/alloc_mounts" + uuid.Generate(),
+	}
+
+	index++
+	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
+
+	vol0, err = store.HostVolumeByID(nil, ns, vols[0].ID, false)
+	must.NoError(t, err)
+	must.Eq(t, oldIndex, vol0.ModifyIndex, must.Sprint("expected no further update"))
+	must.Eq(t, structs.HostVolumeStateReady, vol0.State)
+
+	vol1, err := store.HostVolumeByID(nil, ns, vols[1].ID, false)
+	must.NoError(t, err)
+	must.Eq(t, index, vol1.ModifyIndex,
+		must.Sprint("fingerprint should update pending volume"))
+	must.Eq(t, structs.HostVolumeStateReady, vol1.State)
+
+	vol2, err := store.HostVolumeByID(nil, ns, vols[2].ID, false)
+	must.NoError(t, err)
+	must.Eq(t, oldIndex, vol2.ModifyIndex,
+		must.Sprint("volume on other node should not change"))
+	must.Eq(t, structs.HostVolumeStatePending, vol2.State)
+
+	vol3, err := store.HostVolumeByID(nil, ns, vols[3].ID, false)
+	must.NoError(t, err)
+	must.Eq(t, oldIndex, vol3.ModifyIndex,
+		must.Sprint("volume not fingerprinted should not change"))
+	must.Eq(t, structs.HostVolumeStatePending, vol3.State)
+
+}
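For context on the iterator pattern `upsertHostVolumeForNode` uses: `txn.Get` returns a go-memdb `ResultIterator` over every object matching a secondary index, and objects read from memdb are shared with concurrent readers, which is why the diff calls `vol.Copy()` before mutating state. Below is a minimal, self-contained sketch of that pattern against a toy schema; the `Volume` type, the `volumes` table, and the `node_id` index are illustrative stand-ins, not Nomad's real `TableHostVolumes` schema.

```go
package main

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"
)

// Volume is an illustrative stand-in for structs.HostVolume.
type Volume struct {
	ID     string
	NodeID string
	State  string
}

func main() {
	schema := &memdb.DBSchema{
		Tables: map[string]*memdb.TableSchema{
			"volumes": {
				Name: "volumes",
				Indexes: map[string]*memdb.IndexSchema{
					// every memdb table requires a unique "id" index
					"id": {
						Name:    "id",
						Unique:  true,
						Indexer: &memdb.StringFieldIndex{Field: "ID"},
					},
					// non-unique secondary index, analogous to indexNodeID
					"node_id": {
						Name:    "node_id",
						Indexer: &memdb.StringFieldIndex{Field: "NodeID"},
					},
				},
			},
		},
	}

	db, err := memdb.NewMemDB(schema)
	if err != nil {
		panic(err)
	}

	txn := db.Txn(true) // write transaction
	for _, v := range []*Volume{
		{ID: "v1", NodeID: "n1", State: "pending"},
		{ID: "v2", NodeID: "n2", State: "pending"},
	} {
		if err := txn.Insert("volumes", v); err != nil {
			panic(err)
		}
	}

	// Iterate only the volumes on node n1 via the secondary index, the
	// same shape as txn.Get(TableHostVolumes, indexNodeID, node.ID).
	iter, err := txn.Get("volumes", "node_id", "n1")
	if err != nil {
		panic(err)
	}
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		vol := raw.(*Volume)
		// Objects returned by memdb are shared, so mutate a copy and
		// re-insert, as the diff does with vol.Copy(). Inserting while
		// iterating is safe because the iterator holds a snapshot of
		// the immutable radix tree as of the Get call.
		updated := *vol
		updated.State = "ready"
		if err := txn.Insert("volumes", &updated); err != nil {
			panic(err)
		}
		fmt.Println(updated.ID, updated.State) // v1 ready
	}
	txn.Commit()
}
```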
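The test pins down the transition rules: only unknown/pending volumes whose name appears in the upserting node's fingerprint advance to ready (taking the new ModifyIndex), while volumes on other nodes, volumes not yet fingerprinted, and volumes already ready or soft-deleted are untouched. As a reading aid, here is that decision distilled into a pure function; the HostVolumeState string values are assumptions for illustration, not taken from Nomad's source.

```go
package main

import "fmt"

// HostVolumeState mirrors the constants referenced in the diff; the
// string values are assumed here.
type HostVolumeState string

const (
	HostVolumeStateUnknown HostVolumeState = "unknown"
	HostVolumeStatePending HostVolumeState = "pending"
	HostVolumeStateReady   HostVolumeState = "ready"
	HostVolumeStateDeleted HostVolumeState = "deleted" // soft-deleted
)

// nextState distills the switch in upsertHostVolumeForNode: a volume is
// promoted to ready only if the node's fingerprint reports it AND it is
// still in the unknown or pending state; everything else is unchanged.
func nextState(current HostVolumeState, fingerprinted bool) HostVolumeState {
	if !fingerprinted {
		return current
	}
	switch current {
	case HostVolumeStateUnknown, HostVolumeStatePending:
		return HostVolumeStateReady
	default:
		return current // ready and soft-deleted volumes are never touched
	}
}

func main() {
	fmt.Println(nextState(HostVolumeStatePending, true))  // ready
	fmt.Println(nextState(HostVolumeStateReady, true))    // ready (idempotent)
	fmt.Println(nextState(HostVolumeStateDeleted, true))  // deleted (preserved)
	fmt.Println(nextState(HostVolumeStatePending, false)) // pending (not on this node)
}
```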