Skip to content

Commit

Permalink
batcheval: reject addsstable if store capacity remaining too low
Browse files Browse the repository at this point in the history
```
➜  ./cockroach sql --insecure -e "SET CLUSTER SETTING kv.bulk_io_write.min_capacity_remaining_fraction = 0.75"
SET CLUSTER SETTING

➜ ./cockroach workload fixtures import tpcc --warehouses=10 --checks=false
I220325 23:56:17.451734 1 ccl/workloadccl/fixture.go:318  [-] 1  starting import of 9 tables
Error: importing fixture: importing table warehouse: pq: addsstable [/Table/106/1/0/0,/Table/106/1/9/0/NULL):
  insufficient remaining store capacity 648 GiB, or 69.9% to add sstable (min: 75.0%)

```

Release note (ops change): Bulk ingest operations like IMPORT, RESTORE or
CREATE INDEX will now fail if they try to write to a node whose remaining
storage capacity is below a minimum fraction (5% by default), configurable
via the cluster setting kv.bulk_io_write.min_capacity_remaining_fraction.
  • Loading branch information
dt committed Mar 26, 2022
1 parent 7001882 commit 5a7a7bc
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 0 deletions.
25 changes: 25 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ var AddSSTableRequireAtRequestTimestamp = settings.RegisterBoolSetting(
false,
)

// addSSTableCapacityRemainingLimit is the minimum fraction of a store's
// capacity that must remain available for an AddSSTable request to be
// evaluated; requests are rejected when the remaining fraction is below it.
// The default is 0.05 (5%). EvalAddSSTable only applies the check when the
// value is greater than zero, so setting it to 0 disables the check.
var addSSTableCapacityRemainingLimit = settings.RegisterFloatSetting(
settings.SystemOnly,
"kv.bulk_io_write.min_capacity_remaining_fraction",
"remaining store capacity fraction below which an addsstable request is rejected",
0.05,
)

// forceRewrite is a metamorphic test constant registered under the name
// "addsst-rewrite-forced", with false passed as its default value.
// NOTE(review): its use is not visible in this excerpt; presumably it forces
// the SST rewrite path during AddSSTable evaluation in tests — confirm.
var forceRewrite = util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false)

// EvalAddSSTable evaluates an AddSSTable command. For details, see doc comment
Expand All @@ -92,6 +101,22 @@ func EvalAddSSTable(
defer span.Finish()
log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start.Key, end.Key)

if min := addSSTableCapacityRemainingLimit.Get(&cArgs.EvalCtx.ClusterSettings().SV); min > 0 {
cap, err := cArgs.EvalCtx.GetEngineCapacity()
if err != nil {
return result.Result{}, err
}
if remaining := float64(cap.Available) / float64(cap.Capacity); remaining < min {
return result.Result{}, &roachpb.InsufficientSpaceError{
StoreID: cArgs.EvalCtx.StoreID(),
Op: "ingest data",
Available: cap.Available,
Capacity: cap.Capacity,
Required: min,
}
}
}

// Reject AddSSTable requests not writing at the request timestamp if requested.
if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.MVCCAddSSTable) &&
AddSSTableRequireAtRequestTimestamp.Get(&cArgs.EvalCtx.ClusterSettings().SV) &&
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ type EvalContext interface {
GetResponseMemoryAccount() *mon.BoundAccount

GetMaxBytes() int64

// GetEngineCapacity returns the store's underlying engine capacity; other
// StoreCapacity fields not related to engine capacity are not populated.
GetEngineCapacity() (roachpb.StoreCapacity, error)
}

// MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
Expand Down Expand Up @@ -280,3 +284,6 @@ func (m *mockEvalCtxImpl) GetMaxBytes() int64 {
}
return math.MaxInt64
}
// GetEngineCapacity implements the EvalContext interface. The mock always
// reports a fixed capacity whose Available equals its Capacity (both 1), and
// never returns an error.
func (m *mockEvalCtxImpl) GetEngineCapacity() (roachpb.StoreCapacity, error) {
	capacity := roachpb.StoreCapacity{
		Available: 1,
		Capacity:  1,
	}
	return capacity, nil
}
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -2019,6 +2019,12 @@ func (r *Replica) GetResponseMemoryAccount() *mon.BoundAccount {
return nil
}

// GetEngineCapacity reports the capacity of the engine backing this replica's
// store. Only the engine-related fields of the returned StoreCapacity are
// populated; fields unrelated to engine capacity are not filled in.
func (r *Replica) GetEngineCapacity() (roachpb.StoreCapacity, error) {
	eng := r.store.Engine()
	return eng.Capacity()
}

// init registers a tracing tag remapping from the short tag "r" to "range".
func init() {
tracing.RegisterTagRemapping("r", "range")
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,8 @@ func (rec *SpanSetReplicaEvalContext) GetResponseMemoryAccount() *mon.BoundAccou
// GetMaxBytes implements the batcheval.EvalContext interface by forwarding
// the call to the inner evaluation context held in rec.i.
func (rec *SpanSetReplicaEvalContext) GetMaxBytes() int64 {
	maxBytes := rec.i.GetMaxBytes()
	return maxBytes
}

// GetEngineCapacity implements the batcheval.EvalContext interface by
// forwarding the call, unchanged, to the inner evaluation context in rec.i.
func (rec *SpanSetReplicaEvalContext) GetEngineCapacity() (roachpb.StoreCapacity, error) {
	capacity, err := rec.i.GetEngineCapacity()
	return capacity, err
}
6 changes: 6 additions & 0 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/util/caller"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -1415,3 +1416,8 @@ func (e *RefreshFailedError) Type() ErrorDetailType {
}

// Compile-time assertion that *RefreshFailedError implements
// ErrorDetailInterface.
var _ ErrorDetailInterface = &RefreshFailedError{}

// Error implements the error interface. The message names the store, the
// operation that could not be performed, the remaining capacity (both as a
// humanized byte count and as a percentage of total capacity), and the
// minimum remaining percentage that was required.
func (e *InsufficientSpaceError) Error() string {
	// A zero Capacity would make the floating-point division below yield NaN
	// or +Inf, which would be rendered as "NaN%" / "+Inf%" in the message.
	// Report 0.0% remaining in that case instead.
	var remainingPct float64
	if e.Capacity != 0 {
		remainingPct = float64(e.Available) / float64(e.Capacity) * 100
	}
	return fmt.Sprintf(
		"store %d has insufficient remaining capacity to %s (remaining: %s / %.1f%%, min required: %.1f%%)",
		e.StoreID, e.Op, humanizeutil.IBytes(e.Available), remainingPct, e.Required*100,
	)
}
20 changes: 20 additions & 0 deletions pkg/roachpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -712,3 +712,23 @@ message Error {

reserved 2;
}


// InsufficientSpaceError is an error returned when an operation is rejected
// because a store has insufficient space remaining.
message InsufficientSpaceError {
// StoreID is the store that had insufficient space.
optional int64 store_id = 1 [(gogoproto.nullable) = false,
(gogoproto.customname) = "StoreID", (gogoproto.casttype) = "StoreID"];

// Op is the operation that was unable to be performed.
optional string op = 2 [(gogoproto.nullable) = false];

// Available is the remaining capacity (presumably in bytes; the Go Error()
// method formats it as a byte count).
optional int64 available = 3 [(gogoproto.nullable) = false];

// Capacity is the total capacity, in the same unit as Available.
optional int64 capacity = 4 [(gogoproto.nullable) = false];

// Required is the minimum fraction of capacity that must remain available
// for the operation to proceed (e.g. 0.05 for 5%).
optional double required = 5 [(gogoproto.nullable) = false];
}

0 comments on commit 5a7a7bc

Please sign in to comment.