Skip to content

Commit

Permalink
util/mon: pass reserved account by reference
Browse files Browse the repository at this point in the history
This commit makes it so that we now pass the `reserved` memory account
when starting up memory monitors by reference. Previously, due to
passing by value, when the monitor is stopped, the copy of the value
would get used so that the actual "reserved" memory account would get
out of sync with its user. This is now fixed. However, this bug doesn't
really have any production impact since we use this "reserved" feature
in a handful of places and these "reserved" memory accounts are not
reused between different usages.

Additionally, this commit renames `MakeStandaloneBudget` to
`NewStandaloneBudget` (since it now returns a reference) and adds
a separate "start" method when the caller doesn't want to pre-reserve
anything when starting up a monitor.

Release note: None
  • Loading branch information
yuzefovich committed Jul 6, 2022
1 parent f6d0d84 commit 9661219
Show file tree
Hide file tree
Showing 57 changed files with 182 additions and 141 deletions.
2 changes: 1 addition & 1 deletion pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ func TempStorageConfigFromEnv(
maxSizeBytes/10, /* noteworthy */
st,
)
monitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(maxSizeBytes))
monitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(maxSizeBytes))
return TempStorageConfig{
InMemory: inMem,
Mon: monitor,
Expand Down
2 changes: 1 addition & 1 deletion pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func DefaultTestTempStorageConfigWithSize(
maxSizeBytes/10, /* noteworthy */
st,
)
monitor.Start(context.Background(), nil /* pool */, mon.MakeStandaloneBudget(maxSizeBytes))
monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(maxSizeBytes))
return TempStorageConfig{
InMemory: true,
Mon: monitor,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8027,7 +8027,7 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) {
require.NoError(t, err)

m := mon.NewMonitor("test-monitor", mon.MemoryResource, nil, nil, 0, 0, st)
m.Start(ctx, nil, mon.MakeStandaloneBudget(128<<20))
m.Start(ctx, nil, mon.NewStandaloneBudget(128<<20))
mem := m.MakeBoundAccount()
encOpts := &jobspb.BackupEncryptionOptions{
Mode: jobspb.EncryptionMode_Passphrase,
Expand Down Expand Up @@ -9040,7 +9040,7 @@ func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) {
)
ctx := context.Background()
byteLimit := 14 << 20 // 14 MiB
memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(byteLimit)))
memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(int64(byteLimit)))
defer memoryMonitor.Stop(ctx)
params := base.TestClusterArgs{}
knobs := base.TestingKnobs{
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
}
limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV)
kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", limit, pool)
kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{})
kvFeedMemMon.StartNoReserved(ctx, pool)
ca.kvFeedMemMon = kvFeedMemMon

// The job registry has a set of metrics used to monitor the various jobs it
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6443,7 +6443,7 @@ func startMonitorWithBudget(budget int64) *mon.BytesMonitor {
nil, nil,
128 /* small allocation increment */, 100,
cluster.MakeTestingClusterSettings())
mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget))
mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget))
return mm
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func getBoundAccountWithBudget(budget int64) (account mon.BoundAccount, cleanup
nil, nil,
128 /* small allocation increment */, 100,
cluster.MakeTestingClusterSettings())
mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget))
mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget))
return mm.MakeBoundAccount(), func() { mm.Stop(context.Background()) }
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/bulk/kv_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ func TestKvBuf(t *testing.T) {
src, totalSize := makeTestData(50000)

ctx := context.Background()
none := mon.NewMonitorWithLimit("none", mon.MemoryResource, 0, nil, nil, 0, 0, nil).MakeBoundAccount()
noneMonitor := mon.NewMonitorWithLimit("none", mon.MemoryResource, 0, nil, nil, 0, 0, nil)
noneMonitor.StartNoReserved(ctx, nil /* pool */)
none := noneMonitor.MakeBoundAccount()
lots := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil).MakeBoundAccount()

// Write everything to our buf.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/results_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestInOrderResultsBuffer(t *testing.T) {
math.MaxInt64, /* noteworthy */
st,
)
diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64))
defer diskMonitor.Stop(ctx)

budget := newBudget(nil /* acc */, math.MaxInt /* limitBytes */)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) {
math.MaxInt64, /* noteworthy */
cluster.MakeTestingClusterSettings(),
)
rootMemMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(rootPoolSize))
rootMemMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(rootPoolSize))
defer rootMemMonitor.Stop(ctx)

acc := rootMemMonitor.MakeBoundAccount()
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func startMonitorWithBudget(budget int64) *mon.BytesMonitor {
nil, nil,
128 /* small allocation increment */, 100,
cluster.MakeTestingClusterSettings())
mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget))
mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget))
return mm
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/budget.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,14 +328,14 @@ func NewBudgetFactory(ctx context.Context, config BudgetFactoryConfig) *BudgetFa
systemRangeFeedBudget, config.rootMon)
systemRangeMonitor.SetMetrics(metrics.SystemBytesCount, nil /* maxHist */)
systemRangeMonitor.Start(ctx, config.rootMon,
mon.MakeStandaloneBudget(systemRangeFeedBudget))
mon.NewStandaloneBudget(systemRangeFeedBudget))

rangeFeedPoolMonitor := mon.NewMonitorInheritWithLimit(
"rangefeed-monitor",
config.totalRangeReedBudget,
config.rootMon)
rangeFeedPoolMonitor.SetMetrics(metrics.SharedBytesCount, nil /* maxHist */)
rangeFeedPoolMonitor.Start(ctx, config.rootMon, mon.BoundAccount{})
rangeFeedPoolMonitor.StartNoReserved(ctx, config.rootMon)

return &BudgetFactory{
limit: config.provisionalFeedLimit,
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/rangefeed/budget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestFeedBudget(t *testing.T) {
*FeedBudget, *mon.BytesMonitor, *mon.BoundAccount,
) {
m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(poolSize))
m.Start(context.Background(), nil, mon.NewStandaloneBudget(poolSize))
b := m.MakeBoundAccount()

s := cluster.MakeTestingClusterSettings()
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestBudgetFactory(t *testing.T) {
s := cluster.MakeTestingClusterSettings()

rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, s)
rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000))
rootMon.Start(context.Background(), nil, mon.NewStandaloneBudget(10000000))
bf := NewBudgetFactory(context.Background(),
CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000), &s.SV))

Expand All @@ -214,7 +214,7 @@ func TestDisableBudget(t *testing.T) {
s := cluster.MakeTestingClusterSettings()

rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, s)
rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000))
rootMon.Start(context.Background(), nil, mon.NewStandaloneBudget(10000000))
bf := NewBudgetFactory(context.Background(),
CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, func(_ int64) int64 {
return 0
Expand All @@ -228,7 +228,7 @@ func TestDisableBudgetOnTheFly(t *testing.T) {
s := cluster.MakeTestingClusterSettings()

m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(100000))
m.Start(context.Background(), nil, mon.NewStandaloneBudget(100000))
bf := NewBudgetFactory(context.Background(),
CreateBudgetFactoryConfig(
m,
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestDisableBudgetOnTheFly(t *testing.T) {
func TestConfigFactory(t *testing.T) {
s := cluster.MakeTestingClusterSettings()
rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000))
rootMon.Start(context.Background(), nil, mon.NewStandaloneBudget(10000000))

// Check provisionalFeedLimit is computed.
config := CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000),
Expand All @@ -285,7 +285,7 @@ func TestConfigFactory(t *testing.T) {
func TestBudgetLimits(t *testing.T) {
s := cluster.MakeTestingClusterSettings()
rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000))
rootMon.Start(context.Background(), nil, mon.NewStandaloneBudget(10000000))

provisionalSize := int64(10000)
adjustedSize := int64(1000)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) {

s := cluster.MakeTestingClusterSettings()
m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64))
m.Start(context.Background(), nil, mon.NewStandaloneBudget(math.MaxInt64))
//budgetEnabled := int32(1)
b := m.MakeBoundAccount()
fb := NewFeedBudget(&b, 0, &s.SV)
Expand Down Expand Up @@ -1228,7 +1228,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) {
func newTestBudget(limit int64) *FeedBudget {
s := cluster.MakeTestingClusterSettings()
m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil)
m.Start(context.Background(), nil, mon.MakeStandaloneBudget(limit))
m.Start(context.Background(), nil, mon.NewStandaloneBudget(limit))
b := m.MakeBoundAccount()
fb := NewFeedBudget(&b, 0, &s.SV)
return fb
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/ts_maintenance_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func TestTimeSeriesMaintenanceQueueServer(t *testing.T) {
math.MaxInt64, /* noteworthy */
cluster.MakeTestingClusterSettings(),
)
memMon.Start(context.Background(), nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64))
memMon.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64))
defer memMon.Stop(context.Background())
memContext := ts.MakeQueryMemoryContext(
memMon,
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
})
kvMemoryMonitor := mon.NewMonitorInheritWithLimit(
"kv-mem", 0 /* limit */, sqlMonitorAndMetrics.rootSQLMemoryMonitor)
kvMemoryMonitor.Start(ctx, sqlMonitorAndMetrics.rootSQLMemoryMonitor, mon.BoundAccount{})
kvMemoryMonitor.StartNoReserved(ctx, sqlMonitorAndMetrics.rootSQLMemoryMonitor)
rangeReedBudgetFactory := serverrangefeed.NewBudgetFactory(
ctx,
serverrangefeed.CreateBudgetFactoryConfig(
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func newRootSQLMemoryMonitor(opts monitorAndMetricsOptions) monitorAndMetrics {
// serves as a parent for a memory monitor that accounts for memory used in
// the KV layer at the same node.
rootSQLMemoryMonitor.Start(
context.Background(), nil, mon.MakeStandaloneBudget(opts.memoryPoolSize))
context.Background(), nil, mon.NewStandaloneBudget(opts.memoryPoolSize))
return monitorAndMetrics{
rootSQLMemoryMonitor: rootSQLMemoryMonitor,
rootSQLMetrics: rootSQLMetrics,
Expand Down Expand Up @@ -496,15 +496,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
bulkMetrics := bulk.MakeBulkMetrics(cfg.HistogramWindowInterval())
cfg.registry.AddMetricStruct(bulkMetrics)
bulkMemoryMonitor.SetMetrics(bulkMetrics.CurBytesCount, bulkMetrics.MaxBytesHist)
bulkMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{})
bulkMemoryMonitor.StartNoReserved(context.Background(), rootSQLMemoryMonitor)

backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon")
backupMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backup-mon")

serverCacheMemoryMonitor := mon.NewMonitorInheritWithLimit(
"server-cache-mon", 0 /* limit */, rootSQLMemoryMonitor,
)
serverCacheMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{})
serverCacheMemoryMonitor.StartNoReserved(context.Background(), rootSQLMemoryMonitor)

// Set up the DistSQL temp engine.

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) {
nil, nil, -1, 0, cluster.MakeTestingClusterSettings())

// Start the monitor with unlimited budget.
monitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
monitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64))

// Create a `Collection` with monitor hooked up.
col := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
Expand All @@ -650,7 +650,7 @@ func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) {

// Restart the monitor to a smaller budget (in fact, let's be bold by setting it to be only one byte below
// what has been allocated in the previous round).
monitor.Start(ctx, nil, mon.MakeStandaloneBudget(allocatedMemoryInBytes-1))
monitor.Start(ctx, nil, mon.NewStandaloneBudget(allocatedMemoryInBytes-1))
require.Equal(t, int64(0), monitor.AllocBytes())

// Repeat the process again and assert this time memory allocation will err out.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/closed_session_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewClosedSessionCache(

c.mu.acc = monitor.MakeBoundAccount()
c.mon = monitor
c.mon.Start(context.Background(), parentMon, mon.BoundAccount{})
c.mon.StartNoReserved(context.Background(), parentMon)
return c
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/colexecargs/monitor_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategyWithLimit(
}
monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */)
bufferingOpMemMonitor := mon.NewMonitorInheritWithLimit(monitorName, limit, flowCtx.EvalCtx.Mon)
bufferingOpMemMonitor.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{})
bufferingOpMemMonitor.StartNoReserved(ctx, flowCtx.EvalCtx.Mon)
r.monitors = append(r.monitors, bufferingOpMemMonitor)
bufferingMemAccount := bufferingOpMemMonitor.MakeBoundAccount()
r.accounts = append(r.accounts, &bufferingMemAccount)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/draining_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestDrainingAfterRemoteError(t *testing.T) {
math.MaxInt64,
cluster.MakeTestingClusterSettings(),
)
diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(1))
diskMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(1))

// Set up a two node cluster.
tempStorageConfig := base.TempStorageConfig{InMemory: true, Mon: diskMonitor}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/explain_vec.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func convertToVecTree(
math.MaxInt64, /* noteworthy */
flowCtx.Cfg.Settings,
)
memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64))
defer memoryMonitor.Stop(ctx)
defer creator.cleanup(ctx)
opChains, _, err = creator.setupFlow(ctx, flowCtx, flow.Processors, localProcessors, fuseOpt)
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/colflow/vectorized_flow_space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) {
}
memMon := mon.NewMonitor("MemoryMonitor", mon.MemoryResource, nil, nil, 0, math.MaxInt64, st)
if success {
memMon.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
memMon.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64))
} else {
memMon.Start(ctx, nil, mon.MakeStandaloneBudget(1))
memMon.Start(ctx, nil, mon.NewStandaloneBudget(1))
}
defer memMon.Stop(ctx)
acc := memMon.MakeBoundAccount()
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) {
memMon := mon.NewMonitor("MemoryMonitor", mon.MemoryResource, nil, nil, 0, math.MaxInt64, st)
flowCtx.Cfg.TestingKnobs = execinfra.TestingKnobs{}
if expectNoMemoryError {
memMon.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64))
memMon.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64))
if !success {
// These are the cases that we expect in-memory operators to hit a
// memory error. To enable testing this case, force disk spills. We
Expand All @@ -213,7 +213,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) {
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = true
}
} else {
memMon.Start(ctx, nil, mon.MakeStandaloneBudget(1))
memMon.Start(ctx, nil, mon.NewStandaloneBudget(1))
flowCtx.Cfg.TestingKnobs.ForceDiskSpill = true
}
defer memMon.Stop(ctx)
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func (h ConnectionHandler) GetQueryCancelKey() pgwirecancel.BackendKeyData {
// If not nil, reserved represents memory reserved for the connection. The
// connExecutor takes ownership of this memory.
func (s *Server) ServeConn(
ctx context.Context, h ConnectionHandler, reserved mon.BoundAccount, cancel context.CancelFunc,
ctx context.Context, h ConnectionHandler, reserved *mon.BoundAccount, cancel context.CancelFunc,
) error {
defer func() {
r := recover()
Expand Down Expand Up @@ -989,7 +989,7 @@ func (s *Server) newConnExecutorWithTxn(

// The new transaction stuff below requires active monitors and traces, so
// we need to activate the executor now.
ex.activate(ctx, parentMon, mon.BoundAccount{})
ex.activate(ctx, parentMon, &mon.BoundAccount{})

// Perform some surgery on the executor - replace its state machine and
// initialize the state.
Expand Down Expand Up @@ -1724,15 +1724,15 @@ func (ex *connExecutor) sessionData() *sessiondata.SessionData {
// reserved: Memory reserved for the connection. The connExecutor takes
// ownership of this memory.
func (ex *connExecutor) activate(
ctx context.Context, parentMon *mon.BytesMonitor, reserved mon.BoundAccount,
ctx context.Context, parentMon *mon.BytesMonitor, reserved *mon.BoundAccount,
) {
// Note: we pass `reserved` to sessionRootMon where it causes it to act as a
// buffer. This is not done for sessionMon nor state.mon: these monitors don't
// start with any buffer, so they'll need to ask their "parent" for memory as
// soon as the first allocation. This is acceptable because the session is
// single threaded, and the point of buffering is just to avoid contention.
ex.mon.Start(ctx, parentMon, reserved)
ex.sessionMon.Start(ctx, ex.mon, mon.BoundAccount{})
ex.sessionMon.StartNoReserved(ctx, ex.mon)

// Enable the trace if configured.
if traceSessionEventLogEnabled.Get(&ex.server.cfg.Settings.SV) {
Expand Down Expand Up @@ -1787,7 +1787,7 @@ func (ex *connExecutor) activate(
func (ex *connExecutor) run(
ctx context.Context,
parentMon *mon.BytesMonitor,
reserved mon.BoundAccount,
reserved *mon.BoundAccount,
onCancel context.CancelFunc,
) (err error) {
if !ex.activated {
Expand Down Expand Up @@ -2397,7 +2397,7 @@ func (ex *connExecutor) execCopyIn(
// HACK: We're reaching inside ex.state and starting the monitor. Normally
// that's driven by the state machine, but we're bypassing the state machine
// here.
ex.state.mon.Start(ctx, ex.sessionMon, mon.BoundAccount{} /* reserved */)
ex.state.mon.StartNoReserved(ctx, ex.sessionMon)
monToStop = ex.state.mon
}
txnOpt.resetPlanner = func(ctx context.Context, p *planner, txn *kv.Txn, txnTS time.Time, stmtTS time.Time) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/conn_executor_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func startConnExecutor(
// routine, we're going to push commands into the StmtBuf and, from time to
// time, collect and check their results.
go func() {
finished <- s.ServeConn(ctx, conn, mon.BoundAccount{}, nil /* cancel */)
finished <- s.ServeConn(ctx, conn, &mon.BoundAccount{}, nil /* cancel */)
}()
return buf, syncResults, finished, stopper, resultChannel, nil
}
Expand Down Expand Up @@ -402,7 +402,7 @@ CREATE TEMPORARY TABLE foo();

done := make(chan error)
go func() {
done <- srv.ServeConn(ctx, connHandler, mon.BoundAccount{}, nil /* cancel */)
done <- srv.ServeConn(ctx, connHandler, &mon.BoundAccount{}, nil /* cancel */)
}()
results := <-flushed
require.Len(t, results, 6) // We expect results for 5 statements + sync.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/delete_preserving_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ func fetchIndex(
var fetcher row.Fetcher
var alloc tree.DatumAlloc

mm := mon.MakeStandaloneBudget(1 << 30)
mm := mon.NewStandaloneBudget(1 << 30)
idx, err := table.FindIndexWithName(indexName)
require.NoError(t, err)
colIdxMap := catalog.ColumnIDToOrdinalMap(table.PublicColumns())
Expand Down
Loading

0 comments on commit 9661219

Please sign in to comment.