Skip to content

Commit

Permalink
handle state/restore errors
Browse files Browse the repository at this point in the history
  • Loading branch information
gulducat committed Dec 3, 2024
1 parent 2f8231e commit 8043871
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 19 deletions.
5 changes: 4 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,10 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
cfg.HostVolumePluginDir,
cfg.AllocMountsDir)
if err != nil {
return nil, err // db TODO(1.10.0): don't fail the whole client if state restore fails?
// NewHostVolumeManager will only err if it fails to read state store,
// or if one or more required plugins do not exist, so halt the client
// because something needs to be fixed by a cluster admin.
return nil, err
}

// Set up the service registration wrapper using the Consul and Nomad
Expand Down
5 changes: 3 additions & 2 deletions client/host_volume_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"path/filepath"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
Expand All @@ -28,7 +29,7 @@ func TestHostVolume(t *testing.T) {
var err error
expectDir := filepath.Join(tmp, "test-vol-id")
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, "/no/ext/plugins", tmp)
client.stateDB, time.Second, "/no/ext/plugins", tmp)
must.NoError(t, err)

t.Run("happy", func(t *testing.T) {
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestHostVolume(t *testing.T) {
t.Run("error from plugin", func(t *testing.T) {
// "mkdir" plugin can't create a directory within a file
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, "/no/ext/plugins", "host_volume_endpoint_test.go")
client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
must.NoError(t, err)

req := &cstructs.ClientHostVolumeCreateRequest{
Expand Down
51 changes: 35 additions & 16 deletions client/hostvolumemanager/host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
"context"
"errors"
"path/filepath"
"sync"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
)

var (
Expand Down Expand Up @@ -58,23 +59,28 @@ func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout
}

// re-"create" the volumes - plugins have the best knowledge of their
// side effects, and they should be idempotent.
var wg sync.WaitGroup
// side effects, and they must be idempotent.
group := multierror.Group{}
for _, vol := range vols {
wg.Add(1)
func() {
defer wg.Done()
group.Go(func() error { // db TODO(1.10.0): document that plugins must be safe to run concurrently
// missing plugins with associated volumes in state are considered
// client-stopping errors. they need to be fixed by cluster admins.
plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
// note: this will rewrite client state that we just restored
if _, err := hvm.Create(ctx, vol.CreateReq); err != nil {
hvm.log.Error("failed to restore", "volume_id", vol.ID, "error", err)
// db TODO: multierror w/ mutex?
if _, err := plug.Create(ctx, vol.CreateReq); err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
}
}()
return nil
})
}
wg.Wait()
return nil
mErr := group.Wait()
return helper.FlattenMultierror(mErr.ErrorOrNil())
}

func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
Expand Down Expand Up @@ -110,8 +116,21 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
CreateReq: req,
}
if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
hvm.log.Error("failed to save volume in state", "volume_id", req.ID, "error", err)
// db TODO: bail or nah?
// if we fail to write to state, delete the volume so it isn't left
// lying around without Nomad knowing about it.
hvm.log.Error("failed to save volume in state, so deleting", "volume_id", req.ID, "error", err)
delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{
ID: req.ID,
PluginID: req.PluginID,
NodeID: req.NodeID,
HostPath: hvm.sharedMountDir,
Parameters: req.Parameters,
})
if delErr != nil {
hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr)
err = multierror.Append(err, delErr)
}
return nil, helper.FlattenMultierror(err)
}

// db TODO(1.10.0): now we need to add the volume to the node fingerprint!
Expand Down Expand Up @@ -141,7 +160,7 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,

if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err)
// db TODO: bail or nah?
return nil, err // bail so a user may retry
}

return resp, nil
Expand Down
2 changes: 2 additions & 0 deletions client/hostvolumemanager/host_volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/shoenig/test/must"
)

// db TODO(1.10.0): improve hostvolumemanager tests.

func TestNewHostVolumeManager_restoreState(t *testing.T) {
log := testlog.HCLogger(t)
vol := &HostVolumeState{
Expand Down

0 comments on commit 8043871

Please sign in to comment.