Skip to content

Commit

Permalink
distsql: add missing MoveToDrainings in windower
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
jordanlewis committed Aug 20, 2018
1 parent dabd49b commit 8b5f564
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions pkg/sql/distsqlrun/windower.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (w *windower) accumulateRows() (windowerState, sqlbase.EncDatumRow, *Produc

if err := w.accumulationAcc.Grow(w.Ctx, int64(row.Size())); err != nil {
w.MoveToDraining(err)
return windowerStateUnknown, nil, nil
return windowerStateUnknown, nil, w.DrainHelper()
}
if len(w.partitionBy) == 0 {
w.encodedPartitions[""] = append(w.encodedPartitions[""], w.rowAlloc.CopyRow(row))
Expand All @@ -335,7 +335,8 @@ func (w *windower) accumulateRows() (windowerState, sqlbase.EncDatumRow, *Produc
var err error
w.scratch, err = row[int(col)].Encode(&w.inputTypes[int(col)], &w.datumAlloc, preferredEncoding, w.scratch)
if err != nil {
return windowerStateUnknown, nil, &ProducerMetadata{Err: err}
w.MoveToDraining(err)
return windowerStateUnknown, nil, w.DrainHelper()
}
}
w.encodedPartitions[string(w.scratch)] = append(w.encodedPartitions[string(w.scratch)], w.rowAlloc.CopyRow(row))
Expand Down Expand Up @@ -375,17 +376,17 @@ func (w *windower) emitRow() (windowerState, sqlbase.EncDatumRow, *ProducerMetad
for !w.populated {
if err := w.cancelChecker.Check(); err != nil {
w.MoveToDraining(err)
return windowerStateUnknown, nil, nil
return windowerStateUnknown, nil, w.DrainHelper()
}

if err := w.decodePartitions(); err != nil {
w.MoveToDraining(err)
return windowerStateUnknown, nil, nil
return windowerStateUnknown, nil, w.DrainHelper()
}

if err := w.computeWindowFunctions(w.Ctx, w.evalCtx); err != nil {
w.MoveToDraining(err)
return windowerStateUnknown, nil, nil
return windowerStateUnknown, nil, w.DrainHelper()
}
w.populated = true
}
Expand All @@ -399,7 +400,7 @@ func (w *windower) emitRow() (windowerState, sqlbase.EncDatumRow, *ProducerMetad
}

w.MoveToDraining(errors.Errorf("unexpected: emitRow() is called on a windower before all input rows are accumulated"))
return windowerStateUnknown, nil, nil
return windowerStateUnknown, nil, w.DrainHelper()
}

// computeWindowFunctions computes all window functions over all partitions.
Expand Down

0 comments on commit 8b5f564

Please sign in to comment.