Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
50973: sql: add final projection during physical plan's finalization r=yuzefovich a=yuzefovich

This commit updates the contract between `DistSQLReceiver` and
`FinalizePlan` method so that the latter adds a projection which allows
the DistSQL receiver to get the rows with the desired schema. This
shouldn't have any effect on the performance, but it seems like a
cleaner approach.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jul 6, 2020
2 parents afecdb7 + 5cc44fb commit d7761ab
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 63 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_dist.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func distChangefeedFlow(
}

p := sql.MakePhysicalPlan(gatewayNodeID)
p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, []*types.T{}, execinfrapb.Ordering{})
p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, changefeedResultTypes, execinfrapb.Ordering{})
p.AddSingleGroupStage(
gatewayNodeID,
execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec},
Expand Down
26 changes: 10 additions & 16 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -109,6 +108,7 @@ func newChangeAggregatorProcessor(
flowCtx *execinfra.FlowCtx,
processorID int32,
spec execinfrapb.ChangeAggregatorSpec,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
ctx := flowCtx.EvalCtx.Ctx()
Expand All @@ -120,8 +120,8 @@ func newChangeAggregatorProcessor(
}
if err := ca.Init(
ca,
&execinfrapb.PostProcessSpec{},
nil, /* types */
post,
changefeedResultTypes,
flowCtx,
processorID,
output,
Expand All @@ -144,10 +144,6 @@ func newChangeAggregatorProcessor(
return ca, nil
}

func (ca *changeAggregator) OutputTypes() []*types.T {
return changefeedResultTypes
}

// Start is part of the RowSource interface.
func (ca *changeAggregator) Start(ctx context.Context) context.Context {
ctx, ca.cancel = context.WithCancel(ctx)
Expand Down Expand Up @@ -350,9 +346,9 @@ func (ca *changeAggregator) close() {
func (ca *changeAggregator) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
for ca.State == execinfra.StateRunning {
if !ca.changedRowBuf.IsEmpty() {
return ca.changedRowBuf.Pop(), nil
return ca.ProcessRowHelper(ca.changedRowBuf.Pop()), nil
} else if !ca.resolvedSpanBuf.IsEmpty() {
return ca.resolvedSpanBuf.Pop(), nil
return ca.ProcessRowHelper(ca.resolvedSpanBuf.Pop()), nil
}
if err := ca.tick(); err != nil {
select {
Expand Down Expand Up @@ -483,6 +479,7 @@ func newChangeFrontierProcessor(
processorID int32,
spec execinfrapb.ChangeFrontierSpec,
input execinfra.RowSource,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
ctx := flowCtx.EvalCtx.Ctx()
Expand All @@ -495,7 +492,8 @@ func newChangeFrontierProcessor(
sf: span.MakeFrontier(spec.TrackedSpans...),
}
if err := cf.Init(
cf, &execinfrapb.PostProcessSpec{},
cf,
post,
input.OutputTypes(),
flowCtx,
processorID,
Expand Down Expand Up @@ -532,10 +530,6 @@ func newChangeFrontierProcessor(
return cf, nil
}

func (cf *changeFrontier) OutputTypes() []*types.T {
return changefeedResultTypes
}

// Start is part of the RowSource interface.
func (cf *changeFrontier) Start(ctx context.Context) context.Context {
cf.input.Start(ctx)
Expand Down Expand Up @@ -656,9 +650,9 @@ func (cf *changeFrontier) shouldProtectBoundaries() bool {
func (cf *changeFrontier) Next() (sqlbase.EncDatumRow, *execinfrapb.ProducerMetadata) {
for cf.State == execinfra.StateRunning {
if !cf.passthroughBuf.IsEmpty() {
return cf.passthroughBuf.Pop(), nil
return cf.ProcessRowHelper(cf.passthroughBuf.Pop()), nil
} else if !cf.resolvedBuf.IsEmpty() {
return cf.resolvedBuf.Pop(), nil
return cf.ProcessRowHelper(cf.resolvedBuf.Pop()), nil
}

if cf.schemaChangeBoundaryReached() && cf.shouldFailOnSchemaChange() {
Expand Down
25 changes: 18 additions & 7 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,11 +652,10 @@ type PhysicalPlan struct {
// and indexJoinNode where not all columns in the table are actually used in
// the plan, but are kept for possible use downstream (e.g., sorting).
//
// When the query is run, the output processor's PlanToStreamColMap is used
// by DistSQLReceiver to create an implicit projection on the processor's
// output for client consumption (see DistSQLReceiver.Push()). Therefore,
// "invisible" columns (e.g., columns required for merge ordering) will not
// be output.
// Before the query is run, the physical plan must be finalized, and during
// the finalization a projection is added to the plan so that
// DistSQLReceiver gets rows of the desired schema from the output
// processor.
PlanToStreamColMap []int
}

Expand Down Expand Up @@ -3443,8 +3442,8 @@ func (dsp *DistSQLPlanner) NewPlanningCtx(
return planCtx
}

// FinalizePlan adds a final "result" stage if necessary and populates the
// endpoints of the plan.
// FinalizePlan adds a final "result" stage and a final projection if necessary
// as well as populates the endpoints of the plan.
func (dsp *DistSQLPlanner) FinalizePlan(planCtx *PlanningCtx, plan *PhysicalPlan) {
// Find all MetadataTestSenders in the plan, so that the MetadataTestReceiver
// knows how many sender IDs it should expect.
Expand All @@ -3458,6 +3457,18 @@ func (dsp *DistSQLPlanner) FinalizePlan(planCtx *PlanningCtx, plan *PhysicalPlan
// Add a final "result" stage if necessary.
plan.EnsureSingleStreamOnGateway()

// Add a final projection so that DistSQLReceiver gets the rows of the
// desired schema.
projection := make([]uint32, 0, len(plan.ResultTypes))
for _, outputCol := range plan.PlanToStreamColMap {
if outputCol >= 0 {
projection = append(projection, uint32(outputCol))
}
}
plan.AddProjection(projection)
// Update PlanToStreamColMap to nil since it is no longer necessary.
plan.PlanToStreamColMap = nil

if len(metadataSenders) > 0 {
plan.AddSingleGroupStage(
dsp.gatewayNodeID,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/distsql_plan_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (dsp *DistSQLPlanner) createStatsPlan(
[]*types.T{},
)

p.PlanToStreamColMap = []int{}
return p, nil
}

Expand Down
25 changes: 5 additions & 20 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ func (dsp *DistSQLPlanner) Run(
defer dsp.distSQLSrv.ServerConfig.Metrics.QueryStop()

recv.outputTypes = plan.ResultTypes
recv.resultToStreamColMap = plan.PlanToStreamColMap

vectorizedThresholdMet := plan.MaxEstimatedRowCount >= evalCtx.SessionData.VectorizeRowCountThreshold

Expand Down Expand Up @@ -452,10 +451,6 @@ type DistSQLReceiver struct {
// outputTypes are the types of the result columns produced by the plan.
outputTypes []*types.T

// resultToStreamColMap maps result columns to columns in the rowexec results
// stream.
resultToStreamColMap []int

// noColsRequired indicates that the caller is only interested in the
// existence of a single row. Used by subqueries in EXISTS mode.
noColsRequired bool
Expand Down Expand Up @@ -732,16 +727,16 @@ func (r *DistSQLReceiver) Push(
r.status = execinfra.ConsumerClosed
} else {
if r.row == nil {
r.row = make(tree.Datums, len(r.resultToStreamColMap))
r.row = make(tree.Datums, len(row))
}
for i, resIdx := range r.resultToStreamColMap {
err := row[resIdx].EnsureDecoded(r.outputTypes[resIdx], &r.alloc)
for i, encDatum := range row {
err := encDatum.EnsureDecoded(r.outputTypes[i], &r.alloc)
if err != nil {
r.resultWriter.SetError(err)
r.status = execinfra.ConsumerClosed
return r.status
}
r.row[i] = row[resIdx].Datum
r.row[i] = encDatum.Datum
}
}
r.tracing.TraceExecRowsResult(r.ctx, r.row)
Expand Down Expand Up @@ -887,17 +882,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery(
subqueryRecv.noColsRequired = true
typ = sqlbase.ColTypeInfoFromColTypes([]*types.T{})
} else {
// Apply the PlanToStreamColMap projection to the ResultTypes to get the
// final set of output types for the subquery. The reason this is necessary
// is that the output schema of a query sometimes contains columns necessary
// to merge the streams, but that aren't required by the final output of the
// query. These get projected out, so we need to similarly adjust the
// expected result types of the subquery here.
colTypes := make([]*types.T, len(subqueryPhysPlan.PlanToStreamColMap))
for i, resIdx := range subqueryPhysPlan.PlanToStreamColMap {
colTypes[i] = subqueryPhysPlan.ResultTypes[resIdx]
}
typ = sqlbase.ColTypeInfoFromColTypes(colTypes)
typ = sqlbase.ColTypeInfoFromColTypes(subqueryPhysPlan.ResultTypes)
}
rows = rowcontainer.NewRowContainer(subqueryMemAccount, typ, 0)
defer rows.Close(ctx)
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/opt/exec/execbuilder/testdata/distsql_agg
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,12 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJzMl1Fv4jgQx9_vU1jzBLdGwU
query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT sum(x) FROM (SELECT a, b::float + c AS x FROM data) GROUP BY a ORDER BY a]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMllFv2j4Uxd__n8K6T6AaBTuB0jyF_0onJEq6QKVVFapcYlEkGjM7SK2qfvcpSTcCrHY6Rx1vJPjne-49R1d5AfVjBT5MBqPBlynayBW6iMJLdDv4fjXqD8eocT6cTCffRk30dkRtHhtPzeJU4-0dw-je9y9GYX_aQydojvoT9FQciVnKmuhrFF5fof9vEENhdD6I8p8zwJCImI_ZI1fg3wIBDBQwuIDBAwwdmGFYSzHnSgmZHXnJgWH8BH4bwzJZb9Ls9QzDXEgO_guky3TFwYcpu1_xiLOYS6cNGGKesuUqL7OWy0cmn4NMGmCYrFmifNRyCGJJjCgS6QOXCjBEPIm59FHgngT0V38YBQRmrxjEJt2WVylbcPDJK64usb9YSL5gqZBOZ1dhkE0hlDGXPPZR_tQf39yNw-nd-Ho0agS0mQm_vmwEpLknZnv__TN6YOph7-pM_FYwfVfw9h5RCNm_56S4SNMVaR_ULrdFDtoiv9vKGww3adY91gzc_Wv99B39Y9ESa4fsOvJeeW-nPKkeSVIlkg5pOTQPJSlCWUMmDRpL7nWPI5Ok5kyST84krR4KWikUtOW4NYfCoLE0vtPjCAWtORT0k0PhVg-FWykUbsvxag6FQWNpfL3jCIVbcyjcTw6FVz0UXqVQeK38w8I2CAZdpZGdHUcQvJqD4P3Dz5g_SIu4WotE8UpfKO2sOR4veDEMJTZyzq-kmOdliscw5_IXMVdp8S8pHoZJ8VcmsAyTfZiUYboDk4_BXRv4zAYmVrpJR09T7bxdPezqzerq3fK0dEcPd2ys1sMGq_WwwWo9bLLaQBus7tpYfaqFe3qzejZm6WGDWXrYYJYeNplloA1mndmYRQxb1LRG7fao3SK126SWq9RulxKrZUoM29QzmHawTj9kmp42maanTabpaaNpBtxk2sFS1Zo2e_3vZwAAAP__faGWgA==
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMllFv4jgUhd_3V1j3CVSjYCdQmqewW7pCoqQbqLRVhSqXWBSJxowdpFZV__soSWcIMNjpOOrwRoK_63PvObrKG6hvK_BhMhgN_pmijVyhqyi8RveD_29G_eEYNS6Hk-nkv1ETfRxRm-fGS7M41fh4xzB69P2rUdif9tAZmqP-BL0UR2KWsib6Nwpvb9Dfd4ihMLocRPnPGWBIRMzH7Jkr8O-BAAYKGFzA4AGGDswwrKWYc6WEzI685cAwfgG_jWGZrDdp9nqGYS4kB_8N0mW64uDDlD2ueMRZzKXTBgwxT9lylV-zlstnJl-DTBpgmKxZonzUcghiSYwoEukTlwowRDyJufRR4J4F9Ed_GAUEZu8YxCbdXq9StuDgk3dcXWJ_sZB8wVIhnc6uwiCbQihjLnnso_ypP757GIfTh_HtaNQIaDMTfnvdCEhzT8y2_uMremLqaa90Jn4rmB4VvK0jCiH7dc6KQpquSPvg7nJb5KAt8rOtvMFwk2bdY83A3d_WT4_oH4uWWDtkz5FCynEd3o4OUj2bpEo2HdJyaJ5OUqSzhnAaNJZs7J5GOEnN4SRfHE5aPRS0Uihoy3FrDoVBY2l856cRClpzKOgXh8KtHgq3UijcluPVHAqDxtL4eqcRCrfmULhfHAqveii8SqHwWvkXhm0QDLpKI7s4jSB4NQfB-4PfM7-QFnG1FoniO6qOVW5nzfF4wYthKLGRc34jxTy_pngMcy5_EXOVFv-S4mGYFH9lAssw2YdJGaY7MPkc3LWBL2xgYqWbdPQ01c7b1cOu3qyu3i1PS3f0cMfGaj1ssFoPG6zWwyarDbTB6q6N1edauKc3q2djlh42mKWHDWbpYZNZBtpg1oWNWcSwRU1r1G6P2i1Su01quUrtdimxWqbEsE09g2kH6_RTpulpk2l62mSanjaaZsBNph0sVa1ps_e_vgcAAP__4KmYxw==

query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT sum(x) FROM (SELECT a, b::float + c AS x FROM data) WHERE a > x GROUP BY a ORDER BY a]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMll1v2kwQhe_fX7GaK1AWmV2bL1_B25AWieDUELVRiqINXhEkgunaSERR_ntlb1I-3OyYYqXcge3Hc2bPmZGfIfo5BxeG3X7304is1Jxc-N4lue1-v-p3egNSOu8NR8Ov_TJ5fSRaPZbWZf1U6fWaoOTedS_6XmfUJGdkQjpDstaPBCIWZfLtS9fvEkF-rKpVW5I1-ex711fk_xsiiOefd_305xgoLMJADsSjjMC9BQYUOFCwgYIDFGowprBU4URGUaiSR55ToBeswa1SmC2Wqzi5PKYwCZUE9xniWTyX4MJI3M-lL0UglVUFCoGMxWyellmq2aNQT-1EK1AYLsUicknFYkQsAsJJGD9IFQEFXy4CqVzSts_a_K1hStoMxi8UwlW8KR_FYirBZS80v8RBWAmXVm1X3MVsHqc1-dvpGcrxQ8p1plMlpyIOlcX2TqSdnLqnAqlkkJQGCp3Bzd3AG90Nrvv9UpuXk4O6viy1WXlPzabA_RN5ENHD3qsT9RvF9ruKN-8JtZD995zpF5naqmVqb7fFMm2x322lDXqrOOmeGk7c-Wv9_B39OgV815H3ytd2yrP8I8DyjIDFKhZPh4DpIShgBhCNuvt6YTOAlNsOCzuNGWAFz0D9g2eA5w8hzxVCXrHsgkOIaNQhbBQWQqTctlv8NELICw5h44NDaOcPoZ0rhHbFcgoOIaJRh7BZWAiRcttu2acRQrvgEDY_OIRO_hA6uULoVNKvw2ODh-jSwWsVFjyk3LZDzmkEzyk4eK1_-Bn6B2m-jJbhIpK5vjCrSXMymEp9GFG4UhN5pcJJWkb_9VIuvRDIKNZ3mf7TW-hbicBtmBlhbob5Psy2YXsHZofBzWNgxo-i68fQvGqmbeOBO2bYMbuFeF0z0nUzXDfCDTPcOCYoZhgJihnGgoLQSFDMNBaU5jFBaZl3QhVZCshKwXZKZqkcYjdCI34jNGY4hiOOIzhmOcuslkM8Z-bVwhzENfNyYTUEz2yXg0w305jpZho1HcEx0804arp5s2KmZ5bMrmtNxDXzlmEtBM_smYNMN9OY6WYaNR3BMdPNOGY6N2_YfdPHL__9CgAA__--oLSZ
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMll9v4jgUxd_3U1j3CVSjYCf8yxPslu4iUdINVDNVB1UusSgSJYwTJKqq332UuB3-ZPANQ9ThDUJ-vsf3nHvFK0Tf5-DCsNvv_jMiKzUnV753Te67X2_6nd6AlC57w9Hw_36ZvL8SrZ5L67J-q_T-TFDy6LpXfa8zapILMiGdIVnrVwIRizL58l_X7xJBvq2qVVuSNfnX925vyN93RBDPv-z66ccxUFiEgRyIZxmBew8MKHCgYAMFByjUYExhqcKJjKJQJa-8pkAvWINbpTBbLFdx8nhMYRIqCe4rxLN4LsGFkXicS1-KQCqrChQCGYvZPC2zVLNnoV7aiVagMFyKReSSisWIWASEkzB-kioCCr5cBFK5pG1ftPnHhSlpMxi_UQhX8aZ8FIupBJe90fwSB2ElXFq1XXFXs3mc1uQf3TOU48eU60ynSk5FHCqL7XWknXTdU4FUMkhKA4XO4O5h4I0eBrf9fqnNy0mjbq9LbVbeU7Mp8PhCnkT0tHd0on6j2D6oeHNOqIXsn3OhDzJdq5apvX0tlrkW-3mt9ILeKk5uTw0dd35bPz-gX6eA7zmipRzWUdvRwfLPAsszCxarWDydBqanoYBhQDTqNtQLGwak3HZq2HkMAyt4GOqfPAw8fwh5rhDyimUXHEJEow5ho7AQIuW23eLnEUJecAgbnxxCO38I7VwhtCuWU3AIEY06hM3CQoiU23bLPo8Q2gWHsPnJIXTyh9DJFUKnkv5NPDV4iC4dvFZhwUPKbTvknEfwnIKD1_qD_0d_Ic2X0TJcRHJH1aGTq8nlZDCVuhlRuFITeaPCSVpGf_VSLn0QyCjWvzL9pbfQPyUCt2FmhLkZ5vsw24btHZgdBzdPgRk_ia6fQvOqmbaNDXfMsGN2C_G6ZqTrZrhuhBtmuHFKUMwwEhQzjAUFoZGgmGksKM1TgtIy74QqshSQlYLtlMxSOcZuhEb8RmjMcAxHHEdwzHKWWS3HeM7Mq4U5iGvm5cJqCJ7ZLkeZbqYx0800ajqCY6abcdR082bFTM8smV3Xmohr5i3DWgie2TNHmW6mMdPNNGo6gmOmm3HMdG7esPumj9_--hEAAP__YTy24A==

# Ensure that an interesting input ordering that causes an ordered group by
# forces an ordered synchronizer. We create an index on b even though it's
Expand Down Expand Up @@ -378,4 +378,4 @@ INSERT INTO uv SELECT x, x*10 FROM generate_series(2, 8) AS g(x);
query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT sum(v) FROM data INNER LOOKUP JOIN uv ON (a=u) GROUP BY u ORDER BY u]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMlttv2kgUxt_3rxidJ9AOMjM2N0srkd2wFSmxKRepUYQiB48ILfFQX6JGUf73ynYUDIQ5dkEubxj7N-eb-b5zNC8Q_FiBCePeoPffhET-ivw_sq_Jbe_rcHDRt0jlsj-ejL8MquTtkyB6rDxV069cJ3RI37J6IzKw7c_TIbmy-xaJnohtkYpD_iFRlXwa2dMh-feGRMQeXfZGyc8ZUPCkKyznUQRg3gIDChwo6EDBAAoNmFFY-3IugkD68ScvCdB3f4JZp7D01lEY_z2jMJe-APMFwmW4EmDCxLlfiZFwXOFrdaDgitBZrpIya3_56PjP3Vg5UBivHS8wSU1jxPFcwokMH4QfAAU7Ck3SZTB7pSCjcFMsCJ2FAJO90vyCruTSe9PT-FhP9AQUBlJ-j9bkm1x6RHpJ-XchtMtpVz8ohxeRc7FY-GLhhNLX2M75dGMPbN8VvnBNkjxdWDd3lj25s6aDQaXLq_GxTa8rXb26o2ZT4P6ZPDjBw87S8WFuFOsHFW_WkamQ3XX-ThdSbauxVzu7Lba3Lfa-rWSD6aFzqgiA8dv6-QH9lqzJtca3HTlUvrFVnuVvCJanITRW03jSEixticIdgSjKdESzjI5A5GSjw86jI9iJO6JZckfw_JHkuSLJa5p-VCQRRZlItsqIJCIn6x0_j0jyE0eyVXIk9fyR1HNFUq9pxlGRRBRlItkuI5KInKx3-nlEUj9xJNslR9LIH0kjVySNWnLFLBZDREUmhp0yYojIyfplnEcMjRPHsPMHr68fSBuJYC29QOS6mdbjzQl3IdLDCGTkz8XQl_OkTPpoJ1zyhyuCMH3L0oe-l76KBWZhpoS5Gua7MMvC-hbMisHtY2DGj6Kbx9C8rqZ15YEbathQu4V43VDSTTXcVMItNdw6JihqGAmKGsaCgtBIUNQ0FpT2MUHpqGdCHRkKyEjBZsreUCliN0IjfiM0ZjiGI44jOGY52xstRTxn6tHCDMQ19XBhDQTfmy6FTFfTmOlqGjUdwTHT1ThqunqyYqbvDZlt19qIa-opwzoIvjdnCpmupjHT1TRqOoJjpqtxzHSunrC7ps9e__oVAAD__7N8zns=
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMlttv2kgYxd_3rxh9T6AdZGZsbpZWIrthK1JiUy5SowhFDh4RWuKhvkSNovzvle0oGAjz2QW5vGHwb-bMnPMd8QLBjxWYMO4Nev9NSOSvyP8j-5rc9r4OBxd9i1Qu--PJ-MugSt5eCaLHylM1fct1Qof0Las3IgPb_jwdkiu7b5HoidgWqTjkHxJVyaeRPR2Sf29IROzRZW-UfJwBBU-6wnIeRQDmLTCgwIGCDhQMoNCAGYW1L-ciCKQfv_KSAH33J5h1CktvHYXx1zMKc-kLMF8gXIYrASZMnPuVGAnHFb5WBwquCJ3lKtlm7S8fHf-5GysHCuO14wUmqWmMOJ5LOJHhg_ADoGBHoUm6DGavFGQUbjYLQmchwGSvNL-gK7n03vQ0PtYTPQGFgZTfozX5JpcekV6y_bsQ2uW0qx-Uw4vIuVgsfLFwQulrbOd-urEHtu8KX7gmSZ4urJs7y57cWdPBoNLl1fjapteVrl7dUbPZ4P6ZPDjBw87S8WVuFOsHFW_WkamQ3XX-ThdSHauxt3f2WGzvWOz9WMkB00vnVBEA47f18wP6LVmTa43vOIIFsbGlg-WfDJZnMjRW03gyGyydjcKjgSjKjEazjNFA5GQzxM5jNNiJR6NZ8mjw_JHkuSLJa5p-VCQRRZlItsqIJCIn6x0_j0jyE0eyVXIk9fyR1HNFUq9pxlGRRBRlItkuI5KInKx3-nlEUj9xJNslR9LIH0kjVySNWvJfs1gMERWZGHbKiCEiJ-uXcR4xNE4cw84f_B_7gbSRCNbSC8SWqkMr1-PDCXch0ssIZOTPxdCX82Sb9NFOuOQLVwRh-itLH_pe-lMsMAszJczVMN-FWRbWt2BWDG4fAzN-FN08huZ1Na0rL9xQw4baLcTrhpJuquGmEm6p4dYxQVHDSFDUMBYUhEaCoqaxoLSPCUpH3Ql1pBSQSsE6Za9UitiN0IjfCI0ZjuGI4wiOWc72qqWI50xdLcxAXFOXC2sg-F67FDJdTWOmq2nUdATHTFfjqOnqZsVM3yuZbdfaiGvqlmEdBN_rmUKmq2nMdDWNmo7gmOlqHDOdqxt21_TZ61-_AgAA___ghtDC
2 changes: 1 addition & 1 deletion pkg/sql/opt/exec/execbuilder/testdata/distsql_numtables
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ https://cockroachdb.github.io/distsqlplan/decode.html#eJyMkE1L60AUhvf3VxzOpbSFuT
query T
SELECT url FROM [EXPLAIN (DISTSQL) SELECT str FROM NumToStr WHERE y % 1000 = 0 AND str LIKE '%i%' ORDER BY y]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMlEFr20AQhe_9FcOAiUzXeCXLJiwUlNZKK-rKqWxoS9BBtYYgkLXq7gpqjP970Spt4pIUFR-cm2bnPd58A6M96h8lClyFi_DdGhpVwnWy_AS34debxVUUgzOPVuvV58UQ7iXaqE5SNVsj2-rLhzAJwXF2MACXcz6EN8CHcBXPwWn7i-hjCBeDYnAxhGUyDxN4-w12KTKsZE5xtiWN4hZdZOghwwky9JHhFFOGtZIb0lqqVrK3hij_iYIzLKq6Me1zynAjFaHYoylMSShwnX0vKaEsJzXmyDAnkxWljalVsc3ULvg9PjJc1VmlBYzGHuet-rooDSkBjhO490xCiCheX1q0P9-WMPAeAQohVuskit8PkeGyMQICjwUupgeGsjEPA2uT3REK98CegXpgkSonRfkxReC9xvTwBHksR7IeT4_Uz6V7R-lu_5W6fVdqNzoa--fdq9efzOtN5luy2XnJJv3JJr3JZpbs8rxkfn8yvzdZyzR6IQf-BFRCupaVpl7Hy9vrp_yOur-Flo3a0I2SGxvTlUvrsw85adN13a6IKtuyAz42u_80z47M_G-zd0ry5BSzf4p5-l_m9PDqVwAAAP__fh4tNA==
https://cockroachdb.github.io/distsqlplan/decode.html#eJzMlFGLm04Uxd__n2K4EFb5T8hoTFgGCm4bt5WmZqtCWxYfbLwsgnHszAgNId-9OG67a9kUSx7SN-_cc7znd2HmAOpbBRySYB28SUkrK3Ibbz6Q--Dz3fomjIi1CpM0-bi2yaNEadlL6nanRVd9ehfEAbGsPZkQhzFmk1eE2eQmWhGr66_D9wG5mpSTK5ts4lUQk9dfyD4DCrUoMMp3qIDfgwMUXKAwBwoeUFhARqGRYotKCdlJDsYQFt-BMwpl3bS6O84obIVE4AfQpa4QOKT51wpjzAuUMwYUCtR5WZkxjSx3udz7P-MDhaTJa8XJdOYy1qlvy0qj5MSyfOeRiXMeRum1Qfv1bQh99xkg5zxJ4zB6awOFTas58V3qO5AdKYhWPwVWOn9A4M6RnoB6YhGyQInFkMJ3_4fs-AJ5JKaimS2G6j7J6RjuIIYzfrfO2N2a1U5n3mUX7I4nc0eTeYZseVmy-Xiy-WiypSG7viyZN57MG03WMU3_kZv-AlSMqhG1wgHPqT-z7hnA4gH7Z0OJVm7xToqtGdOXG-MzBwUq3Xedvghr0zIBn5udP5qXAzP73eyeM3l-jtk7x7z4K3N2_O9HAAAA__-YrS97

#
# -- Join tests --
Expand Down
17 changes: 10 additions & 7 deletions pkg/sql/plan_node_to_row_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,21 @@ type planNodeToRowSource struct {
func makePlanNodeToRowSource(
source planNode, params runParams, fastPath bool,
) (*planNodeToRowSource, error) {
nodeColumns := planColumns(source)

types := make([]*types.T, len(nodeColumns))
for i := range nodeColumns {
types[i] = nodeColumns[i].Typ
var typs []*types.T
if fastPath {
// If our node is a "fast path node", it means that we're set up to
// just return a row count meaning we'll output a single row with a
// single INT column.
typs = []*types.T{types.Int}
} else {
typs = getTypesFromResultColumns(planColumns(source))
}
row := make(sqlbase.EncDatumRow, len(nodeColumns))
row := make(sqlbase.EncDatumRow, len(typs))

return &planNodeToRowSource{
node: source,
params: params,
outputTypes: types,
outputTypes: typs,
row: row,
fastPath: fastPath,
}, nil
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/rowexec/bulk_row_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,6 @@ func (sp *bulkRowWriter) work(ctx context.Context) error {
return g.Wait()
}

func (sp *bulkRowWriter) OutputTypes() []*types.T {
return CTASPlanResultTypes
}

func (sp *bulkRowWriter) ingestLoop(ctx context.Context, kvCh chan row.KVBatch) error {
writeTS := sp.spec.Table.CreateAsOfTime
const bufferSize = 64 << 20
Expand Down
Loading

0 comments on commit d7761ab

Please sign in to comment.