Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
106242: pg_class: populate pg_class.relreplident r=rafiss a=otan

Release note (sql change): pg_class's relreplident field was previously unpopulated. It is now populated with `d` for all tables (as each table has a primary key) and n otherwise.

Informs: #84505

106546: flowinfra: clean up flow stats propagation in row-based flows r=yuzefovich a=yuzefovich

Previously, we would attach `FlowStats` (like max memory usage) to the "stream component" stats object. I don't really understand why that was the case, probably it was due to misunderstanding how tracing works (in particular, the TODOs that are now removed mentioned "flow level span", but we don't need to attach the metadata to a particular tracing span).

This simplifies the code a bit but also simplifies the work on adding region information to `ComponentID` object.

Epic: None

Release note: None

Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
3 people committed Jul 10, 2023
3 parents f797060 + fcd9111 + 531c8cf commit 33d9616
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 88 deletions.
24 changes: 0 additions & 24 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,30 +346,6 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats.NetworkBytesSentGroupedByNode[originInstanceID] += bytes
}

// The row execution flow attaches flow stats to a stream stat with the
// last outbox, so we need to check stream stats for max memory and disk
// usage.
// TODO(cathymw): maxMemUsage shouldn't be attached to span stats that
// are associated with streams, since it's a flow level stat. However,
// due to the row exec engine infrastructure, it is too complicated to
// attach this to a flow level span. If the row exec engine gets
// removed, getting maxMemUsage from streamStats should be removed as
// well.
if stats.stats.FlowStats.MaxMemUsage.HasValue() {
memUsage := int64(stats.stats.FlowStats.MaxMemUsage.Value())
if memUsage > a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] {
a.nodeLevelStats.MaxMemoryUsageGroupedByNode[originInstanceID] = memUsage
}
}
if stats.stats.FlowStats.MaxDiskUsage.HasValue() {
if diskUsage := int64(stats.stats.FlowStats.MaxDiskUsage.Value()); diskUsage > a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] {
a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] = diskUsage
}
}
if stats.stats.FlowStats.ConsumedRU.HasValue() {
a.nodeLevelStats.RUEstimateGroupedByNode[originInstanceID] += int64(stats.stats.FlowStats.ConsumedRU.Value())
}

numMessages, err := getNumNetworkMessagesFromComponentsStats(stats.stats)
if err != nil {
errs = errors.CombineErrors(errs, errors.Wrap(err, "error calculating number of network messages"))
Expand Down
24 changes: 11 additions & 13 deletions pkg/sql/flowinfra/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Outbox struct {
}

statsCollectionEnabled bool
stats execinfrapb.ComponentStats
streamStats, flowStats execinfrapb.ComponentStats

// numOutboxes is an atomic that keeps track of how many outboxes are left.
// When there is one outbox left, the flow-level stats are added to the last
Expand Down Expand Up @@ -109,7 +109,8 @@ func NewOutbox(
m.streamID = streamID
m.numOutboxes = numOutboxes
m.isGatewayNode = isGatewayNode
m.stats.Component = flowCtx.StreamComponentID(streamID)
m.streamStats.Component = flowCtx.StreamComponentID(streamID)
m.flowStats.Component = execinfrapb.FlowComponentID(flowCtx.NodeID.SQLInstanceID(), flowCtx.ID)
return m
}

Expand Down Expand Up @@ -149,7 +150,7 @@ func (m *Outbox) AddRow(
mustFlush = true
}
if m.statsCollectionEnabled {
m.stats.NetTx.TuplesSent.Add(1)
m.streamStats.NetTx.TuplesSent.Add(1)
}
}
m.numRows++
Expand Down Expand Up @@ -178,8 +179,8 @@ func (m *Outbox) flush(ctx context.Context) error {
}
sendErr := m.stream.Send(msg)
if m.statsCollectionEnabled {
m.stats.NetTx.BytesSent.Add(int64(msg.Size()))
m.stats.NetTx.MessagesSent.Add(1)
m.streamStats.NetTx.BytesSent.Add(int64(msg.Size()))
m.streamStats.NetTx.MessagesSent.Add(1)
}
for _, rpm := range msg.Data.Metadata {
if metricsMeta, ok := rpm.Value.(*execinfrapb.RemoteProducerMetadata_Metrics_); ok {
Expand Down Expand Up @@ -310,15 +311,12 @@ func (m *Outbox) mainLoop(ctx context.Context, wg *sync.WaitGroup) (retErr error
return err
}
if !m.isGatewayNode && m.numOutboxes != nil && atomic.AddInt32(m.numOutboxes, -1) == 0 {
// TODO(cathymw): maxMemUsage shouldn't be attached to span stats that are associated with streams,
// since it's a flow level stat. However, due to the row exec engine infrastructure, it is too
// complicated to attach this to a flow level span. If the row exec engine gets removed, getting
// maxMemUsage from streamStats should be removed as well.
m.stats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes()))
m.stats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes()))
m.stats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx)))
m.flowStats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.Mon.MaximumBytes()))
m.flowStats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes()))
m.flowStats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx)))
}
span.RecordStructured(&m.stats)
span.RecordStructured(&m.streamStats)
span.RecordStructured(&m.flowStats)
if !m.flowCtx.Gateway {
if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil {
err := m.AddRow(ctx, nil, &execinfrapb.ProducerMetadata{TraceData: trace})
Expand Down
74 changes: 37 additions & 37 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -657,38 +657,38 @@ t6_expr_idx1 false i 1 0 false false
mv1 false m 2 0 false true
mv1_pkey false i 1 0 false false

query TBBBITT colnames,rowsort
SELECT relname, relhasrules, relhastriggers, relhassubclass, relfrozenxid, relacl, reloptions
query TBBBITTT colnames,rowsort
SELECT relname, relhasrules, relhastriggers, relhassubclass, relfrozenxid, relacl, reloptions, relreplident
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = 'public'
----
relname relhasrules relhastriggers relhassubclass relfrozenxid relacl reloptions
t1 false false false 0 NULL NULL
t1_pkey false false false 0 NULL NULL
t1_a_key false false false 0 NULL NULL
index_key false false false 0 NULL NULL
t1_m_seq false false false 0 NULL NULL
t1_n_seq false false false 0 NULL NULL
t2 false false false 0 NULL NULL
t2_pkey false false false 0 NULL NULL
t2_t1_id_idx false false false 0 NULL NULL
t3 false false false 0 NULL NULL
t3_pkey false false false 0 NULL NULL
t3_a_b_idx false false false 0 NULL NULL
v1 false false false 0 NULL NULL
t4 false false false 0 NULL NULL
t4_pkey false false false 0 NULL NULL
t5 false false false 0 NULL NULL
t5_pkey false false false 0 NULL NULL
t6 false false false 0 NULL NULL
t6_pkey false false false 0 NULL NULL
t6_expr_idx false false false 0 NULL NULL
t6_expr_expr1_idx false false false 0 NULL NULL
t6_expr_key false false false 0 NULL NULL
t6_expr_idx1 false false false 0 NULL NULL
mv1 false false false 0 NULL NULL
mv1_pkey false false false 0 NULL NULL
relname relhasrules relhastriggers relhassubclass relfrozenxid relacl reloptions relreplident
t1 false false false 0 NULL NULL d
t1_pkey false false false 0 NULL NULL n
t1_a_key false false false 0 NULL NULL n
index_key false false false 0 NULL NULL n
t1_m_seq false false false 0 NULL NULL n
t1_n_seq false false false 0 NULL NULL n
t2 false false false 0 NULL NULL d
t2_pkey false false false 0 NULL NULL n
t2_t1_id_idx false false false 0 NULL NULL n
t3 false false false 0 NULL NULL d
t3_pkey false false false 0 NULL NULL n
t3_a_b_idx false false false 0 NULL NULL n
v1 false false false 0 NULL NULL n
t4 false false false 0 NULL NULL d
t4_pkey false false false 0 NULL NULL n
t5 false false false 0 NULL NULL d
t5_pkey false false false 0 NULL NULL n
t6 false false false 0 NULL NULL d
t6_pkey false false false 0 NULL NULL n
t6_expr_idx false false false 0 NULL NULL n
t6_expr_expr1_idx false false false 0 NULL NULL n
t6_expr_key false false false 0 NULL NULL n
t6_expr_idx1 false false false 0 NULL NULL n
mv1 false false false 0 NULL NULL d
mv1_pkey false false false 0 NULL NULL n

## pg_catalog.pg_attribute

Expand Down Expand Up @@ -924,20 +924,20 @@ CREATE MATERIALIZED VIEW mv_test AS SELECT 1
statement ok
CREATE SEQUENCE seq_test

query TT
SELECT relname, relkind
query TTT
SELECT relname, relkind, relreplident
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = 'public'
ORDER BY relname
----
mv_test m
mv_test_pkey i
seq_test S
tbl_test r
tbl_test_pkey i
tbl_test_v_idx i
view_test v
mv_test m d
mv_test_pkey i n
seq_test S n
tbl_test r d
tbl_test_pkey i n
tbl_test_v_idx i n
view_test v n

statement ok
DROP DATABASE relkinds
Expand Down
32 changes: 18 additions & 14 deletions pkg/sql/pg_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,15 +664,19 @@ https://www.postgresql.org/docs/9.5/catalog-pg-class.html`,
// The only difference between tables, views and sequences are the relkind and relam columns.
relKind := relKindTable
relAm := forwardIndexOid
replIdent := "d" // default;
if table.IsView() {
relKind = relKindView
if table.MaterializedView() {
relKind = relKindMaterializedView
} else {
replIdent = "n"
}
relAm = oidZero
} else if table.IsSequence() {
relKind = relKindSequence
relAm = oidZero
replIdent = "n"
}
relPersistence := relPersistencePermanent
if table.IsTemporary() {
Expand Down Expand Up @@ -724,13 +728,13 @@ https://www.postgresql.org/docs/9.5/catalog-pg-class.html`,
tree.DNull, // relacl
relOptions, // reloptions
// These columns were automatically created by pg_catalog_test's missing column generator.
tree.DNull, // relforcerowsecurity
tree.DNull, // relispartition
tree.DNull, // relispopulated
tree.DNull, // relreplident
tree.DNull, // relrewrite
tree.DNull, // relrowsecurity
tree.DNull, // relpartbound
tree.DNull, // relforcerowsecurity
tree.DNull, // relispartition
tree.DNull, // relispopulated
tree.NewDString(replIdent), // relreplident
tree.DNull, // relrewrite
tree.DNull, // relrowsecurity
tree.DNull, // relpartbound
// These columns were automatically created by pg_catalog_test's missing column generator.
tree.DNull, // relminmxid
); err != nil {
Expand Down Expand Up @@ -784,13 +788,13 @@ https://www.postgresql.org/docs/9.5/catalog-pg-class.html`,
tree.DNull, // relacl
tree.DNull, // reloptions
// These columns were automatically created by pg_catalog_test's missing column generator.
tree.DNull, // relforcerowsecurity
tree.DNull, // relispartition
tree.DNull, // relispopulated
tree.DNull, // relreplident
tree.DNull, // relrewrite
tree.DNull, // relrowsecurity
tree.DNull, // relpartbound
tree.DNull, // relforcerowsecurity
tree.DNull, // relispartition
tree.DNull, // relispopulated
tree.NewDString("n"), // relreplident
tree.DNull, // relrewrite
tree.DNull, // relrowsecurity
tree.DNull, // relpartbound
// These columns were automatically created by pg_catalog_test's missing column generator.
tree.DNull, // relminmxid
)
Expand Down

0 comments on commit 33d9616

Please sign in to comment.