Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csi: move volume claim release into volumewatcher #7794

Merged
merged 5 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyCSIVolumeDeregister(buf[1:], log.Index)
case structs.CSIVolumeClaimRequestType:
return n.applyCSIVolumeClaim(buf[1:], log.Index)
case structs.CSIVolumeClaimBatchRequestType:
return n.applyCSIVolumeBatchClaim(buf[1:], log.Index)
case structs.ScalingEventRegisterRequestType:
return n.applyUpsertScalingEvent(buf[1:], log.Index)
}
Expand Down Expand Up @@ -1156,33 +1158,35 @@ func (n *nomadFSM) applyCSIVolumeDeregister(buf []byte, index uint64) interface{
return nil
}

func (n *nomadFSM) applyCSIVolumeClaim(buf []byte, index uint64) interface{} {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note for review: it's worth looking at this file without the diff... the diff really tangled this up because the applyCSIVolumeBatchClaim is similar to the applyCSIVolumeClaim. This should be entirely an addition.

var req structs.CSIVolumeClaimRequest
if err := structs.Decode(buf, &req); err != nil {
func (n *nomadFSM) applyCSIVolumeBatchClaim(buf []byte, index uint64) interface{} {
var batch *structs.CSIVolumeClaimBatchRequest
if err := structs.Decode(buf, &batch); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_claim"}, time.Now())
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_batch_claim"}, time.Now())

ws := memdb.NewWatchSet()
alloc, err := n.state.AllocByID(ws, req.AllocationID)
if err != nil {
n.logger.Error("AllocByID failed", "error", err)
return err
}
if alloc == nil {
n.logger.Error("AllocByID failed to find alloc", "alloc_id", req.AllocationID)
for _, req := range batch.Claims {
err := n.state.CSIVolumeClaim(index, req.RequestNamespace(),
req.VolumeID, req.ToClaim())
if err != nil {
return err
n.logger.Error("CSIVolumeClaim for batch failed", "error", err)
return err // note: fails the remaining batch
}
}
return nil
}

return structs.ErrUnknownAllocationPrefix
func (n *nomadFSM) applyCSIVolumeClaim(buf []byte, index uint64) interface{} {
var req structs.CSIVolumeClaimRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_csi_volume_claim"}, time.Now())

if err := n.state.CSIVolumeClaim(index, req.RequestNamespace(), req.VolumeID, req.ToClaim()); err != nil {
n.logger.Error("CSIVolumeClaim failed", "error", err)
return err
}

return nil
}

Expand Down
11 changes: 8 additions & 3 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2068,9 +2068,14 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s
return err
}

err = volume.Claim(claim, alloc)
if err != nil {
return err
// in the case of a job deregistration, there will be no allocation ID
// for the claim but we still want to write an updated index to the volume
// so that volume reaping is triggered
if claim.AllocationID != "" {
err = volume.Claim(claim, alloc)
if err != nil {
return err
}
}

volume.ModifyIndex = index
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/csi.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ const (
CSIVolumeClaimRelease
)

type CSIVolumeClaimBatchRequest struct {
Claims []CSIVolumeClaimRequest
}

type CSIVolumeClaimRequest struct {
VolumeID string
AllocationID string
Expand Down
1 change: 1 addition & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const (
CSIVolumeRegisterRequestType
CSIVolumeDeregisterRequestType
CSIVolumeClaimRequestType
CSIVolumeClaimBatchRequestType
ScalingEventRegisterRequestType
)

Expand Down
125 changes: 125 additions & 0 deletions nomad/volumewatcher/batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package volumewatcher
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


import (
"context"
"time"

"github.com/hashicorp/nomad/nomad/structs"
)

// VolumeUpdateBatcher is used to batch the updates for volume claims
type VolumeUpdateBatcher struct {
// batch is the batching duration
batch time.Duration

// raft is used to actually commit the updates
raft VolumeRaftEndpoints

// workCh is used to pass evaluations to the daemon process
workCh chan *updateWrapper

// ctx is used to exit the daemon batcher
ctx context.Context
}

// NewVolumeUpdateBatcher returns an VolumeUpdateBatcher that uses the
// passed raft endpoints to create the updates to volume claims, and
// exits the batcher when the passed exit channel is closed.
func NewVolumeUpdateBatcher(batchDuration time.Duration, raft VolumeRaftEndpoints, ctx context.Context) *VolumeUpdateBatcher {
b := &VolumeUpdateBatcher{
batch: batchDuration,
raft: raft,
ctx: ctx,
workCh: make(chan *updateWrapper, 10),
}

go b.batcher()
return b
}

// CreateUpdate batches the volume claim update and returns a future
// that tracks the completion of the request.
func (b *VolumeUpdateBatcher) CreateUpdate(claims []structs.CSIVolumeClaimRequest) *BatchFuture {
wrapper := &updateWrapper{
claims: claims,
f: make(chan *BatchFuture, 1),
}

b.workCh <- wrapper
return <-wrapper.f
}

type updateWrapper struct {
claims []structs.CSIVolumeClaimRequest
f chan *BatchFuture
}

// batcher is the long lived batcher goroutine
func (b *VolumeUpdateBatcher) batcher() {
var timerCh <-chan time.Time
claims := make(map[string]structs.CSIVolumeClaimRequest)
future := NewBatchFuture()
for {
select {
case <-b.ctx.Done():
// note: we can't flush here because we're likely no
// longer the leader
return
case w := <-b.workCh:
if timerCh == nil {
timerCh = time.After(b.batch)
}

// de-dupe and store the claim update, and attach the future
for _, upd := range w.claims {
claims[upd.VolumeID+upd.RequestNamespace()] = upd
}
w.f <- future
case <-timerCh:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general for this pattern, I'd expect that we'd want to have a batch max size or timer, where either of those would apply the batch. I know the raft lib has some recent support for splitting large commits, but it seems like we're missing batch max from this (and deployments). I think we're good to go ahead, but should consider using an interface to allow this and deployments to use the same batcher code later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. Spun out to #7838

// Capture the future and create a new one
f := future
future = NewBatchFuture()

// Create the batch request
req := structs.CSIVolumeClaimBatchRequest{}
for _, claim := range claims {
req.Claims = append(req.Claims, claim)
}

// Upsert the claims in a go routine
go f.Set(b.raft.UpsertVolumeClaims(&req))

// Reset the claims list and timer
claims = make(map[string]structs.CSIVolumeClaimRequest)
timerCh = nil
}
}
}

// BatchFuture is a future that can be used to retrieve the index for
// the update or any error in the update process
type BatchFuture struct {
index uint64
err error
waitCh chan struct{}
}

// NewBatchFuture returns a new BatchFuture
func NewBatchFuture() *BatchFuture {
return &BatchFuture{
waitCh: make(chan struct{}),
}
}

// Set sets the results of the future, unblocking any client.
func (f *BatchFuture) Set(index uint64, err error) {
f.index = index
f.err = err
close(f.waitCh)
}

// Results returns the creation index and any error.
func (f *BatchFuture) Results() (uint64, error) {
<-f.waitCh
return f.index, f.err
}
85 changes: 85 additions & 0 deletions nomad/volumewatcher/batcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package volumewatcher

import (
"context"
"fmt"
"sync"
"testing"

"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

// TestVolumeWatch_Batcher tests the update batching logic
func TestVolumeWatch_Batcher(t *testing.T) {
t.Parallel()
require := require.New(t)

ctx, exitFn := context.WithCancel(context.Background())
defer exitFn()

srv := &MockBatchingRPCServer{}
srv.state = state.TestStateStore(t)
srv.volumeUpdateBatcher = NewVolumeUpdateBatcher(CrossVolumeUpdateBatchDuration, srv, ctx)

plugin := mock.CSIPlugin()
node := testNode(nil, plugin, srv.State())

// because we wait for the results to return from the batch for each
// Watcher.updateClaims, we can't test that we're batching except across
// multiple volume watchers. create 2 volumes and their watchers here.
alloc0 := mock.Alloc()
alloc0.ClientStatus = structs.AllocClientStatusComplete
vol0 := testVolume(nil, plugin, alloc0, node.ID)
w0 := &volumeWatcher{
v: vol0,
rpc: srv,
state: srv.State(),
updateClaims: srv.UpdateClaims,
logger: testlog.HCLogger(t),
}

alloc1 := mock.Alloc()
alloc1.ClientStatus = structs.AllocClientStatusComplete
vol1 := testVolume(nil, plugin, alloc1, node.ID)
w1 := &volumeWatcher{
v: vol1,
rpc: srv,
state: srv.State(),
updateClaims: srv.UpdateClaims,
logger: testlog.HCLogger(t),
}

srv.nextCSIControllerDetachError = fmt.Errorf("some controller plugin error")

var wg sync.WaitGroup
wg.Add(2)

go func() {
w0.volumeReapImpl(vol0)
wg.Done()
}()
go func() {
w1.volumeReapImpl(vol1)
wg.Done()
}()

wg.Wait()

require.Equal(structs.CSIVolumeClaimStateNodeDetached, vol0.PastClaims[alloc0.ID].State)
require.Equal(structs.CSIVolumeClaimStateNodeDetached, vol1.PastClaims[alloc1.ID].State)
require.Equal(2, srv.countCSINodeDetachVolume)
require.Equal(2, srv.countCSIControllerDetachVolume)
require.Equal(2, srv.countUpdateClaims)

// note: it's technically possible that the volumeReapImpl
// goroutines get de-scheduled and we don't write both updates in
// the same batch. but this seems really unlikely, so we're
// testing for both cases here so that if we start seeing a flake
// here in the future we have a clear cause for it.
require.GreaterOrEqual(srv.countUpsertVolumeClaims, 1)
require.Equal(1, srv.countUpsertVolumeClaims)
}
28 changes: 28 additions & 0 deletions nomad/volumewatcher/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package volumewatcher
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for review: these interfaces exist solely to give us handles for RPC mocks in the tests (so we don't have to set up clients with CSI plugins in unit tests).


import (
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

// VolumeRaftEndpoints exposes the volume watcher to a set of functions
// to apply data transforms via Raft.
type VolumeRaftEndpoints interface {

// UpsertVolumeClaims applys a batch of claims to raft
UpsertVolumeClaims(*structs.CSIVolumeClaimBatchRequest) (uint64, error)
}

// ClientRPC is a minimal interface of the Server, intended as an aid
// for testing logic surrounding server-to-server or server-to-client
// RPC calls and to avoid circular references between the nomad
// package and the volumewatcher
type ClientRPC interface {
ControllerDetachVolume(args *cstructs.ClientCSIControllerDetachVolumeRequest, reply *cstructs.ClientCSIControllerDetachVolumeResponse) error
NodeDetachVolume(args *cstructs.ClientCSINodeDetachVolumeRequest, reply *cstructs.ClientCSINodeDetachVolumeResponse) error
}

// claimUpdater is the function used to update claims on behalf of a volume
// (used to wrap batch updates so that we can test
// volumeWatcher methods synchronously without batching)
type updateClaimsFn func(claims []structs.CSIVolumeClaimRequest) (uint64, error)
Loading