From f943df75a31c0608a38f8a940ef492be0351d73e Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans" <bobby@apache.org>
Date: Wed, 28 Jul 2021 07:42:52 -0500
Subject: [PATCH 1/2] Partial support for time windows
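
Spark's window() SQL function buckets rows by a timestamp column. As a
rough sketch of the kind of query this patch lets run on the GPU (df and
the ts/data column names are placeholders mirroring the new integration
tests):

    import org.apache.spark.sql.functions.{col, max, window}

    // 5 hour buckets that slide every hour, taking the max of "data" per bucket
    val rolled = df
      .groupBy(window(col("ts"), "5 hours", "1 hour"))
      .agg(max(col("data")).alias("max_data"))

To get there, Spark's internal PreciseTimestampConversion expression is
added to the GPU overrides, and the expand operator (which Spark uses to
copy a row into every sliding window that contains it) is allowed to run
on the GPU with nested STRUCT and ARRAY types.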

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
---
 docs/configs.md                               |   1 +
 docs/supported_ops.md                         | 656 +++++++++++-------
 .../src/main/python/time_window_test.py       |  98 +++
 .../nvidia/spark/rapids/GpuOverrides.scala    |  17 +-
 .../apache/spark/sql/rapids/TimeWindow.scala  |  43 ++
 5 files changed, 544 insertions(+), 271 deletions(-)
 create mode 100644 integration_tests/src/main/python/time_window_test.py
 create mode 100644 sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeWindow.scala

diff --git a/docs/configs.md b/docs/configs.md
index b4373905562..99d21061c28 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -239,6 +239,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
 <a name="sql.expression.Pmod"></a>spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None|
 <a name="sql.expression.PosExplode"></a>spark.rapids.sql.expression.PosExplode|`posexplode_outer`, `posexplode`|Given an input array produces a sequence of rows for each value in the array.|true|None|
 <a name="sql.expression.Pow"></a>spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None|
+<a name="sql.expression.PreciseTimestampConversion"></a>spark.rapids.sql.expression.PreciseTimestampConversion| |Expression used internally to convert the TimestampType to Long and back without losing precision, i.e. in microseconds. Used in time windowing.|true|None|
 <a name="sql.expression.PromotePrecision"></a>spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None|
 <a name="sql.expression.PythonUDF"></a>spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None|
 <a name="sql.expression.Quarter"></a>spark.rapids.sql.expression.Quarter|`quarter`|Returns the quarter of the year for date, in the range 1 to 4|true|None|
diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index b8c168edb98..54769cf2d25 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -190,9 +190,9 @@ Accelerator supports are described below.
 <td>S</td>
 <td><b>NS</b></td>
 <td><b>NS</b></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
 <td><b>NS</b></td>
-<td><b>NS</b></td>
-<td><b>NS</b></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
@@ -11833,6 +11833,96 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<td rowSpan="4">PreciseTimestampConversion</td>
+<td rowSpan="4"> </td>
+<td rowSpan="4">Expression used internally to convert the TimestampType to Long and back without losing precision, i.e. in microseconds. Used in time windowing.</td>
+<td rowSpan="4">None</td>
+<td rowSpan="2">project</td>
+<td>input</td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td>S</td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td>S*</td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+</tr>
+<tr>
+<td>result</td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td>S</td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td>S*</td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+</tr>
+<tr>
+<td rowSpan="2">lambda</td>
+<td>input</td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td><b>NS</b></td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td><b>NS</b></td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+</tr>
+<tr>
+<td>result</td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td><b>NS</b></td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td><b>NS</b></td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+<td> </td>
+</tr>
+<tr>
 <td rowSpan="4">PromotePrecision</td>
 <td rowSpan="4"> </td>
 <td rowSpan="4">PromotePrecision before arithmetic operations between DecimalType data</td>
@@ -11923,6 +12013,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="8">PythonUDF</td>
 <td rowSpan="8"> </td>
 <td rowSpan="8">UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.</td>
@@ -12099,32 +12215,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="4">Quarter</td>
 <td rowSpan="4">`quarter`</td>
 <td rowSpan="4">Returns the quarter of the year for date, in the range 1 to 4</td>
@@ -12305,6 +12395,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="2">Rank</td>
 <td rowSpan="2">`rank`</td>
 <td rowSpan="2">Window function that returns the rank value within the aggregation window</td>
@@ -12526,32 +12642,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="6">Remainder</td>
 <td rowSpan="6">`%`, `mod`</td>
 <td rowSpan="6">Remainder or modulo</td>
@@ -12684,6 +12774,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="4">Rint</td>
 <td rowSpan="4">`rint`</td>
 <td rowSpan="4">Rounds up a double value to the nearest double equal to an integer</td>
@@ -12906,32 +13022,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="1">RowNumber</td>
 <td rowSpan="1">`row_number`</td>
 <td rowSpan="1">Window function that returns the index for the row within the aggregation window</td>
@@ -13048,14 +13138,40 @@ Accelerator support is described below.
 <td><b>NS</b></td>
 </tr>
 <tr>
-<td rowSpan="4">Second</td>
-<td rowSpan="4">`second`</td>
-<td rowSpan="4">Returns the second component of the string/timestamp</td>
-<td rowSpan="4">None</td>
-<td rowSpan="2">project</td>
-<td>input</td>
-<td> </td>
-<td> </td>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
+<td rowSpan="4">Second</td>
+<td rowSpan="4">`second`</td>
+<td rowSpan="4">Returns the second component of the string/timestamp</td>
+<td rowSpan="4">None</td>
+<td rowSpan="2">project</td>
+<td>input</td>
+<td> </td>
+<td> </td>
 <td> </td>
 <td> </td>
 <td> </td>
@@ -13270,32 +13386,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="6">ShiftRight</td>
 <td rowSpan="6">`shiftright`</td>
 <td rowSpan="6">Bitwise shift right (>>)</td>
@@ -13428,6 +13518,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="6">ShiftRightUnsigned</td>
 <td rowSpan="6">`shiftrightunsigned`</td>
 <td rowSpan="6">Bitwise unsigned shift right (>>>)</td>
@@ -13650,32 +13766,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="4">Sin</td>
 <td rowSpan="4">`sin`</td>
 <td rowSpan="4">Sine</td>
@@ -13856,6 +13946,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="4">Size</td>
 <td rowSpan="4">`size`, `cardinality`</td>
 <td rowSpan="4">The size of an array or a map</td>
@@ -14078,32 +14194,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="2">SortOrder</td>
 <td rowSpan="2"> </td>
 <td rowSpan="2">Sort order</td>
@@ -14267,6 +14357,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="4">Sqrt</td>
 <td rowSpan="4">`sqrt`</td>
 <td rowSpan="4">Square root</td>
@@ -14489,32 +14605,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="8">StringLPad</td>
 <td rowSpan="8">`lpad`</td>
 <td rowSpan="8">Pad a string on the left</td>
@@ -14689,6 +14779,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="8">StringLocate</td>
 <td rowSpan="8">`position`, `locate`</td>
 <td rowSpan="8">Substring search operator</td>
@@ -14863,32 +14979,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="8">StringRPad</td>
 <td rowSpan="8">`rpad`</td>
 <td rowSpan="8">Pad a string on the right</td>
@@ -15063,6 +15153,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="8">StringReplace</td>
 <td rowSpan="8">`replace`</td>
 <td rowSpan="8">StringReplace operator</td>
@@ -15237,32 +15353,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="8">StringSplit</td>
 <td rowSpan="8">`split`</td>
 <td rowSpan="8">Splits `str` around occurrences that match `regex`</td>
@@ -15437,6 +15527,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="6">StringTrim</td>
 <td rowSpan="6">`trim`</td>
 <td rowSpan="6">StringTrim operator</td>
@@ -15701,32 +15817,6 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
-<th>Expression</th>
-<th>SQL Functions(s)</th>
-<th>Description</th>
-<th>Notes</th>
-<th>Context</th>
-<th>Param/Output</th>
-<th>BOOLEAN</th>
-<th>BYTE</th>
-<th>SHORT</th>
-<th>INT</th>
-<th>LONG</th>
-<th>FLOAT</th>
-<th>DOUBLE</th>
-<th>DATE</th>
-<th>TIMESTAMP</th>
-<th>STRING</th>
-<th>DECIMAL</th>
-<th>NULL</th>
-<th>BINARY</th>
-<th>CALENDAR</th>
-<th>ARRAY</th>
-<th>MAP</th>
-<th>STRUCT</th>
-<th>UDT</th>
-</tr>
-<tr>
 <td rowSpan="6">StringTrimRight</td>
 <td rowSpan="6">`rtrim`</td>
 <td rowSpan="6">StringTrimRight operator</td>
@@ -15859,6 +15949,32 @@ Accelerator support is described below.
 <td> </td>
 </tr>
 <tr>
+<th>Expression</th>
+<th>SQL Functions(s)</th>
+<th>Description</th>
+<th>Notes</th>
+<th>Context</th>
+<th>Param/Output</th>
+<th>BOOLEAN</th>
+<th>BYTE</th>
+<th>SHORT</th>
+<th>INT</th>
+<th>LONG</th>
+<th>FLOAT</th>
+<th>DOUBLE</th>
+<th>DATE</th>
+<th>TIMESTAMP</th>
+<th>STRING</th>
+<th>DECIMAL</th>
+<th>NULL</th>
+<th>BINARY</th>
+<th>CALENDAR</th>
+<th>ARRAY</th>
+<th>MAP</th>
+<th>STRUCT</th>
+<th>UDT</th>
+</tr>
+<tr>
 <td rowSpan="8">Substring</td>
 <td rowSpan="8">`substr`, `substring`</td>
 <td rowSpan="8">Substring operator</td>
diff --git a/integration_tests/src/main/python/time_window_test.py b/integration_tests/src/main/python/time_window_test.py
new file mode 100644
index 00000000000..45c7e29ab81
--- /dev/null
+++ b/integration_tests/src/main/python/time_window_test.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2021, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from asserts import assert_gpu_and_cpu_are_equal_collect
+from data_gen import *
+from datetime import datetime, timezone
+from marks import ignore_order, allow_non_gpu
+from pyspark.sql.types import *
+import pyspark.sql.functions as f
+from pyspark.sql.window import Window
+
+# Generate timestamps over a single day so we have more chance of overlapping values
+_restricted_start = datetime(2020, 1, 1, tzinfo=timezone.utc)
+_restricted_end = datetime(2020, 1, 2, tzinfo=timezone.utc)
+_restricted_ts_gen = TimestampGen(start=_restricted_start, end=_restricted_end)
+
+# Once we support grouping by a struct (even single level) this should go away
+# https://github.com/NVIDIA/spark-rapids/issues/2877
+# Shuffle falls back to CPU becasue it is in between two CPU hash/sort aggregates
+@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
+@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
+@ignore_order
+def test_grouped_tumbling_window(data_gen):
+    row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False)
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour')).agg(f.max("data").alias("max_data")))
+
+# Having arrays allows us to verify that expand exec in this case works with arrays too
+# Once we support grouping by a struct (even single level) this should go away
+# https://github.com/NVIDIA/spark-rapids/issues/2877
+# Shuffle falls back to CPU becasue it is in between two CPU hash/sort aggregates
+@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'GetArrayItem', 'Literal', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
+@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
+@ignore_order
+def test_grouped_tumbling_window_array(data_gen):
+    row_gen = StructGen([['ts', _restricted_ts_gen],['data', ArrayGen(data_gen)]], nullable=False)
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour')).agg(f.max(f.col("data")[3]).alias("max_data")))
+
+# Warning. With sliding windows it is easy to make lots of overlapping windows. This can make the Spark code generation
+# have some real problems and even crash sometimes when trying to JIT it. This problem only happnes on the CPU
+# so be careful.
+
+# Once we support grouping by a struct (even single level) this should go away
+# https://github.com/NVIDIA/spark-rapids/issues/2877
+# Shuffle falls back to CPU becasue it is in between two CPU hash/sort aggregates
+@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
+@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
+@ignore_order
+def test_grouped_sliding_window(data_gen):
+    row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False)
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max("data").alias("max_data")))
+
+# Having arrays allows us to verify that expand exec in this case works with arrays too
+# Once we support grouping by a struct (even single level) this should go away
+# https://github.com/NVIDIA/spark-rapids/issues/2877
+# Shuffle falls back to CPU becasue it is in between two CPU hash/sort aggregates
+@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'GetArrayItem', 'Literal', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
+@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
+@ignore_order
+def test_grouped_sliding_window_array(data_gen):
+    row_gen = StructGen([['ts', _restricted_ts_gen],['data', ArrayGen(data_gen)]], nullable=False)
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max(f.col("data")[3]).alias("max_data")))
+
+@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
+@allow_non_gpu('WindowExec', 'WindowExpression', 'WindowSpecDefinition', 'SpecifiedWindowFrame', 'UnboundedPreceding$', 'UnboundedFollowing$', 'AggregateExpression', 'Max', 'Alias')
+@ignore_order
+def test_tumbling_window(data_gen):
+    row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False)
+    w = Window.partitionBy(f.window('ts', '5 hour'))
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, row_gen).withColumn('rolling_max', f.max("data").over(w)))
+
+@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
+@allow_non_gpu('WindowExec', 'WindowExpression', 'WindowSpecDefinition', 'SpecifiedWindowFrame', 'UnboundedPreceding$', 'UnboundedFollowing$', 'AggregateExpression', 'Max', 'Alias')
+@ignore_order
+def test_sliding_window(data_gen):
+    row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False)
+    w = Window.partitionBy(f.window('ts', '5 hour', '1 hour'))
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, row_gen).withColumn('rolling_max', f.max("data").over(w)))
+
+
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index aade5c0141d..8128d22167b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -964,6 +964,18 @@ object GpuOverrides {
         override def convertToGpu(): GpuExpression =
           GpuLag(input.convertToGpu(), offset.convertToGpu(), default.convertToGpu())
       }),
+    expr[PreciseTimestampConversion](
+      "Expression used internally to convert the TimestampType to Long and back without losing " +
+          "precision, i.e. in microseconds. Used in time windowing.",
+      ExprChecks.unaryProjectNotLambda(
+        TypeSig.TIMESTAMP + TypeSig.LONG,
+        TypeSig.TIMESTAMP + TypeSig.LONG,
+        TypeSig.TIMESTAMP + TypeSig.LONG,
+        TypeSig.TIMESTAMP + TypeSig.LONG),
+      (a, conf, p, r) => new UnaryExprMeta[PreciseTimestampConversion](a, conf, p, r) {
+        override def convertToGpu(child: Expression): GpuExpression =
+          GpuPreciseTimestampConversion(child, a.fromType, a.toType)
+      }),
     expr[UnaryMinus](
       "Negate a numeric value",
       ExprChecks.unaryProjectNotLambdaInputMatchesOutput(
@@ -3047,7 +3059,10 @@ object GpuOverrides {
       (sort, conf, p, r) => new GpuSortMeta(sort, conf, p, r)),
     exec[ExpandExec](
       "The backend for the expand operator",
-      ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all),
+      ExecChecks(
+        (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL +
+            TypeSig.STRUCT + TypeSig.ARRAY).nested(),
+        TypeSig.all),
       (expand, conf, p, r) => new GpuExpandExecMeta(expand, conf, p, r)),
     exec[WindowExec](
       "Window-operator backend",
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeWindow.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeWindow.scala
new file mode 100644
index 00000000000..52d1e8d7972
--- /dev/null
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TimeWindow.scala
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.rapids
+
+import ai.rapids.cudf.ColumnVector
+import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression}
+
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression}
+import org.apache.spark.sql.types.{AbstractDataType, DataType}
+
+/**
+ * Expression used internally to convert the TimestampType to Long and back without losing
+ * precision, i.e. in microseconds. Used in time windowing.
+ */
+case class GpuPreciseTimestampConversion(
+    child: Expression,
+    fromType: DataType,
+    toType: DataType) extends GpuUnaryExpression with ExpectsInputTypes {
+  override def inputTypes: Seq[AbstractDataType] = Seq(fromType)
+  override def dataType: DataType = toType
+
+  override protected def doColumnar(input: GpuColumnVector): ColumnVector = {
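+    // cuDF stores TIMESTAMP_MICROSECONDS as a 64-bit count of microseconds,
+    // so converting between TimestampType and LongType is a pure bit cast.
+    // The cast only yields a view, so copy it out to give the result column
+    // its own device memory.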
+    val outDType = GpuColumnVector.getNonNestedRapidsType(toType)
+    withResource(input.getBase.bitCastTo(outDType)) { bitCast =>
+      bitCast.copyToColumnVector()
+    }
+  }
+}
\ No newline at end of file

From 2d6083f011e0bda4433dbca26b872a8fc69d4ca3 Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans" <bobby@apache.org>
Date: Thu, 29 Jul 2021 08:48:17 -0500
Subject: [PATCH 2/2] Addressed review comments
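
The tumbling-window array test was dropped in favor of a new
test_just_window that runs the window() bucketing across basic, decimal,
array and map types, and the GpuExpandExec type checks gained MAP to
match. For context, the Expand that Spark inserts for sliding windows
copies each row into one output row per bucket containing its timestamp;
a simplified sketch of that bucket enumeration (assuming the zero start
offset used in the tests):

    // A bucket [start, start + windowUs) holds tsUs iff start > tsUs - windowUs,
    // so walk backwards from the last bucket start at or before tsUs.
    def windowStarts(tsUs: Long, windowUs: Long, slideUs: Long): Seq[Long] = {
      val lastStart = tsUs - Math.floorMod(tsUs, slideUs)
      Iterator.iterate(lastStart)(_ - slideUs).takeWhile(_ > tsUs - windowUs).toSeq
    }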

---
 docs/supported_ops.md                         |  6 ++--
 integration_tests/src/main/python/data_gen.py |  4 +--
 .../src/main/python/time_window_test.py       | 30 +++++++++----------
 .../nvidia/spark/rapids/GpuOverrides.scala    |  2 +-
 4 files changed, 20 insertions(+), 22 deletions(-)

diff --git a/docs/supported_ops.md b/docs/supported_ops.md
index ba821ef4203..3390d39ea47 100644
--- a/docs/supported_ops.md
+++ b/docs/supported_ops.md
@@ -190,9 +190,9 @@ Accelerator supports are described below.
 <td>S</td>
 <td><b>NS</b></td>
 <td><b>NS</b></td>
-<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
-<td><b>NS</b></td>
-<td><em>PS* (missing nested BINARY, CALENDAR, MAP, UDT)</em></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
+<td><em>PS* (missing nested BINARY, CALENDAR, UDT)</em></td>
 <td><b>NS</b></td>
 </tr>
 <tr>
diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py
index f5ffe30d72f..703ccdaf236 100644
--- a/integration_tests/src/main/python/data_gen.py
+++ b/integration_tests/src/main/python/data_gen.py
@@ -868,9 +868,9 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False):
 
 boolean_gens = [boolean_gen]
 
-single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + decimal_gens + [null_gen]]
+single_level_array_gens = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + decimal_gens]
 
-single_level_array_gens_no_decimal = [ArrayGen(sub_gen) for sub_gen in all_basic_gens + [null_gen]]
+single_level_array_gens_no_decimal = [ArrayGen(sub_gen) for sub_gen in all_basic_gens]
 
 map_string_string_gen = [MapGen(StringGen(pattern='key_[0-9]', nullable=False), StringGen())]
 
diff --git a/integration_tests/src/main/python/time_window_test.py b/integration_tests/src/main/python/time_window_test.py
index 45c7e29ab81..05005bef5b2 100644
--- a/integration_tests/src/main/python/time_window_test.py
+++ b/integration_tests/src/main/python/time_window_test.py
@@ -29,7 +29,7 @@
 
 # Once we support grouping by a struct (even single level) this should go away
 # https://github.com/NVIDIA/spark-rapids/issues/2877
-# Shuffle falls back to CPU becasue it is in between two CPU hash/sort aggregates
+# Shuffle falls back to CPU because it is in between two CPU hash/sort aggregates
 @allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
 @pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
 @ignore_order
@@ -38,25 +38,13 @@ def test_grouped_tumbling_window(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour')).agg(f.max("data").alias("max_data")))
 
-# Having arrays allows us to verify that expand exec in this case works with arrays too
-# Once we support grouping by a struct (even single level) this should go away
-# https://github.com/NVIDIA/spark-rapids/issues/2877
-# Shuffle falls back to CPU becasue it is in between two CPU hash/sort aggregates
-@allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'GetArrayItem', 'Literal', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
-@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
-@ignore_order
-def test_grouped_tumbling_window_array(data_gen):
-    row_gen = StructGen([['ts', _restricted_ts_gen],['data', ArrayGen(data_gen)]], nullable=False)
-    assert_gpu_and_cpu_are_equal_collect(
-            lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour')).agg(f.max(f.col("data")[3]).alias("max_data")))
-
 # Warning. With sliding windows it is easy to make lots of overlapping windows. This can make the Spark code generation
-# have some real problems and even crash sometimes when trying to JIT it. This problem only happnes on the CPU
+# have some real problems and even crash sometimes when trying to JIT it. This problem only happens on the CPU
 # so be careful.
 
 # Once we support grouping by a struct (even single level) this should go away
 # https://github.com/NVIDIA/spark-rapids/issues/2877
-# Shuffle falls back to CPU becasue it is in between two CPU hash/sort aggregates
+# Shuffle falls back to CPU because it is in between two CPU hash/sort aggregates
 @allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
 @pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
 @ignore_order
@@ -68,7 +56,7 @@ def test_grouped_sliding_window(data_gen):
 # Having arrays allows us to verify that expand exec in this case works with arrays too
 # Once we support grouping by a struct (even single level) this should go away
 # https://github.com/NVIDIA/spark-rapids/issues/2877
-# Shuffle falls back to CPU becasue it is in between two CPU hash/sort aggregates
+# Shuffle falls back to CPU because it is in between two CPU hash/sort aggregates
 @allow_non_gpu('HashAggregateExec', 'SortAggregateExec', 'AggregateExpression', 'GetArrayItem', 'Literal', 'Max', 'Alias', 'ShuffleExchangeExec', 'HashPartitioning')
 @pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
 @ignore_order
@@ -95,4 +83,14 @@ def test_sliding_window(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : gen_df(spark, row_gen).withColumn('rolling_max', f.max("data").over(w)))
 
+# This allows us to verify that GpuExpandExec works with all of the various types.
+@pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens + array_gens_sample + map_gens_sample, ids=idfn)
+# The plan includes an Expand and the GPU produces a different row order than the CPU does, so sort locally, which works for all of these types.
+@ignore_order(local=True)
+def test_just_window(data_gen):
+    row_gen = StructGen([['ts', timestamp_gen],['data', data_gen]], nullable=False)
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, row_gen).withColumn('time_bucket', f.window('ts', '5 hour', '1 hour')),
+            conf = allow_negative_scale_of_decimal_conf)
+
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 5aafb933d5c..9600cc42f87 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -3060,7 +3060,7 @@ object GpuOverrides {
       "The backend for the expand operator",
       ExecChecks(
         (TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL +
-            TypeSig.STRUCT + TypeSig.ARRAY).nested(),
+            TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(),
         TypeSig.all),
       (expand, conf, p, r) => new GpuExpandExecMeta(expand, conf, p, r)),
     exec[WindowExec](