-
Notifications
You must be signed in to change notification settings - Fork 2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
csi: don't pass volume claim releases thru GC eval (#8021)
Following the new volumewatcher in #7794 and performance improvements to it that landed afterwards, there's no particular reason we should be threading claim releases through the GC eval rather than writing an empty `CSIVolumeClaimRequest` with the mode set to `CSIVolumeClaimRelease`, just as the GC evaluation would do. Also, by batching up these raft messages, we can reduce the amount of raft writes by 1 and cross-server RPCs by 1 per volume we release claims on.
- Loading branch information
Showing
5 changed files
with
210 additions
and
110 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package nomad | ||
|
||
import ( | ||
log "github.com/hashicorp/go-hclog" | ||
multierror "github.com/hashicorp/go-multierror" | ||
"github.com/hashicorp/nomad/nomad/structs" | ||
) | ||
|
||
// csiBatchRelease is a helper for any time we need to release a bunch | ||
// of volume claims at once. It de-duplicates the volumes and batches | ||
// the raft messages into manageable chunks. Intended for use by RPCs | ||
// that have already been forwarded to the leader. | ||
type csiBatchRelease struct { | ||
srv *Server | ||
logger log.Logger | ||
|
||
maxBatchSize int | ||
seen map[string]struct{} | ||
batches []*structs.CSIVolumeClaimBatchRequest | ||
} | ||
|
||
func newCSIBatchRelease(srv *Server, logger log.Logger, max int) *csiBatchRelease { | ||
return &csiBatchRelease{ | ||
srv: srv, | ||
logger: logger, | ||
maxBatchSize: max, | ||
seen: map[string]struct{}{}, | ||
batches: []*structs.CSIVolumeClaimBatchRequest{{}}, | ||
} | ||
} | ||
|
||
// add the volume ID + namespace to the deduplicated batches | ||
func (c *csiBatchRelease) add(vol, namespace string) { | ||
id := vol + "\x00" + namespace | ||
|
||
// ignore duplicates | ||
_, seen := c.seen[id] | ||
if seen { | ||
return | ||
} | ||
c.seen[id] = struct{}{} | ||
|
||
req := structs.CSIVolumeClaimRequest{ | ||
VolumeID: vol, | ||
Claim: structs.CSIVolumeClaimRelease, | ||
} | ||
req.Namespace = namespace | ||
req.Region = c.srv.config.Region | ||
|
||
for _, batch := range c.batches { | ||
// otherwise append to the first non-full batch | ||
if len(batch.Claims) < c.maxBatchSize { | ||
batch.Claims = append(batch.Claims, req) | ||
return | ||
} | ||
} | ||
// no non-full batch found, make a new one | ||
newBatch := &structs.CSIVolumeClaimBatchRequest{ | ||
Claims: []structs.CSIVolumeClaimRequest{req}} | ||
c.batches = append(c.batches, newBatch) | ||
} | ||
|
||
// apply flushes the batches to raft | ||
func (c *csiBatchRelease) apply() error { | ||
var result *multierror.Error | ||
for _, batch := range c.batches { | ||
if len(batch.Claims) > 0 { | ||
_, _, err := c.srv.raftApply(structs.CSIVolumeClaimBatchRequestType, batch) | ||
if err != nil { | ||
c.logger.Error("csi raft apply failed", "error", err, "method", "claim") | ||
result = multierror.Append(result, err) | ||
} | ||
} | ||
} | ||
return result.ErrorOrNil() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package nomad | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestCSI_Batcher(t *testing.T) { | ||
t.Parallel() | ||
srv, shutdown := TestServer(t, func(c *Config) { | ||
c.NumSchedulers = 0 // Prevent automatic dequeue | ||
}) | ||
defer shutdown() | ||
|
||
batcher := newCSIBatchRelease(srv, nil, 5) | ||
|
||
batcher.add("vol0", "global") | ||
batcher.add("vol", "0global") | ||
batcher.add("vol1", "global") | ||
batcher.add("vol1", "global") | ||
batcher.add("vol2", "global") | ||
batcher.add("vol2", "other") | ||
batcher.add("vol3", "global") | ||
batcher.add("vol4", "global") | ||
batcher.add("vol5", "global") | ||
batcher.add("vol6", "global") | ||
|
||
require.Len(t, batcher.batches, 2) | ||
require.Len(t, batcher.batches[0].Claims, 5, "first batch") | ||
require.Equal(t, batcher.batches[0].Claims[4].VolumeID, "vol2") | ||
require.Equal(t, batcher.batches[0].Claims[4].Namespace, "other") | ||
require.Len(t, batcher.batches[1].Claims, 4, "second batch") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.