diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 79c359a65c5c..0b285e389d5a 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -72,6 +72,15 @@ var AddSSTableRequireAtRequestTimestamp = settings.RegisterBoolSetting( false, ) +// addSSTableCapacityRemainingLimit is the fraction of remaining store capacity +// under which addsstable requests are rejected. +var addSSTableCapacityRemainingLimit = settings.RegisterFloatSetting( + settings.SystemOnly, + "kv.bulk_io_write.min_capacity_remaining_fraction", + "remaining store capacity fraction below which an addsstable request is rejected", + 0.05, +) + var forceRewrite = util.ConstantWithMetamorphicTestBool("addsst-rewrite-forced", false) // EvalAddSSTable evaluates an AddSSTable command. For details, see doc comment @@ -92,6 +101,22 @@ func EvalAddSSTable( defer span.Finish() log.Eventf(ctx, "evaluating AddSSTable [%s,%s)", start.Key, end.Key) + if min := addSSTableCapacityRemainingLimit.Get(&cArgs.EvalCtx.ClusterSettings().SV); min > 0 { + cap, err := cArgs.EvalCtx.GetEngineCapacity() + if err != nil { + return result.Result{}, err + } + if remaining := float64(cap.Available) / float64(cap.Capacity); remaining < min { + return result.Result{}, &roachpb.InsufficientSpaceError{ + StoreID: cArgs.EvalCtx.StoreID(), + Op: "ingest data", + Available: cap.Available, + Capacity: cap.Capacity, + Required: min, + } + } + } + // Reject AddSSTable requests not writing at the request timestamp if requested. if cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.MVCCAddSSTable) && AddSSTableRequireAtRequestTimestamp.Get(&cArgs.EvalCtx.ClusterSettings().SV) && diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 83b167247df3..d8536d9781a0 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -137,6 +137,10 @@ type EvalContext interface { GetResponseMemoryAccount() *mon.BoundAccount GetMaxBytes() int64 + + // GetEngineCapacity returns the store's underlying engine capacity; other + // StoreCapacity fields not related to engine capacity are not populated. + GetEngineCapacity() (roachpb.StoreCapacity, error) } // MockEvalCtx is a dummy implementation of EvalContext for testing purposes. @@ -280,3 +284,6 @@ func (m *mockEvalCtxImpl) GetMaxBytes() int64 { } return math.MaxInt64 } +func (m *mockEvalCtxImpl) GetEngineCapacity() (roachpb.StoreCapacity, error) { + return roachpb.StoreCapacity{Available: 1, Capacity: 1}, nil +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index a5327382d5da..d35bd5c91e93 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -2019,6 +2019,12 @@ func (r *Replica) GetResponseMemoryAccount() *mon.BoundAccount { return nil } +// GetEngineCapacity returns the store's underlying engine capacity; other +// StoreCapacity fields not related to engine capacity are not populated. +func (r *Replica) GetEngineCapacity() (roachpb.StoreCapacity, error) { + return r.store.Engine().Capacity() +} + func init() { tracing.RegisterTagRemapping("r", "range") } diff --git a/pkg/kv/kvserver/replica_eval_context_span.go b/pkg/kv/kvserver/replica_eval_context_span.go index 7e2ed525d598..21f767adafbd 100644 --- a/pkg/kv/kvserver/replica_eval_context_span.go +++ b/pkg/kv/kvserver/replica_eval_context_span.go @@ -262,3 +262,8 @@ func (rec *SpanSetReplicaEvalContext) GetResponseMemoryAccount() *mon.BoundAccou func (rec *SpanSetReplicaEvalContext) GetMaxBytes() int64 { return rec.i.GetMaxBytes() } + +// GetEngineCapacity implements the batcheval.EvalContext interface. +func (rec *SpanSetReplicaEvalContext) GetEngineCapacity() (roachpb.StoreCapacity, error) { + return rec.i.GetEngineCapacity() +} diff --git a/pkg/roachpb/errors.go b/pkg/roachpb/errors.go index c7626a7b9b73..a5312ed09203 100644 --- a/pkg/roachpb/errors.go +++ b/pkg/roachpb/errors.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/caller" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -1415,3 +1416,8 @@ func (e *RefreshFailedError) Type() ErrorDetailType { } var _ ErrorDetailInterface = &RefreshFailedError{} + +func (e *InsufficientSpaceError) Error() string { + return fmt.Sprintf("store %d has insufficient remaining capacity to %s (remaining: %s / %.1f%%, min required: %.1f%%)", + e.StoreID, e.Op, humanizeutil.IBytes(e.Available), float64(e.Available)/float64(e.Capacity)*100, e.Required*100) +} diff --git a/pkg/roachpb/errors.proto b/pkg/roachpb/errors.proto index c90ba5a0dcfb..617c1b7ea4b2 100644 --- a/pkg/roachpb/errors.proto +++ b/pkg/roachpb/errors.proto @@ -712,3 +712,23 @@ message Error { reserved 2; } + + +// InsufficientSpaceError is an error due to insufficient space remaining. +message InsufficientSpaceError { + // StoreID is the store that had insufficient space. + optional int64 store_id = 1 [(gogoproto.nullable) = false, + (gogoproto.customname) = "StoreID", (gogoproto.casttype) = "StoreID"]; + + // Op is the operaton that was unable to be performed. + optional string op = 2 [(gogoproto.nullable) = false]; + + // Available is remaining capacity. + optional int64 available = 3 [(gogoproto.nullable) = false]; + + // Capacity is total capacity. + optional int64 capacity = 4 [(gogoproto.nullable) = false]; + + // RequiredFraction is the required remaining capacity fraction. + optional double required = 5 [(gogoproto.nullable) = false]; +} \ No newline at end of file