Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
50835: sql: distribute the plan for inverted filtering when possible r=sumeerbhola a=sumeerbhola

The testing of this change is inadequate, since we can't use SPLIT AT
on an inverted index key.

Release note: None

50974: stmtdiagnostics: break into chunks r=RaduBerinde a=RaduBerinde

Implement breaking support bundles into chunks. The bundles contain
traces which can be large, resulting in "command is too large" errors.

Release note (bug fix): better support for large statement diagnostic
bundles.

Co-authored-by: sumeerbhola <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
  • Loading branch information
3 people committed Jul 6, 2020
3 parents b2e4e2e + 47af145 + d683dff commit 6b7b297
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 18 deletions.
78 changes: 73 additions & 5 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,8 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
}
return checkSupportForPlanNode(n.input)

// TODO(sumeer): When the filtering is only a union expression, we should
// distribute, as outlined in the discussion on PR #50158.
case *invertedFilterNode:
return cannotDistribute, nil
return checkSupportForInvertedFilterNode(n)

case *invertedJoinNode:
if err := checkExpr(n.onExpr); err != nil {
Expand Down Expand Up @@ -520,6 +518,37 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) {
}
}

func checkSupportForInvertedFilterNode(n *invertedFilterNode) (distRecommendation, error) {
rec, err := checkSupportForPlanNode(n.input)
if err != nil {
return cannotDistribute, err
}
// When filtering is a union of inverted spans, it is distributable: place
// an inverted filterer on each node, which produce the primary keys in
// arbitrary order, and de-duplicate the PKs at the next stage.
// The expression is a union of inverted spans iff all the spans have been
// promoted to FactoredUnionSpans, in which case the Left and Right
// InvertedExpressions are nil.
//
// TODO(sumeer): Even if the filtering cannot be distributed, the
// placement of the inverted filter could be optimized. Specifically, when
// the input is a single processor (because the TableReader is reading
// span(s) that are all on the same node), we can place the inverted
// filterer on that input node. Currently, this approach fails because we
// don't know whether the input is a single processor at this stage, and if
// we blindly returned shouldDistribute, we encounter situations where
// remote TableReaders are feeding an inverted filterer which runs into an
// encoding problem with inverted columns. The remote code tries to decode
// the inverted column as the original type (e.g. for geospatial, tries to
// decode the int cell-id as a geometry) which obviously fails -- this is
// related to #50659. Fix this in the distSQLSpecExecFactory.
filterRec := cannotDistribute
if n.expression.Left == nil && n.expression.Right == nil {
filterRec = shouldDistribute
}
return rec.compose(filterRec), nil
}

//go:generate stringer -type=NodeStatus

// NodeStatus represents a node's health and compatibility in the context of
Expand Down Expand Up @@ -2236,9 +2265,48 @@ func (dsp *DistSQLPlanner) createPlanForInvertedFilter(
InvertedExpr: *n.expression.ToProto(),
}

plan.AddSingleGroupStage(dsp.gatewayNodeID,
// Cases:
// - Last stage is a single processor (local or remote): Place the inverted
// filterer on that last stage node. Due to the behavior of
// checkSupportForInvertedFilterNode, the remote case can only happen for
// a distributable filter.
// - Last stage has multiple processors that are on different nodes: Filtering
// must be distributable. Place an inverted filterer on each node, which
// produces the primary keys in arbitrary order, and de-duplicate the PKs
// at the next stage.
if len(plan.ResultRouters) == 1 {
// Last stage is a single processor.
lastNodeID := plan.Processors[plan.ResultRouters[0]].Node
plan.AddSingleGroupStage(lastNodeID,
execinfrapb.ProcessorCoreUnion{
InvertedFilterer: invertedFiltererSpec,
},
execinfrapb.PostProcessSpec{}, plan.ResultTypes)
return plan, nil
}
// Must be distributable.
distributable := n.expression.Left == nil && n.expression.Right == nil
if !distributable {
return nil, errors.Errorf("expected distributable inverted filterer")
}
// Instantiate one inverted filterer for every stream.
plan.AddNoGroupingStage(
execinfrapb.ProcessorCoreUnion{InvertedFilterer: invertedFiltererSpec},
execinfrapb.PostProcessSpec{}, plan.ResultTypes, execinfrapb.Ordering{})
// De-duplicate the PKs. Note that the inverted filterer output includes
// the inverted column always set to NULL, so we exclude it from the
// distinct columns.
distinctColumns := make([]uint32, 0, len(n.resultColumns)-1)
for i := 0; i < len(n.resultColumns); i++ {
if i == n.invColumn {
continue
}
distinctColumns = append(distinctColumns, uint32(i))
}
plan.AddSingleGroupStage(
dsp.gatewayNodeID,
execinfrapb.ProcessorCoreUnion{
InvertedFilterer: invertedFiltererSpec,
Distinct: &execinfrapb.DistinctSpec{DistinctColumns: distinctColumns},
},
execinfrapb.PostProcessSpec{}, plan.ResultTypes)
return plan, nil
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/explain_bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"context"
"fmt"
"io"
"math/rand"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -43,6 +44,13 @@ func TestExplainAnalyzeDebug(t *testing.T) {
base := "statement.txt trace.json trace.txt trace-jaeger.json env.sql"
plans := "schema.sql opt.txt opt-v.txt opt-vv.txt plan.txt"

// Set a small chunk size to test splitting into chunks. The bundle files are
// on the order of 10KB.
r.Exec(t, fmt.Sprintf(
"SET CLUSTER SETTING sql.stmt_diagnostics.bundle_chunk_size = '%d'",
5000+rand.Intn(10000),
))

t.Run("basic", func(t *testing.T) {
rows := r.QueryStr(t, "EXPLAIN ANALYZE (DEBUG) SELECT * FROM abc WHERE c=1")
checkBundle(
Expand Down
145 changes: 145 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/inverted_filter_geospatial_dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# LogicTest: 5node

statement ok
CREATE TABLE geo_table(
k int primary key,
geom geometry,
INVERTED INDEX geom_index(geom)
)

statement ok
INSERT INTO geo_table VALUES
(1, 'POINT(1 1)'),
(2, 'LINESTRING(1 1, 2 2)'),
(3, 'POINT(3 3)'),
(4, 'LINESTRING(4 4, 5 5)'),
(5, 'LINESTRING(40 40, 41 41)'),
(6, 'POLYGON((1 1, 5 1, 5 5, 1 5, 1 1))'),
(7, 'LINESTRING(1 1, 3 3)')

query I
SELECT k FROM geo_table WHERE ST_Intersects('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k
----
3
6
7

query I
SELECT k FROM geo_table WHERE ST_CoveredBy('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k
----
6
7

# Not distributed.
query T
SELECT url FROM [EXPLAIN (DISTSQL)
SELECT k FROM geo_table WHERE ST_Intersects('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyUU19P2z4Uff99iqv7Qquf19pOWqifykbYMhXK0rINkQplzRWLKHZmuxMT6nef0nRAYHTghyT3z7nnHDu-RfdjgQon0Sh6N4WlXcBhMj6C8-jryWg_PobWQTyZTj6N2rBpuaobLslc-OzbguDLhyiJwPmLQnuyjubetXaOTkfT-GQcH09bLdmRIDuyzaAVdDgEHd5u7yj1PhofRdPkjFWzrtswTg6iBN6ewdUMGWqT03F2TQ7VOQqcMSytmZNzxlap23VDnN-g4gwLXS59lZ4xnBtLqG7RF35BqHBaiUwoy8l2OTLMyWfFYj224r0odE43wzs7yHBSZtop6ArRkwMpejzs8_5e2B_s9t_8JbkLmc4h3APjv5N1OFsxNEt_r8j57JJQiRV7uepY_yTrKT8sFp4s2a5oSv9Tj25KC0bDUChwlW5wPrNepZimwW4vTbnkacr5_aO3fxCdpgik8390fU4RHnljOF56BUPxrEv5GpcfTaE3RyOb_kpbXGf2V-NcNtRsKJ9lD17DPjHWk-0GTeah-B8Z1vuuHv_WXPCQ10tu3oKL-mOwP7hbXIRP4vvOxtoLn8Q76uHtGMr2C_Y9fI3zhFxptKOG8-cm89WMIeWXVN87Z5Z2TifWzNc0dThe49aJnJyvq6IOYl2XKoEPwWIrWG4Hy63gYDs42AoOH4Fnq_9-BwAA___NL5NZ

# The inverted filterer handles five inverted index rows with decoded
# datums, where the first column is the PK (k) and the second is the cellid
# and is sorted in cellid order.
# 7, 1152921521786716160
# 2, 1152921526081683456
# 6, 1152921573326323712
# 7, 1152921574400065536
# 3, 1152921574740070469
# To test distribution, we inject a split after the third row and relocate
# the second part of the inverted index. Both inverted filterers will produce 7,
# which will need to be de-duplicated.

statement ok
ALTER INDEX geo_table@geom_index SPLIT AT VALUES (1152921574000000000)

query TI colnames,rowsort
SELECT replicas, lease_holder FROM [SHOW RANGES FROM INDEX geo_table@geom_index]
----
replicas lease_holder
{1} 1
{1} 1

# Not distributed, since both ranges of the index are on the same node,
# which is also the gateway node.
query T
SELECT url FROM [EXPLAIN (DISTSQL)
SELECT k FROM geo_table WHERE ST_Intersects('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyUU19P2z4Uff99iqv7Qquf19pOWqifykbYMhXK0rINkQplzRWLKHZmuxMT6nef0nRAYHTghyT3z7nnHDu-RfdjgQon0Sh6N4WlXcBhMj6C8-jryWg_PobWQTyZTj6N2rBpuaobLslc-OzbguDLhyiJwPmLQnuyjubetXaOTkfT-GQcH09bLdmRIDuyzaAVdDgEHd5u7yj1PhofRdPkjFWzrtswTg6iBN6ewdUMGWqT03F2TQ7VOQqcMSytmZNzxlap23VDnN-g4gwLXS59lZ4xnBtLqG7RF35BqHBaiUwoy8l2OTLMyWfFYj224r0odE43wzs7yHBSZtop6ArRkwMpejzs8_5e2B_s9t_8JbkLmc4h3APjv5N1OFsxNEt_r8j57JJQiRV7uepY_yTrKT8sFp4s2a5oSv9Tj25KC0bDUChwlW5wPrNepZimwW4vTbnkacr5_aO3fxCdpgik8390fU4RHnljOF56BUPxrEv5GpcfTaE3RyOb_kpbXGf2V-NcNtRsKJ9lD17DPjHWk-0GTeah-B8Z1vuuHv_WXPCQ10tu3oKL-mOwP7hbXIRP4vvOxtoLn8Q76uHtGMr2C_Y9fI3zhFxptKOG8-cm89WMIeWXVN87Z5Z2TifWzNc0dThe49aJnJyvq6IOYl2XKoEPwWIrWG4Hy63gYDs42AoOH4Fnq_9-BwAA___NL5NZ

statement ok
ALTER INDEX geo_table@geom_index EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 1152921574000000000)

query TTTI colnames,rowsort
SELECT start_key, end_key, replicas, lease_holder FROM [SHOW RANGES FROM INDEX geo_table@geom_index]
----
start_key end_key replicas lease_holder
NULL /1152921574000000000 {1} 1
/1152921574000000000 NULL {2} 2

# Distributed.
query I
SELECT k FROM geo_table WHERE ST_Intersects('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k
----
3
6
7

query T
SELECT url FROM [EXPLAIN (DISTSQL)
SELECT k FROM geo_table WHERE ST_Intersects('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzUU19v2j4Uff99Cuu-FPTLwHb-UPxEN9ItEy1doNuqBlUZueqiUpvZZupU8d2nEFYaStOyt_khyb2-x_cc5557MD9mIGAUDsJ3Y7LQM3IcD0_IZfj1bHAUnZJGPxqNR58GTbIuuSkLrlFd2fTbDMmXD2EcEmOvcmlRG5xa0zg4OR-Mo7NhdDpuNHiLE97iTYc03BYlbos2mwdCvA-HJ-E4vnCKs26bZBj3w5i8vSA3E3BAqgxP01s0IC6BgQMcJg7MtZqiMUoX6ftVUZTdgaAO5HK-sEV64sBUaQRxDza3MwQB44JojGmGuk3BgQxtms9WRxe9r3KZ4V3vQRI4MJqn0gjSZsznXc586gU0OPSCbid4syPZIanMCOsSZb-jNjBZOqAWdsPI2PQaQbCl83rWkfyJ2mJ2nM8satRtXqX-Zz-8m2uiJOkxQUzBmxibaisSSBK34ycJ5TRJKN08_KN-eJ4AQZm9UPU5AbLS5h2-pI0_q20jaSGVzlBjVlEyWe5Q38-NzeXUtr2q6l4xDMOFFaTHnuXi7nPPH1Uu18PhV3vNdX6b6l-VyVi3dnr82e7ePt1HSlvU7WBb5f_gQPnnxba5KKMeLRdfvxll5Uf3qPuwKPOexJvKyjr0nsQH4rFHe7z5inv3K8rZ613J_sqVHa-gHfi--8iVm2TpSv7i5LJ9WD9xpfuPunKHthjNXEmDW-7cfTItXIvZNZYWN2qhp3im1XTVpgyHK9wqkaGx5S4rg0iWWwXBx2BWC-YVMNsG81qwW9_ZrQV79WCvFtypB_u14KAeHOx1YZPlf78DAAD__58IeKk=

# Data is distributed, but the filterer can't be distributed since it is not a union.
query I
SELECT k FROM geo_table WHERE ST_CoveredBy('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k
----
6
7

query T
SELECT url FROM [EXPLAIN (DISTSQL)
SELECT k FROM geo_table WHERE ST_CoveredBy('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyUU99P2zAQft9fcboXWs1rbScU6qeyEbZMhbK004ZIhUJzYhEhzmwXgVD_9ylNBwTWjvohyf347vvuLn5A-ztHheNgGHyawNzkcBSNjuE8-Hk6PAhPoHUYjifjb8M2rFKu64Qr0hcuucwJfnwJogCsu5jpWzKUXt63do6_Dyfh6Sg8mbRasiNBdmSbQcvrcPA6vN3eUepzMDoOJtEZq0rdtGEUHQYRfDyD6ykyLHRKJ8kNWVTnKHDKsDR6RtZqU7kelglheoeKM8yKcu4q95ThTBtC9YAuczmhwkmlMaIkJdPlyDAll2T5smzFe5EVKd0NHrtBhuMyKayCrhC7si_FLvd7vLfv9_p7vQ__cO5BUqTg74N2v8hYnC4Y6rl7UmRdckWoxIK9XXVY3JJxlB5luSNDpiua0v_Gg7vSgC5gIBTYSjdYlxinYoxjb283jrnkccz5_x4IVKRbokSM8KJ3hqO5UzAQa6cgt5nCV50Vq9XJZv-lyW4Sc9_Y24qaDeRadm8b9rE2jkzXazIPxHtkWO9FvfjrueA-r49cvQUX9Uf_oP94uPBf2U-ZjbPvv7J31PPLM5DtN4zd36bxiGypC0uNxtdV5ospQ0qvqL6WVs_NjE6Nni1panO0xC0dKVlXR0VthEUdqgQ-B4uNYLkZLDeCvc1gbyPYfwGeLt79CQAA__84vJpP

# Move all the index data that will be read to node 2 while the query executes
# at node 1. The filtering moves to node 2 when it is distributable.

statement ok
ALTER INDEX geo_table@geom_index EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 1)

query TTTI colnames,rowsort
SELECT start_key, end_key, replicas, lease_holder FROM [SHOW RANGES FROM INDEX geo_table@geom_index]
----
start_key end_key replicas lease_holder
NULL /1152921574000000000 {2} 2
/1152921574000000000 NULL {2} 2

query I
SELECT k FROM geo_table WHERE ST_Intersects('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k
----
3
6
7

# Filtering is placed at node 2.
query T
SELECT url FROM [EXPLAIN (DISTSQL)
SELECT k FROM geo_table WHERE ST_Intersects('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyUU9FP204Mfv_9FZZfaPU72rtLWug9lY2wZSotS8s2RCqUNRaLKLns7joxof7vU5oOCIwAfmhj-_tsfz75Fu3PJSqcBqPg_QxWZglH0eQYzoNvJ6ODcAytw3A6m34etWELuaoAl6QvXPJ9SfD1YxAFYN1FljsylhbOtnaOT0ez8GQSjmetluxIkB3ZZtDyOhy8Dm-3d5T6EEyOg1l0xspa122YRIdBBO_O4GqODHOd0ji5JovqHAUylDhnWBi9IGu1KcO3G1CY3qDiDLO8WLkyPGe40IZQ3aLL3JJQ4Vjv6qLrI8OUXJItN7A1Q71y9yTrkktC1VuzB4VFc-FZuYGIkpRMl9fKYynqIstTuhne7QoZTosktwq6QvTkQIoe9_u8v-_3B3v93X8E9yDJU_D3QbsfZCw-N7V4y9Rh_ouMo_QoWzoyZLqiPvrffHBTGNA5DIUCW84N1iXGqRjj2NvrxTGXPI45v__pHRwGpzEC5ekLqC8xwiNtDCcrp2AonlUp36Lyk87y7dPIur7CZNeJ-V17l21rNpTPdvfe0n2qjSPT9eqdh-J_ZFjtXT2-GS64zyuT23_BRfUxOBjcGRf-E_8eWbN9_4m_ox6e3lC2X7F3v6b8hWOLyBY6t_Sqa-PrOUNKL6k6aKtXZkEnRi82bSp3suFtAilZV2V7lRPmVaoc8CFZNJJlM1k2kr1mstdI9pvJfiOZPyLP1__9CQAA__9URsVw

query I
SELECT k FROM geo_table WHERE ST_CoveredBy('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k
----
6
7

# Filtering is at gateway node since the filter is not distributable.
query T
SELECT url FROM [EXPLAIN (DISTSQL)
SELECT k FROM geo_table WHERE ST_CoveredBy('MULTIPOINT((2.2 2.2), (3.0 3.0))'::geometry, geom) ORDER BY k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJyUU99P2zAQft9fcboXWs1rbScU6qeyEbZMhbK004ZIhUJzYhEhzmwXgVD_9ylNBwTWjvohyf347vvuLn5A-ztHheNgGHyawNzkcBSNjuE8-Hk6PAhPoHUYjifjb8M2rFKu64Qr0hcuucwJfnwJogCsu5jpWzKUXt63do6_Dyfh6Sg8mbRasiNBdmSbQcvrcPA6vN3eUepzMDoOJtEZq0rdtGEUHQYRfDyD6ykyLHRKJ8kNWVTnKHDKsDR6RtZqU7kelglheoeKM8yKcu4q95ThTBtC9YAuczmhwkmlMaIkJdPlyDAll2T5smzFe5EVKd0NHrtBhuMyKayCrhC7si_FLvd7vLfv9_p7vQ__cO5BUqTg74N2v8hYnC4Y6rl7UmRdckWoxIK9XXVY3JJxlB5luSNDpiua0v_Gg7vSgC5gIBTYSjdYlxinYoxjb283jrnkccz5_x4IVKRbokSM8KJ3hqO5UzAQa6cgt5nCV50Vq9XJZv-lyW4Sc9_Y24qaDeRadm8b9rE2jkzXazIPxHtkWO9FvfjrueA-r49cvQUX9Uf_oP94uPBf2U-ZjbPvv7J31PPLM5DtN4zd36bxiGypC0uNxtdV5ospQ0qvqL6WVs_NjE6Nni1panO0xC0dKVlXR0VthEUdqgQ-B4uNYLkZLDeCvc1gbyPYfwGeLt79CQAA__84vJpP
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# LogicTest: !fakedist-metadata !3node-tenant
# LogicTest: local local-vec-off

# TODO(sumeer): move these to opt/exec/execbuilder/testdata since logic tests
# are not supposed to change when a plan changes.
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/opt/optbuilder/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ func getIndexColumnNamesAndTypes(index cat.Index) (colNames []string, colTypes [
colNames[i] = string(c.ColName())
colTypes[i] = c.DatumType()
}
if index.IsInverted() && index.GeoConfig() != nil {
// TODO(sumeer): special case Array too. JSON is harder since the split
// needs to be a Datum and the JSON inverted column is not.
//
// Geospatial inverted index. The first column is the inverted column and
// is an int.
colTypes[0] = types.Int
}
return colNames, colTypes
}

Expand Down
39 changes: 27 additions & 12 deletions pkg/sql/stmtdiagnostics/statement_diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,23 @@ import (
"github.com/cockroachdb/errors"
)

var stmtDiagnosticsPollingInterval = settings.RegisterDurationSetting(
var pollingInterval = settings.RegisterDurationSetting(
"sql.stmt_diagnostics.poll_interval",
"rate at which the stmtdiagnostics.Registry polls for requests, set to zero to disable",
10*time.Second)

var bundleChunkSize = settings.RegisterValidatedByteSizeSetting(
"sql.stmt_diagnostics.bundle_chunk_size",
"chunk size for statement diagnostic bundles",
1024*1024,
func(val int64) error {
if val < 16 {
return errors.Errorf("chunk size must be at least 16 bytes")
}
return nil
},
)

// Registry maintains a view on the statement fingerprints
// on which data is to be collected (i.e. system.statement_diagnostics_requests)
// and provides utilities for checking a query against this list and satisfying
Expand Down Expand Up @@ -103,7 +115,7 @@ func (r *Registry) poll(ctx context.Context) {
deadline time.Time
pollIntervalChanged = make(chan struct{}, 1)
maybeResetTimer = func() {
if interval := stmtDiagnosticsPollingInterval.Get(&r.st.SV); interval <= 0 {
if interval := pollingInterval.Get(&r.st.SV); interval <= 0 {
// Setting the interval to a non-positive value stops the polling.
timer.Stop()
} else {
Expand All @@ -124,7 +136,7 @@ func (r *Registry) poll(ctx context.Context) {
lastPoll = timeutil.Now()
}
)
stmtDiagnosticsPollingInterval.SetOnChange(&r.st.SV, func() {
pollingInterval.SetOnChange(&r.st.SV, func() {
select {
case pollIntervalChanged <- struct{}{}:
default:
Expand Down Expand Up @@ -394,27 +406,30 @@ func (r *Registry) insertStatementDiagnostics(
errorVal = tree.NewDString(collectionErr.Error())
}

bundleChunksVal := tree.DNull
if len(bundle) != 0 {
// Insert the bundle into system.statement_bundle_chunks.
// TODO(radu): split in chunks.
bundleChunksVal := tree.NewDArray(types.Int)
for len(bundle) > 0 {
chunkSize := int(bundleChunkSize.Get(&r.st.SV))
chunk := bundle
if len(chunk) > chunkSize {
chunk = chunk[:chunkSize]
}
bundle = bundle[len(chunk):]

// Insert the chunk into system.statement_bundle_chunks.
row, err := r.ie.QueryRowEx(
ctx, "stmt-bundle-chunks-insert", txn,
sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser},
"INSERT INTO system.statement_bundle_chunks(description, data) VALUES ($1, $2) RETURNING id",
"statement diagnostics bundle",
tree.NewDBytes(tree.DBytes(bundle)),
tree.NewDBytes(tree.DBytes(chunk)),
)
if err != nil {
return err
}
chunkID := row[0].(*tree.DInt)

array := tree.NewDArray(types.Int)
if err := array.Append(chunkID); err != nil {
if err := bundleChunksVal.Append(chunkID); err != nil {
return err
}
bundleChunksVal = array
}

collectionTime := timeutil.Now()
Expand Down

0 comments on commit 6b7b297

Please sign in to comment.