Skip to content

Commit

Permalink
colbuilder: reuse the unlimited monitors for the same operators
Browse files Browse the repository at this point in the history
This commit reduces the number of monitors that we create for different
operators. Previously, for each unlimited memory account we would
instantiate a separate memory monitor, but that doesn't give us much
benefit and incurs extra allocations, so this commit reuses the same
unlimited monitor for as many accounts as needed for a component.

Additionally this commit reduces the allocations when constructing
a unique memory monitor name which were introduced when we marked the
monitor name as `redact.RedactableString`.

```
name                                           old time/op    new time/op    delta
FlowSetup/vectorize=true/distribute=true-24       162µs ± 4%     160µs ± 6%    ~     (p=0.087 n=18+20)
FlowSetup/vectorize=true/distribute=false-24      162µs ± 4%     161µs ± 5%    ~     (p=0.181 n=19+19)
FlowSetup/vectorize=false/distribute=true-24      159µs ± 4%     160µs ± 7%    ~     (p=0.862 n=20+20)
FlowSetup/vectorize=false/distribute=false-24     160µs ±11%     155µs ± 6%  -3.40%  (p=0.009 n=20+20)

name                                           old alloc/op   new alloc/op   delta
FlowSetup/vectorize=true/distribute=true-24      18.9kB ± 5%    18.8kB ± 6%    ~     (p=0.097 n=18+20)
FlowSetup/vectorize=true/distribute=false-24     17.7kB ± 1%    17.5kB ± 7%  -0.82%  (p=0.001 n=17+18)
FlowSetup/vectorize=false/distribute=true-24     25.4kB ± 0%    25.4kB ± 0%    ~     (p=0.824 n=16+17)
FlowSetup/vectorize=false/distribute=false-24    24.4kB ± 0%    24.4kB ± 1%    ~     (p=0.558 n=16+16)

name                                           old allocs/op  new allocs/op  delta
FlowSetup/vectorize=true/distribute=true-24         203 ± 1%       196 ± 2%  -3.39%  (p=0.000 n=19+19)
FlowSetup/vectorize=true/distribute=false-24        194 ± 1%       187 ± 3%  -3.59%  (p=0.000 n=19+19)
FlowSetup/vectorize=false/distribute=true-24        197 ± 0%       197 ± 0%    ~     (all equal)
FlowSetup/vectorize=false/distribute=false-24       187 ± 0%       187 ± 0%    ~     (p=0.515 n=16+17)
```

Release justification: low-risk improvement.

Release note: None
  • Loading branch information
yuzefovich committed Aug 27, 2022
1 parent 74092ab commit 649113d
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 117 deletions.
162 changes: 69 additions & 93 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,21 +379,17 @@ func (r opResult) createDiskBackedSort(
// The input is already partially ordered. Use a chunks sorter to avoid
// loading all the rows into memory.
opName := opNamePrefix + "sort-chunks"
deselectorUnlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName, processorID,
), factory,
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, opName, processorID, 2, /* numAccounts */
)
deselectorUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory)
var sortChunksMemAccount *mon.BoundAccount
sortChunksMemAccount, sorterMemMonitorName = args.MonitorRegistry.CreateMemAccountForSpillStrategyWithLimit(
ctx, flowCtx, spoolMemLimit, opName, processorID,
)
unlimitedMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName, processorID,
)
inMemorySorter = colexec.NewSortChunks(
deselectorUnlimitedAllocator,
colmem.NewLimitedAllocator(ctx, sortChunksMemAccount, unlimitedMemAcc, factory),
colmem.NewLimitedAllocator(ctx, sortChunksMemAccount, accounts[1], factory),
input, inputTypes, ordering.Columns, int(matchLen), maxOutputBatchMemSize,
)
} else {
Expand Down Expand Up @@ -425,21 +421,15 @@ func (r opResult) createDiskBackedSort(
sorterMemMonitorName,
func(input colexecop.Operator) colexecop.Operator {
opName := opNamePrefix + "external-sorter"
// We are using unlimited memory monitors here because external
// We are using unlimited memory accounts here because external
// sort itself is responsible for making sure that we stay within
// the memory limit.
sortUnlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName+"-sort", processorID,
), factory)
mergeUnlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName+"-merge", processorID,
), factory)
outputUnlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName+"-output", processorID,
), factory)
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, opName, processorID, 3, /* numAccounts */
)
sortUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory)
mergeUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[1], factory)
outputUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[2], factory)
diskAccount := args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID)
es := colexecdisk.NewExternalSorter(
sortUnlimitedAllocator,
Expand Down Expand Up @@ -748,16 +738,14 @@ func NewColOperator(
// We have to create a separate account in order for the cFetcher to
// be able to precisely track the size of its output batch. This
// memory account is "streaming" in its nature, so we create an
// unlimited one.
cFetcherMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, "cfetcher" /* opName */, spec.ProcessorID,
)
kvFetcherMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, "kvfetcher" /* opName */, spec.ProcessorID,
// unlimited one. We also need another unlimited account for the
// KV fetcher.
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, "cfetcher" /* opName */, spec.ProcessorID, 2, /* numAccounts */
)
estimatedRowCount := spec.EstimatedRowCount
scanOp, err := colfetcher.NewColBatchScan(
ctx, colmem.NewAllocator(ctx, cFetcherMemAcc, factory), kvFetcherMemAcc,
ctx, colmem.NewAllocator(ctx, accounts[0], factory), accounts[1],
flowCtx, core.TableReader, post, estimatedRowCount, args.TypeResolver,
)
if err != nil {
Expand All @@ -775,17 +763,12 @@ func NewColOperator(
// We have to create a separate account in order for the cFetcher to
// be able to precisely track the size of its output batch. This
// memory account is "streaming" in its nature, so we create an
// unlimited one.
cFetcherMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, "cfetcher" /* opName */, spec.ProcessorID,
)
kvFetcherMemAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, "kvfetcher" /* opName */, spec.ProcessorID,
)
// We might use the Streamer API which requires a separate memory
// account that is bound to an unlimited memory monitor.
streamerBudgetAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, "streamer" /* opName */, spec.ProcessorID,
// unlimited one. We also need another unlimited account for the
// KV fetcher. Additionally, we might use the Streamer API which
// requires yet another separate memory account that is bound to an
// unlimited memory monitor.
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, "index-join" /* opName */, spec.ProcessorID, 3, /* numAccounts */
)
streamerDiskMonitor := args.MonitorRegistry.CreateDiskMonitor(
ctx, flowCtx, "streamer" /* opName */, spec.ProcessorID,
Expand All @@ -794,8 +777,8 @@ func NewColOperator(
copy(inputTypes, spec.Input[0].ColumnTypes)
indexJoinOp, err := colfetcher.NewColIndexJoin(
ctx, getStreamingAllocator(ctx, args),
colmem.NewAllocator(ctx, cFetcherMemAcc, factory),
kvFetcherMemAcc, streamerBudgetAcc, flowCtx,
colmem.NewAllocator(ctx, accounts[0], factory),
accounts[1], accounts[2], flowCtx,
inputs[0].Root, core.JoinReader, post, inputTypes,
streamerDiskMonitor, args.TypeResolver,
)
Expand Down Expand Up @@ -873,32 +856,27 @@ func NewColOperator(

if needHash {
opName := redact.RedactableString("hash-aggregator")
outputUnlimitedAllocator := colmem.NewAllocator(
ctx,
args.MonitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, opName+"-output", spec.ProcessorID),
factory,
)
// We have separate unit tests that instantiate the in-memory
// hash aggregators, so we don't need to look at
// args.TestingKnobs.DiskSpillingDisabled and always instantiate
// a disk-backed one here.
diskSpillingDisabled := !colexec.HashAggregationDiskSpillingEnabled.Get(&flowCtx.Cfg.Settings.SV)
if diskSpillingDisabled {
// The disk spilling is disabled by the cluster setting, so
// we give an unlimited memory account to the in-memory
// hash aggregator and don't set up the disk spiller.
hashAggregatorUnlimitedMemAccount := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName, spec.ProcessorID,
)
hashTableUnlimitedMemAccount := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName+"-hashtable", spec.ProcessorID,
// we give unlimited memory accounts to the in-memory hash
// aggregator and all of its components and don't set up the
// disk spiller.
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, opName, spec.ProcessorID, 3, /* numAccounts */
)
hashAggregatorUnlimitedMemAccount := accounts[0]
newAggArgs.Allocator = colmem.NewAllocator(
ctx, hashAggregatorUnlimitedMemAccount, factory,
)
newAggArgs.MemAccount = hashAggregatorUnlimitedMemAccount
evalCtx.SingleDatumAggMemAccount = hashAggregatorUnlimitedMemAccount
hashTableAllocator := colmem.NewAllocator(ctx, hashTableUnlimitedMemAccount, factory)
hashTableAllocator := colmem.NewAllocator(ctx, accounts[1], factory)
outputUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[2], factory)
maxOutputBatchMemSize := execinfra.GetWorkMemLimit(flowCtx)
// The second argument is nil because we disable the
// tracking of the input tuples.
Expand Down Expand Up @@ -931,38 +909,40 @@ func NewColOperator(
hashTableMemAccount := args.MonitorRegistry.CreateExtraMemAccountForSpillStrategy(
string(hashAggregatorMemMonitorName),
)
spillingQueueMemMonitorName := hashAggregatorMemMonitorName + "-spilling-queue"
// We need to create a separate memory account for the
// spilling queue because it looks at how much memory it has
// already used in order to decide when to spill to disk.
spillingQueueMemAccount := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, spillingQueueMemMonitorName, spec.ProcessorID,
// We need to create four unlimited memory accounts so that
// each component could track precisely its own usage. The
// components are
// - the hash aggregator
// - the hash table
// - output batch of the hash aggregator
// - the spilling queue for the input tuples tracking.
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, opName, spec.ProcessorID, 4, /* numAccounts */
)
hashAggUnlimitedAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName, spec.ProcessorID,
)
newAggArgs.Allocator = colmem.NewLimitedAllocator(ctx, hashAggregatorMemAccount, hashAggUnlimitedAcc, factory)
newAggArgs.Allocator = colmem.NewLimitedAllocator(ctx, hashAggregatorMemAccount, accounts[0], factory)
newAggArgs.MemAccount = hashAggregatorMemAccount
hashTableUnlimitedAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName, spec.ProcessorID,
)
hashTableAllocator := colmem.NewLimitedAllocator(ctx, hashTableMemAccount, hashTableUnlimitedAcc, factory)
hashTableAllocator := colmem.NewLimitedAllocator(ctx, hashTableMemAccount, accounts[1], factory)
inMemoryHashAggregator := colexec.NewHashAggregator(
newAggArgs,
&colexecutils.NewSpillingQueueArgs{
UnlimitedAllocator: colmem.NewAllocator(ctx, spillingQueueMemAccount, factory),
UnlimitedAllocator: colmem.NewAllocator(ctx, accounts[2], factory),
Types: inputTypes,
MemoryLimit: inputTuplesTrackingMemLimit,
DiskQueueCfg: args.DiskQueueCfg,
FDSemaphore: args.FDSemaphore,
DiskAcc: args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, spillingQueueMemMonitorName, spec.ProcessorID),
DiskAcc: args.MonitorRegistry.CreateDiskAccount(
ctx, flowCtx, hashAggregatorMemMonitorName+"-spilling-queue", spec.ProcessorID,
),
},
hashTableAllocator,
outputUnlimitedAllocator,
colmem.NewAllocator(ctx, accounts[3], factory),
maxOutputBatchMemSize,
)
ehaOpName := redact.RedactableString("external-hash-aggregator")
ehaMemAccount := args.MonitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, ehaOpName, spec.ProcessorID)
ehaAccounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, ehaOpName, spec.ProcessorID, 3, /* numAccounts */
)
ehaMemAccount := ehaAccounts[0]
// Note that we will use an unlimited memory account here
// even for the in-memory hash aggregator since it is easier
// to do so than to try to replace the memory account if the
Expand All @@ -982,23 +962,15 @@ func NewColOperator(
newAggArgs.Allocator = colmem.NewAllocator(ctx, ehaMemAccount, factory)
newAggArgs.MemAccount = ehaMemAccount
newAggArgs.Input = input
ehaHashTableMemAccount := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, ehaOpName+"-hashtable", spec.ProcessorID,
)
ehaHashTableAllocator := colmem.NewAllocator(ctx, ehaHashTableMemAccount, factory)
ehaHashTableAllocator := colmem.NewAllocator(ctx, ehaAccounts[1], factory)
eha, toClose := colexecdisk.NewExternalHashAggregator(
flowCtx,
args,
&newAggArgs,
result.makeDiskBackedSorterConstructor(ctx, flowCtx, args, ehaOpName, factory),
args.MonitorRegistry.CreateDiskAccount(ctx, flowCtx, ehaOpName, spec.ProcessorID),
ehaHashTableAllocator,
// Note that here we can use the same allocator
// object as we passed to the in-memory hash
// aggregator because only one (either in-memory
// or external) operator will reach the output
// state.
outputUnlimitedAllocator,
colmem.NewAllocator(ctx, ehaAccounts[2], factory),
maxOutputBatchMemSize,
)
result.ToClose = append(result.ToClose, toClose)
Expand Down Expand Up @@ -1116,9 +1088,13 @@ func NewColOperator(
hashJoinerMemAccount, hashJoinerMemMonitorName := args.MonitorRegistry.CreateMemAccountForSpillStrategy(
ctx, flowCtx, opName, spec.ProcessorID,
)
hashJoinerUnlimitedAllocator := colmem.NewAllocator(
ctx, args.MonitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, opName, spec.ProcessorID), factory,
// Create two unlimited memory accounts (one for the output
// batch and another for the "overdraft" accounting when
// spilling to disk occurs).
accounts := args.MonitorRegistry.CreateUnlimitedMemAccounts(
ctx, flowCtx, opName, spec.ProcessorID, 2, /* numAccounts */
)
outputUnlimitedAllocator := colmem.NewAllocator(ctx, accounts[0], factory)
hjSpec := colexecjoin.MakeHashJoinerSpec(
core.HashJoiner.Type,
core.HashJoiner.LeftEqColumns,
Expand All @@ -1128,12 +1104,9 @@ func NewColOperator(
core.HashJoiner.RightEqColumnsAreKey,
)

hashJoinerUnlimitedAcc := args.MonitorRegistry.CreateUnlimitedMemAccount(
ctx, flowCtx, opName, spec.ProcessorID,
)
inMemoryHashJoiner := colexecjoin.NewHashJoiner(
colmem.NewLimitedAllocator(ctx, hashJoinerMemAccount, hashJoinerUnlimitedAcc, factory),
hashJoinerUnlimitedAllocator, hjSpec, inputs[0].Root, inputs[1].Root,
colmem.NewLimitedAllocator(ctx, hashJoinerMemAccount, accounts[1], factory),
outputUnlimitedAllocator, hjSpec, inputs[0].Root, inputs[1].Root,
colexecjoin.HashJoinerInitialNumBuckets,
)
if args.TestingKnobs.DiskSpillingDisabled {
Expand Down Expand Up @@ -1807,12 +1780,15 @@ func (r opResult) finishBufferedWindowerArgs(
needsBuffer bool,
) {
args.DiskAcc = monitorRegistry.CreateDiskAccount(ctx, flowCtx, opName, processorID)
mainAcc := monitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, opName, processorID)
args.MainAllocator = colmem.NewAllocator(ctx, mainAcc, factory)
var mainAcc *mon.BoundAccount
if needsBuffer {
bufferAcc := monitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, opName, processorID)
args.BufferAllocator = colmem.NewAllocator(ctx, bufferAcc, factory)
accounts := monitorRegistry.CreateUnlimitedMemAccounts(ctx, flowCtx, opName, processorID, 2 /* numAccounts */)
mainAcc = accounts[0]
args.BufferAllocator = colmem.NewAllocator(ctx, accounts[1], factory)
} else {
mainAcc = monitorRegistry.CreateUnlimitedMemAccount(ctx, flowCtx, opName, processorID)
}
args.MainAllocator = colmem.NewAllocator(ctx, mainAcc, factory)
}

func (r opResult) finishScanPlanning(op colfetcher.ScanOperator, resultTypes []*types.T) {
Expand Down
59 changes: 36 additions & 23 deletions pkg/sql/colexec/colexecargs/monitor_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package colexecargs

import (
"context"
"strconv"

"github.com/cockroachdb/cockroach/pkg/sql/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
Expand Down Expand Up @@ -45,7 +46,8 @@ func (r *MonitorRegistry) NewStreamingMemAccount(flowCtx *execinfra.FlowCtx) *mo
func (r *MonitorRegistry) getMemMonitorName(
opName redact.RedactableString, processorID int32, suffix redact.RedactableString,
) redact.RedactableString {
return redact.Sprintf("%s-%d-%s-%d", opName, processorID, suffix, len(r.monitors))
return opName + redact.RedactableString(strconv.Itoa(int(processorID))) + suffix +
redact.RedactableString(strconv.Itoa(len(r.monitors)))
}

// CreateMemAccountForSpillStrategy instantiates a memory monitor and a memory
Expand Down Expand Up @@ -115,42 +117,53 @@ func (r *MonitorRegistry) CreateExtraMemAccountForSpillStrategy(
return nil
}

// CreateUnlimitedMemAccount instantiates an unlimited memory monitor and a
// memory account to be used with a buffering disk-backed colexecop.Operator (or
// in special circumstances in place of a streaming account when the precise
// memory usage is needed by an operator). The receiver is updated to have
// references to both objects. Note that the returned account is only
// "unlimited" in that it does not have a hard limit that it enforces, but a
// limit might be enforced by a root monitor.
// CreateUnlimitedMemAccounts instantiates an unlimited memory monitor (with a
// unique monitor name) and a number of memory accounts bound to it. The
// receiver is updated to have references to all objects. Note that the returned
// accounts are only "unlimited" in that they do not have a hard limit that they
// enforce, but a limit might be enforced by a root monitor.
//
// Note that the memory monitor name is not returned (unlike above) because no
// caller actually needs it.
func (r *MonitorRegistry) CreateUnlimitedMemAccounts(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
opName redact.RedactableString,
processorID int32,
numAccounts int,
) []*mon.BoundAccount {
monitorName := r.getMemMonitorName(opName, processorID, "unlimited" /* suffix */)
_, accounts := r.createUnlimitedMemAccounts(ctx, flowCtx, monitorName, numAccounts)
return accounts
}

// CreateUnlimitedMemAccount is a light wrapper around
// CreateUnlimitedMemAccounts when only a single account is desired.
func (r *MonitorRegistry) CreateUnlimitedMemAccount(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
opName redact.RedactableString,
processorID int32,
) *mon.BoundAccount {
monitorName := r.getMemMonitorName(opName, processorID, "unlimited" /* suffix */)
bufferingOpUnlimitedMemMonitor := execinfra.NewMonitor(
ctx, flowCtx.EvalCtx.Mon, monitorName,
)
r.monitors = append(r.monitors, bufferingOpUnlimitedMemMonitor)
bufferingMemAccount := bufferingOpUnlimitedMemMonitor.MakeBoundAccount()
r.accounts = append(r.accounts, &bufferingMemAccount)
return &bufferingMemAccount
return r.CreateUnlimitedMemAccounts(ctx, flowCtx, opName, processorID, 1 /* numAccounts */)[0]
}

// CreateUnlimitedMemAccounts instantiates an unlimited memory monitor and
// numAccounts memory accounts. It should only be used when the component
// supports spilling to disk and is made aware of a memory usage limit
// separately. The receiver is updated to have a reference to all created
// components.
func (r *MonitorRegistry) CreateUnlimitedMemAccounts(
// CreateUnlimitedMemAccountsWithName is similar to CreateUnlimitedMemAccounts
// with the only difference that the monitor name is provided by the caller.
func (r *MonitorRegistry) CreateUnlimitedMemAccountsWithName(
ctx context.Context, flowCtx *execinfra.FlowCtx, name redact.RedactableString, numAccounts int,
) (*mon.BytesMonitor, []*mon.BoundAccount) {
return r.createUnlimitedMemAccounts(ctx, flowCtx, name+"-unlimited", numAccounts)
}

func (r *MonitorRegistry) createUnlimitedMemAccounts(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
monitorName redact.RedactableString,
numAccounts int,
) (*mon.BytesMonitor, []*mon.BoundAccount) {
bufferingOpUnlimitedMemMonitor := execinfra.NewMonitor(
ctx, flowCtx.EvalCtx.Mon, name+"-unlimited",
ctx, flowCtx.EvalCtx.Mon, monitorName,
)
r.monitors = append(r.monitors, bufferingOpUnlimitedMemMonitor)
oldLen := len(r.accounts)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ func (s *vectorizedFlowCreator) setupRouter(
}
mmName := "hash-router-[" + streamIDs + "]"

hashRouterMemMonitor, accounts := s.monitorRegistry.CreateUnlimitedMemAccounts(ctx, flowCtx, mmName, len(output.Streams))
hashRouterMemMonitor, accounts := s.monitorRegistry.CreateUnlimitedMemAccountsWithName(ctx, flowCtx, mmName, len(output.Streams))
allocators := make([]*colmem.Allocator, len(output.Streams))
for i := range allocators {
allocators[i] = colmem.NewAllocator(ctx, accounts[i], factory)
Expand Down

0 comments on commit 649113d

Please sign in to comment.