diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index e928878e0a38..54e5846e8e14 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -346,30 +346,6 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats.NetworkBytesSentGroupedByNode[originInstanceID] += bytes } - // The row execution flow attaches flow stats to a stream stat with the - // last outbox, so we need to check stream stats for max memory and disk - // usage. - // TODO(cathymw): maxMemUsage shouldn't be attached to span stats that - // are associated with streams, since it's a flow level stat. However, - // due to the row exec engine infrastructure, it is too complicated to - // attach this to a flow level span. If the row exec engine gets - // removed, getting maxMemUsage from streamStats should be removed as - // well. - if stats.stats.FlowStats.MaxMemUsage.HasValue() { - memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value()) - if memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] { - a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] = memUsage - } - } - if stats.stats.FlowStats.MaxDiskUsage.HasValue() { - if diskUsage := int64(stats.stats.FlowStats.MaxDiskUsage.Value()); diskUsage > a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] { - a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] = diskUsage - } - } - if stats.stats.FlowStats.ConsumedRU.HasValue() { - a.nodeLevelStats.RUEstimateGroupedByNode[originInstanceID] += int64(stats.stats.FlowStats.ConsumedRU.Value()) - } - numMessages, err := getNumNetworkMessagesFromComponentsStats(stats.stats) if err != nil { errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating number of network messages")) diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index d3e6f65af473..8bee6c7717ff 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -80,7 +80,7 @@ type Outbox struct { } statsCollectionEnabled bool - stats execinfrapb.ComponentStats + streamStats, flowStats execinfrapb.ComponentStats // numOutboxes is an atomic that keeps track of how many outboxes are left. // When there is one outbox left, the flow-level stats are added to the last @@ -109,7 +109,8 @@ func NewOutbox( m.streamID = streamID m.numOutboxes = numOutboxes m.isGatewayNode = isGatewayNode - m.stats.Component = flowCtx.StreamComponentID(streamID) + m.streamStats.Component = flowCtx.StreamComponentID(streamID) + m.flowStats.Component = execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID) return m } @@ -149,7 +150,7 @@ func (m *Outbox) AddRow( mustFlush = true } if m.statsCollectionEnabled { - m.stats.NetTx.TuplesSent.Add(1) + m.streamStats.NetTx.TuplesSent.Add(1) } } m.numRows++ @@ -178,8 +179,8 @@ func (m *Outbox) flush(ctx context.Context) error { } sendErr := m.stream.Send(msg) if m.statsCollectionEnabled { - m.stats.NetTx.BytesSent.Add(int64(msg.Size())) - m.stats.NetTx.MessagesSent.Add(1) + m.streamStats.NetTx.BytesSent.Add(int64(msg.Size())) + m.streamStats.NetTx.MessagesSent.Add(1) } for _, rpm := range msg.Data.Metadata { if metricsMeta, ok := rpm.Value.(*execinfrapb.RemoteProducerMetadata_Metrics_); ok { @@ -310,15 +311,12 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error return err } if !m.isGatewayNode && m.numOutboxes != nil && atomic.AddInt32(m.numOutboxes, -1) == 0 { - // TODO(cathymw): maxMemUsage shouldn't be attached to span stats that are associated with streams, - // since it's a flow level stat. However, due to the row exec engine infrastructure, it is too - // complicated to attach this to a flow level span. If the row exec engine gets removed, getting - // maxMemUsage from streamStats should be removed as well. - m.stats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes())) - m.stats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes())) - m.stats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx))) + m.flowStats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes())) + m.flowStats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes())) + m.flowStats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx))) } - span.RecordStructured(&m.stats) + span.RecordStructured(&m.streamStats) + span.RecordStructured(&m.flowStats) if !m.flowCtx.Gateway { if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil { err := m.AddRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace})