diff --git a/pkg/sql/distsqlrun/windower.go b/pkg/sql/distsqlrun/windower.go index 6160fd963844..c5c6dff1e527 100644 --- a/pkg/sql/distsqlrun/windower.go +++ b/pkg/sql/distsqlrun/windower.go @@ -320,7 +320,7 @@ func (w *windower) accumulateRows() (windowerState, sqlbase.EncDatumRow, *Produc if err := w.accumulationAcc.Grow(w.Ctx, int64(row.Size())); err != nil { w.MoveToDraining(err) - return windowerStateUnknown, nil, nil + return windowerStateUnknown, nil, w.DrainHelper() } if len(w.partitionBy) == 0 { w.encodedPartitions[""] = append(w.encodedPartitions[""], w.rowAlloc.CopyRow(row)) @@ -335,7 +335,8 @@ func (w *windower) accumulateRows() (windowerState, sqlbase.EncDatumRow, *Produc var err error w.scratch, err = row[int(col)].Encode(&w.inputTypes[int(col)], &w.datumAlloc, preferredEncoding, w.scratch) if err != nil { - return windowerStateUnknown, nil, &ProducerMetadata{Err: err} + w.MoveToDraining(err) + return windowerStateUnknown, nil, w.DrainHelper() } } w.encodedPartitions[string(w.scratch)] = append(w.encodedPartitions[string(w.scratch)], w.rowAlloc.CopyRow(row)) @@ -375,17 +376,17 @@ func (w *windower) emitRow() (windowerState, sqlbase.EncDatumRow, *ProducerMetad for !w.populated { if err := w.cancelChecker.Check(); err != nil { w.MoveToDraining(err) - return windowerStateUnknown, nil, nil + return windowerStateUnknown, nil, w.DrainHelper() } if err := w.decodePartitions(); err != nil { w.MoveToDraining(err) - return windowerStateUnknown, nil, nil + return windowerStateUnknown, nil, w.DrainHelper() } if err := w.computeWindowFunctions(w.Ctx, w.evalCtx); err != nil { w.MoveToDraining(err) - return windowerStateUnknown, nil, nil + return windowerStateUnknown, nil, w.DrainHelper() } w.populated = true } @@ -399,7 +400,7 @@ func (w *windower) emitRow() (windowerState, sqlbase.EncDatumRow, *ProducerMetad } w.MoveToDraining(errors.Errorf("unexpected: emitRow() is called on a windower before all input rows are accumulated")) - return windowerStateUnknown, nil, nil + return windowerStateUnknown, nil, w.DrainHelper() } // computeWindowFunctions computes all window functions over all partitions.