Skip to content

Commit

Permalink
flowinfra: clean up flow stats propagation in row-based flows
Browse files Browse the repository at this point in the history
Previously, we would attach `FlowStats` (like max memory usage) to the
"stream component" stats object. I don't really understand why that was
the case, probably it was due to misunderstanding how tracing works (in
particular, the TODOs that are now removed mentioned "flow level span",
but we don't need to attach the metadata to a particular tracing span).

This simplifies the code a bit but also simplifies the work on adding
region information to `ComponentID` object.

Release note: None
  • Loading branch information
yuzefovich committed Jul 10, 2023
1 parent 5e8c311 commit 531c8cf
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 37 deletions.
24 changes: 0 additions & 24 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,30 +346,6 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats.NetworkBytesSentGroupedByNode[originInstanceID] += bytes
}

// The row execution flow attaches flow stats to a stream stat with the
// last outbox, so we need to check stream stats for max memory and disk
// usage.
// TODO(cathymw): maxMemUsage shouldn't be attached to span stats that
// are associated with streams, since it's a flow level stat. However,
// due to the row exec engine infrastructure, it is too complicated to
// attach this to a flow level span. If the row exec engine gets
// removed, getting maxMemUsage from streamStats should be removed as
// well.
if stats.stats.FlowStats.MaxMemUsage.HasValue() {
memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value())
if memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] {
a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] = memUsage
}
}
if stats.stats.FlowStats.MaxDiskUsage.HasValue() {
if diskUsage := int64(stats.stats.FlowStats.MaxDiskUsage.Value()); diskUsage > a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] {
a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] = diskUsage
}
}
if stats.stats.FlowStats.ConsumedRU.HasValue() {
a.nodeLevelStats.RUEstimateGroupedByNode[originInstanceID] += int64(stats.stats.FlowStats.ConsumedRU.Value())
}

numMessages, err := getNumNetworkMessagesFromComponentsStats(stats.stats)
if err != nil {
errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating number of network messages"))
Expand Down
24 changes: 11 additions & 13 deletions pkg/sql/flowinfra/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Outbox struct {
}

statsCollectionEnabled bool
stats execinfrapb.ComponentStats
streamStats, flowStats execinfrapb.ComponentStats

// numOutboxes is an atomic that keeps track of how many outboxes are left.
// When there is one outbox left, the flow-level stats are added to the last
Expand Down Expand Up @@ -109,7 +109,8 @@ func NewOutbox(
m.streamID = streamID
m.numOutboxes = numOutboxes
m.isGatewayNode = isGatewayNode
m.stats.Component = flowCtx.StreamComponentID(streamID)
m.streamStats.Component = flowCtx.StreamComponentID(streamID)
m.flowStats.Component = execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID)
return m
}

Expand Down Expand Up @@ -149,7 +150,7 @@ func (m *Outbox) AddRow(
mustFlush = true
}
if m.statsCollectionEnabled {
m.stats.NetTx.TuplesSent.Add(1)
m.streamStats.NetTx.TuplesSent.Add(1)
}
}
m.numRows++
Expand Down Expand Up @@ -178,8 +179,8 @@ func (m *Outbox) flush(ctx context.Context) error {
}
sendErr := m.stream.Send(msg)
if m.statsCollectionEnabled {
m.stats.NetTx.BytesSent.Add(int64(msg.Size()))
m.stats.NetTx.MessagesSent.Add(1)
m.streamStats.NetTx.BytesSent.Add(int64(msg.Size()))
m.streamStats.NetTx.MessagesSent.Add(1)
}
for _, rpm := range msg.Data.Metadata {
if metricsMeta, ok := rpm.Value.(*execinfrapb.RemoteProducerMetadata_Metrics_); ok {
Expand Down Expand Up @@ -310,15 +311,12 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error
return err
}
if !m.isGatewayNode && m.numOutboxes != nil && atomic.AddInt32(m.numOutboxes, -1) == 0 {
// TODO(cathymw): maxMemUsage shouldn't be attached to span stats that are associated with streams,
// since it's a flow level stat. However, due to the row exec engine infrastructure, it is too
// complicated to attach this to a flow level span. If the row exec engine gets removed, getting
// maxMemUsage from streamStats should be removed as well.
m.stats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes()))
m.stats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes()))
m.stats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx)))
m.flowStats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes()))
m.flowStats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes()))
m.flowStats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx)))
}
span.RecordStructured(&m.stats)
span.RecordStructured(&m.streamStats)
span.RecordStructured(&m.flowStats)
if !m.flowCtx.Gateway {
if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil {
err := m.AddRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace})
Expand Down

0 comments on commit 531c8cf

Please sign in to comment.