Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: plugins track jobs in addition to allocations, and use job information to set expected counts #8699

Merged
merged 20 commits into from
Aug 27, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
5d9ad99
nomad/structs/csi: add explicit job support
langmartin Aug 18, 2020
15c59f9
nomad/state/state_store: capture job updates directly
langmartin Aug 18, 2020
8ae586d
command/agent/csi_endpoint: add the JobDescriptionToApi
langmartin Aug 19, 2020
1db9d68
api/csi: add JobDescription
langmartin Aug 19, 2020
a1f2141
api/nodes: CSIInfo needs the AllocID
langmartin Aug 20, 2020
01d6d1e
command/agent/csi_endpoint: AllocID was missing, JobDescription
langmartin Aug 20, 2020
4ed3f86
nomad/state/state_store: restore provider & version
langmartin Aug 22, 2020
1970511
Update nomad/state/state_store.go
langmartin Aug 24, 2020
47b0b9f
nomad/structs/csi: comment the map types
langmartin Aug 24, 2020
1fe8961
nomad/state/state_store: boilerplate left in by accident
langmartin Aug 24, 2020
62e5e09
nomad/structs/csi: IsEmpty handles jobs correctly, nil summary == 0
langmartin Aug 24, 2020
c1056b9
nomad/csi_endpoint_test: plugin lifecycle with job
langmartin Aug 24, 2020
6fa359e
nomad/state/state_store: cleanup plugins when allocs missing
langmartin Aug 24, 2020
875d9f1
nomad/structs/csi: headfake the linter
langmartin Aug 24, 2020
f4d19d6
nomad/structs/csi: nil safe methods
langmartin Aug 25, 2020
857a5bb
api/csi: back out api change
langmartin Aug 26, 2020
85f22b2
command/agent/csi_endpoint: back out api change
langmartin Aug 26, 2020
464c166
nomad/structs/csi: don't track version
langmartin Aug 26, 2020
2de6679
nomad/state/state_store: typos
langmartin Aug 26, 2020
3e96d1c
nomad/structs/csi: comments, remove AsSlice
langmartin Aug 26, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,10 @@ type CSIPlugin struct {
Allocations []*AllocationListStub
ControllersHealthy int
ControllersExpected int
ControllerJobs []JobDescription
NodesHealthy int
NodesExpected int
NodeJobs []JobDescription
CreateIndex uint64
ModifyIndex uint64
}
Expand Down Expand Up @@ -246,6 +248,12 @@ func (v CSIPluginIndexSort) Swap(i, j int) {
v[i], v[j] = v[j], v[i]
}

type JobDescription struct {
Namespace string
ID string
Version uint64
}

// CSIPlugins returns a handle on the CSIPlugins endpoint
func (c *Client) CSIPlugins() *CSIPlugins {
return &CSIPlugins{client: c}
Expand Down
1 change: 1 addition & 0 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ type CSIControllerInfo struct {
// as plugin health changes on the node.
type CSIInfo struct {
PluginID string
AllocID string
Healthy bool
HealthDescription string
UpdateTime time.Time
Expand Down
24 changes: 22 additions & 2 deletions command/agent/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,11 +278,13 @@ func structsCSIPluginToApi(plug *structs.CSIPlugin) *api.CSIPlugin {
Allocations: make([]*api.AllocationListStub, 0, len(plug.Allocations)),
ControllerRequired: plug.ControllerRequired,
ControllersHealthy: plug.ControllersHealthy,
ControllersExpected: len(plug.Controllers),
ControllersExpected: plug.ControllersExpected,
Controllers: make(map[string]*api.CSIInfo, len(plug.Controllers)),
ControllerJobs: make([]api.JobDescription, 0, plug.ControllersExpected),
NodesHealthy: plug.NodesHealthy,
NodesExpected: len(plug.Nodes),
NodesExpected: plug.NodesExpected,
Nodes: make(map[string]*api.CSIInfo, len(plug.Nodes)),
NodeJobs: make([]api.JobDescription, 0, plug.NodesExpected),
CreateIndex: plug.CreateIndex,
ModifyIndex: plug.ModifyIndex,
}
Expand All @@ -299,9 +301,26 @@ func structsCSIPluginToApi(plug *structs.CSIPlugin) *api.CSIPlugin {
out.Allocations = append(out.Allocations, structsAllocListStubToApi(a))
}

for _, jd := range plug.ControllerJobs.AsSlice() {
out.ControllerJobs = append(out.ControllerJobs, structsJobDescriptionToApi(jd))
}

for _, jd := range plug.NodeJobs.AsSlice() {
out.NodeJobs = append(out.NodeJobs, structsJobDescriptionToApi(jd))
}

return out
}

// structsJobDescriptionToApi converts the struct
func structsJobDescriptionToApi(desc structs.JobDescription) api.JobDescription {
return api.JobDescription{
Namespace: desc.Namespace,
ID: desc.ID,
Version: desc.Version,
}
}

// structsCSIVolumeToApi converts CSIVolume, creating the allocation array
func structsCSIVolumeToApi(vol *structs.CSIVolume) *api.CSIVolume {
if vol == nil {
Expand Down Expand Up @@ -358,6 +377,7 @@ func structsCSIInfoToApi(info *structs.CSIInfo) *api.CSIInfo {
}
out := &api.CSIInfo{
PluginID: info.PluginID,
AllocID: info.AllocID,
Healthy: info.Healthy,
HealthDescription: info.HealthDescription,
UpdateTime: info.UpdateTime,
Expand Down
11 changes: 4 additions & 7 deletions command/agent/csi_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)

Expand All @@ -33,11 +32,13 @@ func TestHTTP_CSIEndpointPlugin(t *testing.T) {
out, ok := obj.(*api.CSIPlugin)
require.True(t, ok)

require.Equal(t, 1, out.ControllersExpected)
// ControllersExpected is 0 because this plugin was created without a job,
// which sets expected
require.Equal(t, 0, out.ControllersExpected)
require.Equal(t, 1, out.ControllersHealthy)
require.Len(t, out.Controllers, 1)

require.Equal(t, 2, out.NodesExpected)
require.Equal(t, 0, out.NodesExpected)
require.Equal(t, 2, out.NodesHealthy)
require.Len(t, out.Nodes, 2)
})
Expand Down Expand Up @@ -92,11 +93,7 @@ func TestHTTP_CSIEndpointVolume(t *testing.T) {
out, ok := raw.(*api.CSIVolume)
require.True(t, ok)

pretty.Log(out)

require.Equal(t, 1, out.ControllersExpected)
require.Equal(t, 1, out.ControllersHealthy)
require.Equal(t, 2, out.NodesExpected)
require.Equal(t, 2, out.NodesHealthy)
})
}
Expand Down
152 changes: 135 additions & 17 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,10 +1050,14 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
return nil
}
plug = structs.NewCSIPlugin(info.PluginID, index)
plug.Provider = info.Provider
plug.Version = info.ProviderVersion
}

// the plugin may have been created by the job being updated, in which case
// this data will not be configured, it's only available to the fingerprint
// system
plug.Provider = info.Provider
plug.Version = info.ProviderVersion

err = plug.AddPlugin(node.ID, info)
if err != nil {
return err
Expand Down Expand Up @@ -1203,10 +1207,15 @@ func updateOrGCPlugin(index uint64, txn *memdb.Txn, plug *structs.CSIPlugin) err
return nil
}

// deleteJobFromPlugin removes the allocations of this job from any plugins the job is
// deleteJobFromPlugins removes the allocations of this job from any plugins the job is
// running, possibly deleting the plugin if it's no longer in use. It's called in DeleteJobTxn
func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *structs.Job) error {
func (s *StateStore) deleteJobFromPlugins(index uint64, txn *memdb.Txn, job *structs.Job) error {
ws := memdb.NewWatchSet()
summary, err := s.JobSummaryByID(ws, job.Namespace, job.ID)
if err != nil {
return fmt.Errorf("error gettting job summary: %v", err)
}

allocs, err := s.AllocsByJob(ws, job.Namespace, job.ID, false)
if err != nil {
return fmt.Errorf("error getting allocations: %v", err)
Expand All @@ -1218,7 +1227,6 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru
}

plugAllocs := []*pair{}
plugins := map[string]*structs.CSIPlugin{}

for _, a := range allocs {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
Expand All @@ -1232,6 +1240,8 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru
}
}

plugins := map[string]*structs.CSIPlugin{}

for _, x := range plugAllocs {
plug, ok := plugins[x.pluginID]

Expand All @@ -1255,6 +1265,7 @@ func (s *StateStore) deleteJobFromPlugin(index uint64, txn *memdb.Txn, job *stru
}

for _, plug := range plugins {
plug.DeleteJob(job, summary)
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
Expand Down Expand Up @@ -1354,6 +1365,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b

// Check if the job already exists
existing, err := txn.First("jobs", "id", job.Namespace, job.ID)
var existingJob *structs.Job
if err != nil {
return fmt.Errorf("job lookup failed: %v", err)
}
Expand All @@ -1363,7 +1375,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b
job.CreateIndex = existing.(*structs.Job).CreateIndex
job.ModifyIndex = index

existingJob := existing.(*structs.Job)
existingJob = existing.(*structs.Job)

// Bump the version unless asked to keep it. This should only be done
// when changing an internal field such as Stable. A spec change should
Expand Down Expand Up @@ -1413,6 +1425,10 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b
return fmt.Errorf("unable to update job scaling policies: %v", err)
}

if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil {
return fmt.Errorf("unable to update job scaling policies: %v", err)
}

// Insert the job
if err := txn.Insert("jobs", job); err != nil {
return fmt.Errorf("job insert failed: %v", err)
Expand Down Expand Up @@ -1507,6 +1523,12 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return err
}

// Cleanup plugins registered by this job, before we delete the summary
err = s.deleteJobFromPlugins(index, txn, job)
if err != nil {
return fmt.Errorf("deleting job from plugin: %v", err)
}

// Delete the job summary
if _, err = txn.DeleteAll("job_summary", "id", namespace, jobID); err != nil {
return fmt.Errorf("deleting job summary failed: %v", err)
Expand All @@ -1528,12 +1550,6 @@ func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn
return fmt.Errorf("index update failed: %v", err)
}

// Cleanup plugins registered by this job
err = s.deleteJobFromPlugin(index, txn, job)
if err != nil {
return fmt.Errorf("deleting job from plugin: %v", err)
}

return nil
}

Expand Down Expand Up @@ -2222,10 +2238,10 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
vol.ControllerRequired = plug.ControllerRequired
vol.ControllersHealthy = plug.ControllersHealthy
vol.NodesHealthy = plug.NodesHealthy
// This number is incorrect! The expected number of node plugins is actually this +
// the number of blocked evaluations for the jobs controlling these plugins
vol.ControllersExpected = len(plug.Controllers)
vol.NodesExpected = len(plug.Nodes)

// This value may be stale, but stale is ok
vol.ControllersExpected = plug.ControllersExpected
vol.NodesExpected = plug.NodesExpected

vol.Schedulable = vol.NodesHealthy > 0
if vol.ControllerRequired {
Expand Down Expand Up @@ -2327,7 +2343,7 @@ func (s *StateStore) CSIPluginByID(ws memdb.WatchSet, id string) (*structs.CSIPl
return plug, nil
}

// CSIPluginDenormalize returns a CSIPlugin with allocation details
// CSIPluginDenormalize returns a CSIPlugin with allocation details. Always called on a copy of the plugin.
func (s *StateStore) CSIPluginDenormalize(ws memdb.WatchSet, plug *structs.CSIPlugin) (*structs.CSIPlugin, error) {
if plug == nil {
return nil, nil
Expand Down Expand Up @@ -4498,6 +4514,70 @@ func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, tx
return nil
}

// updateJobCSIPlugins runs on job update indexes the job in the plugin
langmartin marked this conversation as resolved.
Show resolved Hide resolved
func (s *StateStore) updateJobCSIPlugins(index uint64, job, prev *structs.Job, txn *memdb.Txn) error {
ws := memdb.NewWatchSet()
plugIns := make(map[string]*structs.CSIPlugin)

loop := func(job *structs.Job, delete bool) error {
for _, tg := range job.TaskGroups {
for _, t := range tg.Tasks {
if t.CSIPluginConfig == nil {
continue
}

plugIn, ok := plugIns[t.CSIPluginConfig.ID]
if !ok {
p, err := s.CSIPluginByID(ws, t.CSIPluginConfig.ID)
if err != nil {
return fmt.Errorf("%v", err)
langmartin marked this conversation as resolved.
Show resolved Hide resolved
}
if p == nil {
plugIn = structs.NewCSIPlugin(t.CSIPluginConfig.ID, index)
} else {
plugIn = p.Copy()
plugIn.ModifyIndex = index
}
plugIns[plugIn.ID] = plugIn
}

if delete {
plugIn.DeleteJob(job, nil)
} else {
plugIn.AddJob(job, nil)
}
}
}

return nil
}

if prev != nil {
err := loop(prev, true)
if err != nil {
return fmt.Errorf("%v", err)
}
}

err := loop(job, false)
if err != nil {
return fmt.Errorf("%v", err)
}

for _, plugIn := range plugIns {
err = txn.Insert("csi_plugins", plugIn)
if err != nil {
return fmt.Errorf("csi_plugins insert error: %v", err)
}
}

if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

return nil
}

// updateDeploymentWithAlloc is used to update the deployment state associated
// with the given allocation. The passed alloc may be updated if the deployment
// status has changed to capture the modify index at which it has changed.
Expand Down Expand Up @@ -4709,6 +4789,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if summaryChanged {
jobSummary.ModifyIndex = index

s.updatePluginWithJobSummary(index, jobSummary, alloc, txn)

// Update the indexes table for job summary
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
Expand Down Expand Up @@ -4745,6 +4827,7 @@ func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocati
return nil
}
plug = plug.Copy()

err = plug.DeleteAlloc(alloc.ID, alloc.NodeID)
if err != nil {
return err
Expand All @@ -4759,6 +4842,41 @@ func (s *StateStore) updatePluginWithAlloc(index uint64, alloc *structs.Allocati
return nil
}

// updatePluginWithJobSummary updates the CSI plugins for a job when the
// job summary is updated by an alloc
func (s *StateStore) updatePluginWithJobSummary(index uint64, summary *structs.JobSummary, alloc *structs.Allocation,
txn *memdb.Txn) error {

ws := memdb.NewWatchSet()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
return nil
}

for _, t := range tg.Tasks {
if t.CSIPluginConfig != nil {
pluginID := t.CSIPluginConfig.ID
plug, err := s.CSIPluginByID(ws, pluginID)
if err != nil {
return err
}
if plug == nil {
plug = structs.NewCSIPlugin(pluginID, index)
langmartin marked this conversation as resolved.
Show resolved Hide resolved
} else {
plug = plug.Copy()
}

plug.UpdateExpectedWithJob(alloc.Job, summary, alloc.ServerTerminalStatus())
err = updateOrGCPlugin(index, txn, plug)
if err != nil {
return err
}
}
}

return nil
}

// UpsertACLPolicies is used to create or update a set of ACL policies
func (s *StateStore) UpsertACLPolicies(index uint64, policies []*structs.ACLPolicy) error {
txn := s.db.Txn(true)
Expand Down
Loading