Skip to content

Commit

Permalink
upmerge
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Jan 30, 2024
2 parents c4170f5 + e375368 commit 677374f
Show file tree
Hide file tree
Showing 29 changed files with 695 additions and 213 deletions.
10 changes: 5 additions & 5 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
Expand All @@ -10,7 +10,7 @@
# limitations under the License.


/jenkins/ @jlowe @revans2 @tgravescs @GaryShen2008 @NvTimLiu
pom.xml @jlowe @revans2 @tgravescs @GaryShen2008 @NvTimLiu
/dist/ @jlowe @revans2 @tgravescs @GaryShen2008 @NvTimLiu
/.github/ @jlowe @revans2 @tgravescs @GaryShen2008 @NvTimLiu
/jenkins/ @jlowe @revans2 @tgravescs @GaryShen2008 @NvTimLiu @gerashegalov
pom.xml @jlowe @revans2 @tgravescs @GaryShen2008 @NvTimLiu @gerashegalov
/dist/ @jlowe @revans2 @tgravescs @GaryShen2008 @NvTimLiu @gerashegalov
/.github/ @jlowe @revans2 @tgravescs @GaryShen2008 @NvTimLiu @gerashegalov
10 changes: 5 additions & 5 deletions .github/workflows/auto-merge.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,7 @@ name: auto-merge HEAD to BASE
on:
pull_request_target:
branches:
- branch-23.12
- branch-24.02
types: [closed]

jobs:
Expand All @@ -29,13 +29,13 @@ jobs:
steps:
- uses: actions/checkout@v3
with:
ref: branch-23.12 # force to fetch from latest upstream instead of PR ref
ref: branch-24.02 # force to fetch from latest upstream instead of PR ref

- name: auto-merge job
uses: ./.github/workflows/auto-merge
env:
OWNER: NVIDIA
REPO_NAME: spark-rapids
HEAD: branch-23.12
BASE: branch-24.02
HEAD: branch-24.02
BASE: branch-24.04
AUTOMERGE_TOKEN: ${{ secrets.AUTOMERGE_TOKEN }} # use to merge PR
1 change: 1 addition & 0 deletions docs/additional-functionality/advanced_configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Name | Description | Default Value | Applicable at
<a name="sql.json.read.decimal.enabled"></a>spark.rapids.sql.json.read.decimal.enabled|JSON reading is not 100% compatible when reading decimals.|false|Runtime
<a name="sql.json.read.double.enabled"></a>spark.rapids.sql.json.read.double.enabled|JSON reading is not 100% compatible when reading doubles.|true|Runtime
<a name="sql.json.read.float.enabled"></a>spark.rapids.sql.json.read.float.enabled|JSON reading is not 100% compatible when reading floats.|true|Runtime
<a name="sql.json.read.mixedTypesAsString.enabled"></a>spark.rapids.sql.json.read.mixedTypesAsString.enabled|JSON reading is not 100% compatible when reading mixed types as string.|false|Runtime
<a name="sql.mode"></a>spark.rapids.sql.mode|Set the mode for the Rapids Accelerator. The supported modes are explainOnly and executeOnGPU. This config can not be changed at runtime, you must restart the application for it to take affect. The default mode is executeOnGPU, which means the RAPIDS Accelerator plugin convert the Spark operations and execute them on the GPU when possible. The explainOnly mode allows running queries on the CPU and the RAPIDS Accelerator will evaluate the queries as if it was going to run on the GPU. The explanations of what would have run on the GPU and why are output in log messages. When using explainOnly mode, the default explain output is ALL, this can be changed by setting spark.rapids.sql.explain. See that config for more details.|executeongpu|Startup
<a name="sql.optimizer.joinReorder.enabled"></a>spark.rapids.sql.optimizer.joinReorder.enabled|When enabled, joins may be reordered for improved query performance|true|Runtime
<a name="sql.python.gpu.enabled"></a>spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false|Runtime
Expand Down
14 changes: 13 additions & 1 deletion docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,14 @@ In particular, the output map is not resulted from a regular JSON parsing but in
* If the input JSON is given as multiple rows, any row containing invalid JSON format will be parsed as an empty
struct instead of a null value ([#9592](https://github.com/NVIDIA/spark-rapids/issues/9592)).

When a JSON attribute contains mixed types (different types in different rows), such as a mix of dictionaries
and lists, Spark will return a string representation of the JSON, but when running on GPU, the default
behavior is to throw an exception. There is an experimental setting
`spark.rapids.sql.json.read.mixedTypesAsString.enabled` that can be set to true to support reading
mixed types as string, but there are known issues where it could also read structs as string in some cases. There
can also be minor formatting differences. Spark will return a parsed and formatted representation, but the
GPU implementation returns the unparsed JSON string.

### `to_json` function

The `to_json` function is disabled by default because it is experimental and has some known incompatibilities
Expand Down Expand Up @@ -529,14 +537,18 @@ The following regular expression patterns are not yet supported on the GPU and w
or more results
- Line anchor `$` and string anchors `\Z` are not supported in patterns containing `\W` or `\D`
- Line and string anchors are not supported by `string_split` and `str_to_map`
- Lazy quantifiers, such as `a*?`
- Lazy quantifiers within a choice block such as `(2|\u2029??)+`
- Possessive quantifiers, such as `a*+`
- Character classes that use union, intersection, or subtraction semantics, such as `[a-d[m-p]]`, `[a-z&&[def]]`,
or `[a-z&&[^bc]]`
- Empty groups: `()`

Work is ongoing to increase the range of regular expressions that can run on the GPU.

## URL Parsing

`parse_url` QUERY with a column key could produce different results on CPU and GPU. In Spark, the `key` in `parse_url` could act like a regex, but GPU will match the key exactly. If key is literal, GPU will check if key contains regex special characters and fallback to CPU if it does, but if key is column, it will not be able to fallback. For example, `parse_url("http://foo/bar?abc=BAD&a.c=GOOD", QUERY, "a.c")` will return "BAD" on CPU, but "GOOD" on GPU. See the Spark issue: https://issues.apache.org/jira/browse/SPARK-44500

## Timestamps

Spark stores timestamps internally relative to the JVM time zone. Converting an arbitrary timestamp
Expand Down
4 changes: 2 additions & 2 deletions docs/supported_ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -10817,7 +10817,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>only support partToExtract = PROTOCOL | HOST;<br/>Literal value only</em></td>
<td><em>PS<br/>only support partToExtract = PROTOCOL | HOST | QUERY;<br/>Literal value only</em></td>
<td> </td>
<td> </td>
<td> </td>
Expand All @@ -10838,7 +10838,7 @@ are limited.
<td> </td>
<td> </td>
<td> </td>
<td><em>PS<br/>Literal value only</em></td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
Expand Down
13 changes: 9 additions & 4 deletions integration_tests/src/main/python/join_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1153,11 +1153,13 @@ def do_join(spark):
"spark.rapids.sql.input." + scan_name: False})

@ignore_order(local=True)
@pytest.mark.parametrize("join_type", ["Inner", "FullOuter"], ids=idfn)
@pytest.mark.parametrize("is_left_host_shuffle", [False, True], ids=idfn)
@pytest.mark.parametrize("is_right_host_shuffle", [False, True], ids=idfn)
@pytest.mark.parametrize("is_left_smaller", [False, True], ids=idfn)
@pytest.mark.parametrize("batch_size", ["1024", "1g"], ids=idfn)
def test_new_inner_join(is_left_host_shuffle, is_right_host_shuffle, is_left_smaller, batch_size):
def test_new_symmetric_join(join_type, is_left_host_shuffle, is_right_host_shuffle,
is_left_smaller, batch_size):
join_conf = {
"spark.rapids.sql.join.useShuffledSymmetricHashJoin": "true",
"spark.sql.autoBroadcastJoinThreshold": "1",
Expand All @@ -1183,14 +1185,17 @@ def do_join(spark):
left_df = left_df.groupBy("key1", "key2").max("ints", "floats")
if not is_right_host_shuffle:
right_df = right_df.groupBy("key1", "key2").max("doubles", "shorts")
return left_df.join(right_df, ["key1", "key2"], "inner")
return left_df.join(right_df, ["key1", "key2"], join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=join_conf)

@ignore_order(local=True)
@pytest.mark.parametrize("join_type", ["Inner", "FullOuter"], ids=idfn)
@pytest.mark.parametrize("is_left_smaller", [False, True], ids=idfn)
@pytest.mark.parametrize("is_ast_supported", [False, True], ids=idfn)
@pytest.mark.parametrize("batch_size", ["1024", "1g"], ids=idfn)
def test_new_inner_join_conditional(is_ast_supported, is_left_smaller, batch_size):
def test_new_symmetric_join_conditional(join_type, is_ast_supported, is_left_smaller, batch_size):
if join_type == "FullOuter" and not is_ast_supported:
pytest.skip("Full outer joins do not support a non-AST condition")
join_conf = {
"spark.rapids.sql.join.useShuffledSymmetricHashJoin": "true",
"spark.sql.autoBroadcastJoinThreshold": "1",
Expand All @@ -1213,5 +1218,5 @@ def do_join(spark):
else:
# AST does not support logarithm yet
cond.append(left_df.ints >= f.log(right_df.ints))
return left_df.join(right_df, cond, "inner")
return left_df.join(right_df, cond, join_type)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=join_conf)
18 changes: 16 additions & 2 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,9 @@ def test_read_invalid_json(spark_tmp_table_factory, std_input_path, read_func, f
@pytest.mark.parametrize('schema', [_int_schema])
@pytest.mark.parametrize('v1_enabled_list', ["", "json"])
def test_read_valid_json(spark_tmp_table_factory, std_input_path, read_func, filename, schema, v1_enabled_list):
conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True})
assert_gpu_and_cpu_are_equal_collect(
read_func(std_input_path + '/' + filename,
schema,
Expand Down Expand Up @@ -644,7 +646,7 @@ def test_from_json_map_fallback():
@allow_non_gpu(*non_utc_allow)
def test_from_json_struct(schema):
# note that column 'a' does not use leading zeroes due to https://github.com/NVIDIA/spark-rapids/issues/9588
json_string_gen = StringGen(r'{"a": [1-9]{0,5}, "b": "[A-Z]{0,5}", "c": 1\d\d\d}') \
json_string_gen = StringGen(r'{\'a\': [1-9]{0,5}, "b": \'[A-Z]{0,5}\', "c": 1\d\d\d}') \
.with_special_pattern('', weight=50) \
.with_special_pattern('null', weight=50)
assert_gpu_and_cpu_are_equal_collect(
Expand Down Expand Up @@ -871,6 +873,18 @@ def test_from_json_struct_of_list(schema):
.select(f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True})

@pytest.mark.parametrize('schema', [
'struct<a:string>'
])
@allow_non_gpu(*non_utc_allow)
def test_from_json_mixed_types_list_struct(schema):
json_string_gen = StringGen(r'{"a": (\[1,2,3\]|{"b":"[a-z]{2}"}) }')
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, json_string_gen) \
.select('a', f.from_json('a', schema)),
conf={"spark.rapids.sql.expression.JsonToStructs": True,
'spark.rapids.sql.json.read.mixedTypesAsString.enabled': True})

@pytest.mark.parametrize('schema', ['struct<a:string>', 'struct<a:string,b:int>'])
@allow_non_gpu(*non_utc_allow)
def test_from_json_struct_all_empty_string_input(schema):
Expand Down
11 changes: 10 additions & 1 deletion integration_tests/src/main/python/regexp_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1064,3 +1064,12 @@ def test_re_replace_all():
lambda spark: unary_op_df(spark, gen).selectExpr(
'REGEXP_REPLACE(a, ".*$", "PROD", 1)'),
conf=_regexp_conf)

def test_lazy_quantifier():
gen = mk_str_gen('[a-z]{0,2} \"[a-z]{0,2}\" and \"[a-z]{0,3}\"')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, gen).selectExpr(
'a', r'REGEXP_EXTRACT(a, "(\".??\")")',
r'REGEXP_EXTRACT(a, "(\".+?\")")',
r'REGEXP_EXTRACT(a, "(\".*?\")")'),
conf=_regexp_conf)
14 changes: 13 additions & 1 deletion integration_tests/src/main/python/udf_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -216,6 +216,18 @@ def pandas_sum(to_process: pd.Series) -> list:
.select([f.col('py_array').getItem(i) for i in range(0, 1)]),
conf=arrow_udf_conf)

@ignore_order
@pytest.mark.parametrize('data_gen', [byte_gen, int_gen], ids=idfn)
@pytest.mark.parametrize('window', udf_windows, ids=window_ids)
def test_window_aggregate_udf_array_input(data_gen, window):
@f.pandas_udf(returnType=LongType())
def pandas_size(to_process: pd.Series) -> int:
return to_process.size

assert_gpu_and_cpu_are_equal_collect(
lambda spark: three_col_df(spark, data_gen, data_gen, ArrayGen(data_gen)).select(
pandas_size(f.col('c')).over(window).alias('size_col')),
conf=arrow_udf_conf)

# ======= Test flat map group in Pandas =======

Expand Down
38 changes: 30 additions & 8 deletions integration_tests/src/main/python/url_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -26,7 +26,7 @@
r'(:[0-9]{1,3}){0,1}(/[a-zA-Z0-9]{1,3}){0,3}(\?[a-zA-Z0-9]{1,3}=[a-zA-Z0-9]{1,3}){0,1}(#([a-zA-Z0-9]{1,3})){0,1}'

url_pattern_with_key = r'((http|https|ftp|file)://)(([a-z]{1,3}\.){0,3}([a-z]{1,3})\.([a-z]{1,3}))' \
r'(:[0-9]{1,3}){0,1}(/[a-z]{1,3}){0,3}(\?key=[a-z]{1,3}){0,1}(#([a-z]{1,3})){0,1}'
r'(:[0-9]{1,3}){0,1}(/[a-z]{1,3}){0,3}(\?[a-c]{1,3}=[a-z]{1,3}(&[a-c]{1,3}=[a-z]{1,3}){0,3}){0,1}(#([a-z]{1,3})){0,1}'

edge_cases = [
"[email protected]/path?query=1#Ref",
Expand Down Expand Up @@ -150,8 +150,8 @@

supported_parts = ['PROTOCOL', 'HOST', 'QUERY']
unsupported_parts = ['PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO']
supported_with_key_parts = ['PROTOCOL', 'HOST']
unsupported_with_key_parts = ['QUERY', 'PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO']
supported_with_key_parts = ['PROTOCOL', 'HOST', 'QUERY']
unsupported_with_key_parts = ['PATH', 'REF', 'FILE', 'AUTHORITY', 'USERINFO']

@pytest.mark.parametrize('data_gen', [url_gen, edge_cases_gen], ids=idfn)
@pytest.mark.parametrize('part', supported_parts, ids=idfn)
Expand All @@ -166,16 +166,38 @@ def test_parse_url_unsupported_fallback(part):
lambda spark: unary_op_df(spark, url_gen).selectExpr("a", "parse_url(a, '" + part + "')"),
'ParseUrl')

def test_parse_url_query_with_key():
url_gen = StringGen(url_pattern_with_key)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, url_gen)
.selectExpr("a", "parse_url(a, 'QUERY', 'abc')", "parse_url(a, 'QUERY', 'a')")
)

def test_parse_url_query_with_key_column():
url_gen = StringGen(url_pattern_with_key)
key_gen = StringGen('[a-d]{1,3}')
assert_gpu_and_cpu_are_equal_collect(
lambda spark: two_col_df(spark, url_gen, key_gen)
.selectExpr("a", "parse_url(a, 'QUERY', b)")
)

@pytest.mark.parametrize('key', ['a?c', '*'], ids=idfn)
@allow_non_gpu('ProjectExec', 'ParseUrl')
def test_parse_url_query_with_key_regex_fallback(key):
url_gen = StringGen(url_pattern_with_key)
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, url_gen)
.selectExpr("a", "parse_url(a, 'QUERY', '" + key + "')"),
'ParseUrl')

@pytest.mark.parametrize('part', supported_with_key_parts, ids=idfn)
def test_parse_url_with_key(part):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, url_gen).selectExpr("parse_url(a, '" + part + "', 'key')"))



@allow_non_gpu('ProjectExec', 'ParseUrl')
@pytest.mark.parametrize('part', unsupported_with_key_parts, ids=idfn)
def test_parse_url_query_with_key_fallback(part):
def test_parse_url_with_key_fallback(part):
assert_gpu_fallback_collect(
lambda spark: unary_op_df(spark, url_gen).selectExpr("parse_url(a, '" + part + "', 'key')"),
'ParseUrl')
'ParseUrl')
10 changes: 5 additions & 5 deletions integration_tests/src/test/resources/dates.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{ "number": "2020-09-16" }
{ "number": " 2020-09-16" }
{ "number": "2020-09-16 " }
{ "number": "1581-01-01" }
{ "number": "1583-01-01" }
{ 'number': '2020-09-16' }
{ 'number': ' 2020-09-16' }
{ 'number': '2020-09-16 ' }
{ 'number': '1581-01-01' }
{ 'number': '1583-01-01' }
8 changes: 7 additions & 1 deletion jenkins/dependency-check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ ARTIFACT_FILE=${1:-"/tmp/artifacts-list"}
SERVER_ID=${SERVER_ID:-"snapshots"}
SERVER_URL=${SERVER_URL:-"file:/tmp/local-release-repo"}
M2_CACHE=${M2_CACHE:-"/tmp/m2-cache"}
DEST_PATH=${DEST_PATH:-"/tmp/test-get-dest"}
rm -rf $DEST_PATH && mkdir -p $DEST_PATH

remote_maven_repo=$SERVER_ID::default::$SERVER_URL
# Get the spark-rapids-jni and spark-rapids-private jars from OSS Snapshot maven repo
Expand All @@ -43,5 +45,9 @@ if [ "$SERVER_ID" == "snapshots" ]; then
fi
while read line; do
artifact=$line # artifact=groupId:artifactId:version:[[packaging]:classifier]
mvn dependency:get -DremoteRepositories=$remote_maven_repo -Dmaven.repo.local=$M2_CACHE -Dartifact=$artifact
# Clean up $M2_CACHE to avoid side-effect of previous dependency:get
rm -rf $M2_CACHE/com/nvida
mvn -B dependency:get -DremoteRepositories=$remote_maven_repo -Dmaven.repo.local=$M2_CACHE -Dartifact=$artifact -Ddest=$DEST_PATH
done < $ARTIFACT_FILE

ls -l $DEST_PATH
15 changes: 11 additions & 4 deletions jenkins/spark-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -304,10 +304,17 @@ if [[ $TEST_MODE == "DEFAULT" ]]; then
PYSP_TEST_spark_shuffle_manager=com.nvidia.spark.rapids.${SHUFFLE_SPARK_SHIM}.RapidsShuffleManager \
./run_pyspark_from_build.sh

SPARK_SHELL_SMOKE_TEST=1 \
PYSP_TEST_spark_jars_packages=com.nvidia:rapids-4-spark_${SCALA_BINARY_VER}:${PROJECT_VER} \
PYSP_TEST_spark_jars_repositories=${PROJECT_REPO} \
./run_pyspark_from_build.sh
# As '--packages' only works on the default cuda11 jar, it does not support classifiers
# refer to issue : https://issues.apache.org/jira/browse/SPARK-20075
# "$CLASSIFIER" == ''" is usally for the case running by developers,
# while "$CLASSIFIER" == "cuda11" is for the case running on CI.
# We expect to run packages test for both cases
if [[ "$CLASSIFIER" == "" || "$CLASSIFIER" == "cuda11" ]]; then
SPARK_SHELL_SMOKE_TEST=1 \
PYSP_TEST_spark_jars_packages=com.nvidia:rapids-4-spark_${SCALA_BINARY_VER}:${PROJECT_VER} \
PYSP_TEST_spark_jars_repositories=${PROJECT_REPO} \
./run_pyspark_from_build.sh
if

# ParquetCachedBatchSerializer cache_test
PYSP_TEST_spark_sql_cache_serializer=com.nvidia.spark.ParquetCachedBatchSerializer \
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023, NVIDIA CORPORATION.
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,7 +79,8 @@ abstract class AbstractGpuJoinIterator(
*/
protected def setupNextGatherer(): Option[JoinGatherer]

protected def getFinalBatch(): Option[ColumnarBatch] = None
/** Whether to automatically call close() on this iterator when it is exhausted. */
protected val shouldAutoCloseOnExhaust: Boolean = true

override def hasNext: Boolean = {
if (closed) {
Expand Down Expand Up @@ -107,12 +108,9 @@ abstract class AbstractGpuJoinIterator(
}
}
}
if (nextCb.isEmpty) {
nextCb = getFinalBatch()
if (nextCb.isEmpty) {
// Nothing is left to return so close ASAP.
opTime.ns(close())
}
if (nextCb.isEmpty && shouldAutoCloseOnExhaust) {
// Nothing is left to return so close ASAP.
opTime.ns(close())
}
nextCb.isDefined
}
Expand Down
Loading

0 comments on commit 677374f

Please sign in to comment.