Add avro reader support [databricks] #4956

Merged (37 commits) on Mar 23, 2022

Commits

2008c37 framework v1 can work (wbo4958, Mar 14, 2022)
42ec470 basic avro read test (HaoYang670, Mar 15, 2022)
c33799f customize avro reader (wbo4958, Mar 14, 2022)
e5884d1 temp save (HaoYang670, Mar 15, 2022)
9f9b152 Merge remote-tracking branch 'wangbo/avro' into issue4935_avro_read_p… (HaoYang670, Mar 15, 2022)
b9087f4 test avro read on basic types (HaoYang670, Mar 15, 2022)
526dfac format (wbo4958, Mar 15, 2022)
fcad2f7 Add configs (wbo4958, Mar 15, 2022)
c9ad9d3 add v2 support (wbo4958, Mar 15, 2022)
9982351 temp save (HaoYang670, Mar 15, 2022)
4b22d64 Merge remote-tracking branch 'wangbo/avro' into issue4935_avro_read_p… (HaoYang670, Mar 16, 2022)
1e31ae1 test v2 support (HaoYang670, Mar 16, 2022)
214b544 Merge pull request #1 from HaoYang670/issue4935_avro_read_primitive (wbo4958, Mar 16, 2022)
7d4d000 add shims (wbo4958, Mar 16, 2022)
d5be717 add doc (wbo4958, Mar 16, 2022)
6d54e18 add avro section (wbo4958, Mar 16, 2022)
a374545 fix deprecated error (wbo4958, Mar 16, 2022)
2db2b9f add db support (Mar 16, 2022)
0e20926 fix premerge parallism run issue (wbo4958, Mar 17, 2022)
7880759 add spark-avro dep for tests (wbo4958, Mar 17, 2022)
8aef7cc add avro dep for integration_tests (wbo4958, Mar 17, 2022)
73d8e92 add avro dep for tests-spark310+ (wbo4958, Mar 17, 2022)
5342223 fix rapids require spark-avro issues (wbo4958, Mar 17, 2022)
f362a25 Revert "add avro dep for tests-spark310+" (wbo4958, Mar 17, 2022)
8fbf23d Revert "add avro dep for integration_tests" (wbo4958, Mar 17, 2022)
0388649 Revert "add spark-avro dep for tests" (wbo4958, Mar 17, 2022)
aa36591 fix build error for 3.3.0 (wbo4958, Mar 17, 2022)
dccdb72 fix issue when no spark-avro jar (wbo4958, Mar 17, 2022)
3d7a663 resolve comments (wbo4958, Mar 18, 2022)
3917cb0 update doc (wbo4958, Mar 18, 2022)
7a2214c Merge remote-tracking branch 'upstream/branch-22.04' into avro (wbo4958, Mar 20, 2022)
1b045a8 support db 3.2.1 (wbo4958, Mar 20, 2022)
087a576 modify ci (wbo4958, Mar 21, 2022)
02cd1c8 Merge remote-tracking branch 'upstream/branch-22.04' into avro (wbo4958, Mar 22, 2022)
0be4e9a add INCLUDE_SPARK_AVRO_JAR (wbo4958, Mar 22, 2022)
f4a8e31 fix premerge issue (wbo4958, Mar 22, 2022)
1d538b5 fix run_pyspark_from_build issue (wbo4958, Mar 22, 2022)

Files changed

5 changes: 5 additions & 0 deletions dist/pom.xml
@@ -354,6 +354,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
12 changes: 12 additions & 0 deletions docs/compatibility.md
@@ -558,6 +558,18 @@ parse some variants of `NaN` and `Infinity` even when this option is disabled
([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator behavior is consistent with
Spark version 3.3.0 and later.

## Avro

The Avro format read is an experimental feature that is expected to have some issues, so it is disabled
by default. If you would like to test it, you need to enable both `spark.rapids.sql.format.avro.enabled` and
`spark.rapids.sql.format.avro.read.enabled`.

Currently, the GPU-accelerated Avro reader does not support reading Avro version 1.2 files.

### Supported types

The boolean, byte, short, int, long, float, double, and string types are supported in the current version.
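
For example, a PySpark session that opts in to the experimental reader could look like the following (a minimal sketch: it assumes the spark-avro package is already on the classpath, and the application name and input path are placeholders):

```python
from pyspark.sql import SparkSession

# Minimal sketch: enable the experimental GPU Avro reader via the two configs
# documented above. The app name and input path below are placeholders.
spark = (SparkSession.builder
         .appName("gpu-avro-example")
         .config("spark.rapids.sql.format.avro.enabled", "true")
         .config("spark.rapids.sql.format.avro.read.enabled", "true")
         .getOrCreate())

df = spark.read.format("avro").load("/path/to/data.avro")
df.show()
```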

## Regular Expressions

Regular expression evaluation on the GPU can potentially have high memory overhead and cause out-of-memory errors so
3 changes: 3 additions & 0 deletions docs/configs.md
@@ -72,6 +72,8 @@ Name | Description | Default Value
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.fast.sample"></a>spark.rapids.sql.fast.sample|Option to turn on fast sample. If enable it is inconsistent with CPU sample because of GPU sample algorithm is inconsistent with CPU.|false
<a name="sql.format.avro.enabled"></a>spark.rapids.sql.format.avro.enabled|When set to true enables all avro input and output acceleration. (only input is currently supported anyways)|false
<a name="sql.format.avro.read.enabled"></a>spark.rapids.sql.format.avro.read.enabled|When set to true enables avro input acceleration|false
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
<a name="sql.format.json.enabled"></a>spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false
@@ -390,6 +392,7 @@ Name | Description | Default Value | Notes
<a name="sql.input.JsonScan"></a>spark.rapids.sql.input.JsonScan|Json parsing|true|None|
<a name="sql.input.OrcScan"></a>spark.rapids.sql.input.OrcScan|ORC parsing|true|None|
<a name="sql.input.ParquetScan"></a>spark.rapids.sql.input.ParquetScan|Parquet parsing|true|None|
<a name="sql.input.AvroScan"></a>spark.rapids.sql.input.AvroScan|Avro parsing|true|None|

### Partitioning

43 changes: 43 additions & 0 deletions docs/supported_ops.md
@@ -17914,6 +17914,49 @@ dates or timestamps, or for a lack of type coercion support.
<th>UDT</th>
</tr>
<tr>
<th rowSpan="2">Avro</th>
<th>Read</th>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S</td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th>Write</th>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<th rowSpan="2">CSV</th>
<th>Read</th>
<td>S</td>
9 changes: 7 additions & 2 deletions integration_tests/pom.xml
@@ -297,14 +297,19 @@
<goals>
<goal>copy</goal>
</goals>
<configuration>
<useBaseVersion>true</useBaseVersion>
<artifactItems>
<artifactItem>
<groupId>ai.rapids</groupId>
<artifactId>cudf</artifactId>
<classifier>${cuda.version}</classifier>
</artifactItem>
<artifactItem>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</artifactItem>
</artifactItems>
</configuration>
</execution>
19 changes: 18 additions & 1 deletion integration_tests/run_pyspark_from_build.sh
@@ -40,18 +40,35 @@ else
# support alternate local jars when NOT building from the source code
if [ -d "$LOCAL_JAR_PATH" ]; then
CUDF_JARS=$(echo "$LOCAL_JAR_PATH"/cudf-*.jar)
AVRO_JARS=$(echo "$LOCAL_JAR_PATH"/spark-avro*.jar)
PLUGIN_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark_*.jar)
# the integration-test-spark3xx.jar; it should not include the integration-test-spark3xxtest.jar
TEST_JARS=$(echo "$LOCAL_JAR_PATH"/rapids-4-spark-integration-tests*-$SPARK_SHIM_VER.jar)
else
CUDF_JARS=$(echo "$SCRIPTPATH"/target/dependency/cudf-*.jar)
AVRO_JARS=$(echo "$SCRIPTPATH"/target/dependency/spark-avro*.jar)
PLUGIN_JARS=$(echo "$SCRIPTPATH"/../dist/target/rapids-4-spark_*.jar)
# the integration-test-spark3xx.jar; it should not include the integration-test-spark3xxtest.jar
TEST_JARS=$(echo "$SCRIPTPATH"/target/rapids-4-spark-integration-tests*-$SPARK_SHIM_VER.jar)
fi

# `./run_pyspark_from_build.sh` (optionally with `-k xxx` to select tests) runs the tests without
# spark-avro.jar in the classpath; avro_test.py is skipped via its module-level skip marker.
#
# `INCLUDE_SPARK_AVRO_JAR=true ./run_pyspark_from_build.sh` runs all tests, including avro_test.py,
# with spark-avro.jar in the classpath.
if [[ $( echo "${INCLUDE_SPARK_AVRO_JAR}" | tr '[:upper:]' '[:lower:]' ) == "true" ]];
then
export INCLUDE_SPARK_AVRO_JAR=true
else
export INCLUDE_SPARK_AVRO_JAR=false
AVRO_JARS=""
fi

# Only 3 jars: cudf.jar dist.jar integration-test.jar
ALL_JARS="$CUDF_JARS $PLUGIN_JARS $TEST_JARS"
ALL_JARS="$CUDF_JARS $PLUGIN_JARS $TEST_JARS $AVRO_JARS"
Review comment from @firestarman (Collaborator), Mar 22, 2022:
NIT: Better to check whether the avro jar exists. If not, set AVRO_JARS="" and CI_EXCLUDE_AVRO=true as well.

Reply from the author: DONE

echo "AND PLUGIN JARS: $ALL_JARS"
if [[ "${TEST}" != "" ]];
then
90 changes: 90 additions & 0 deletions integration_tests/src/main/python/avro_test.py
@@ -0,0 +1,90 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from spark_session import with_cpu_session
import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import *
from pyspark.sql.types import *

if os.environ.get('INCLUDE_SPARK_AVRO_JAR', 'false') == 'false':
    pytestmark = pytest.mark.skip(reason="INCLUDE_SPARK_AVRO_JAR is disabled")

support_gens = numeric_gens + [string_gen, boolean_gen]

_enable_all_types_conf = {
    'spark.rapids.sql.format.avro.enabled': 'true',
    'spark.rapids.sql.format.avro.read.enabled': 'true'}


@pytest.mark.parametrize('gen', support_gens)
@pytest.mark.parametrize('v1_enabled_list', ["avro", ""])
def test_basic_read(spark_tmp_path, gen, v1_enabled_list):
    data_path = spark_tmp_path + '/AVRO_DATA'
    with_cpu_session(
        lambda spark: unary_op_df(spark, gen).write.format("avro").save(data_path)
    )

    all_confs = copy_and_update(_enable_all_types_conf, {
        'spark.sql.sources.useV1SourceList': v1_enabled_list})
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.format("avro").load(data_path),
        conf=all_confs)


@pytest.mark.parametrize('v1_enabled_list', ["", "avro"])
def test_avro_simple_partitioned_read(spark_tmp_path, v1_enabled_list):
    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(support_gens)]
    first_data_path = spark_tmp_path + '/AVRO_DATA/key=0/key2=20'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list).write.format("avro").save(first_data_path))
    second_data_path = spark_tmp_path + '/AVRO_DATA/key=1/key2=21'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list).write.format("avro").save(second_data_path))
    third_data_path = spark_tmp_path + '/AVRO_DATA/key=2/key2=22'
    with_cpu_session(
        lambda spark: gen_df(spark, gen_list).write.format("avro").save(third_data_path))

    data_path = spark_tmp_path + '/AVRO_DATA'

    all_confs = copy_and_update(_enable_all_types_conf, {
        'spark.sql.sources.useV1SourceList': v1_enabled_list})
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.format("avro").load(data_path),
        conf=all_confs)


@pytest.mark.parametrize('v1_enabled_list', ["", "avro"])
def test_avro_input_meta(spark_tmp_path, v1_enabled_list):
    first_data_path = spark_tmp_path + '/AVRO_DATA/key=0'
    with_cpu_session(
        lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(first_data_path))
    second_data_path = spark_tmp_path + '/AVRO_DATA/key=1'
    with_cpu_session(
        lambda spark: unary_op_df(spark, long_gen).write.format("avro").save(second_data_path))
    data_path = spark_tmp_path + '/AVRO_DATA'

    all_confs = copy_and_update(_enable_all_types_conf, {
        'spark.sql.sources.useV1SourceList': v1_enabled_list})
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.format("avro").load(data_path)
            .filter(f.col('a') > 0)
            .selectExpr('a',
                'input_file_name()',
                'input_file_block_start()',
                'input_file_block_length()'),
        conf=all_confs)
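
These tests are gated on the `INCLUDE_SPARK_AVRO_JAR` environment variable; as the premerge script below shows, a typical invocation is `INCLUDE_SPARK_AVRO_JAR=true TEST='avro_test.py' ./integration_tests/run_pyspark_from_build.sh`.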
35 changes: 35 additions & 0 deletions jenkins/databricks/build.sh
Expand Up @@ -158,6 +158,17 @@ JACKSONANNOTATION=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive
HADOOPCOMMON=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hadoop--hadoop-common--org.apache.hadoop__hadoop-common__2.7.4.jar
HADOOPMAPRED=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-${HADOOP_VERSION}--org.apache.hadoop--hadoop-mapreduce-client-core--org.apache.hadoop__hadoop-mapreduce-client-core__2.7.4.jar

if [[ $BASE_SPARK_VERSION == "3.2.1" ]]
then
AVROSPARKJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--vendor--avro--avro-hive-2.3__hadoop-3.2_2.12_deploy_shaded.jar
AVROMAPRED=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-3.2--org.apache.avro--avro-mapred--org.apache.avro__avro-mapred__1.10.2.jar
AVROJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-3.2--org.apache.avro--avro--org.apache.avro__avro__1.10.2.jar
else
AVROSPARKJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--vendor--avro--avro_2.12_deploy_shaded.jar
AVROMAPRED=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.avro--avro-mapred-hadoop2--org.apache.avro__avro-mapred-hadoop2__1.8.2.jar
AVROJAR=----workspace_${SPARK_MAJOR_VERSION_STRING}--maven-trees--hive-2.3__hadoop-2.7--org.apache.avro--avro--org.apache.avro__avro__1.8.2.jar
fi
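
# Note: the Avro jar locations differ per Databricks base version. As the file
# names above show, 3.2.1 vendors Avro 1.10.2 under the hive-2.3__hadoop-3.2
# tree, while older base versions vendor Avro 1.8.2 under hive-2.3__hadoop-2.7.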

# Please note we are installing all of these dependencies using the Spark version (SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS) to make it easier
# to specify the dependencies in the pom files

@@ -177,6 +188,30 @@ mvn -B install:install-file \
-Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
-Dpackaging=jar
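
# Install the Databricks-vendored spark-avro, avro-mapred, and avro jars into
# the local Maven repo, versioned with the Spark version (as noted above) so
# the plugin poms can resolve them.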

mvn -B install:install-file \
-Dmaven.repo.local=$M2DIR \
-Dfile=$JARDIR/$AVROSPARKJAR\
-DgroupId=org.apache.spark \
-DartifactId=spark-avro_$SCALA_VERSION \
-Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
-Dpackaging=jar

mvn -B install:install-file \
-Dmaven.repo.local=$M2DIR \
-Dfile=$JARDIR/$AVROMAPRED\
-DgroupId=org.apache.avro\
-DartifactId=avro-mapred \
-Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
-Dpackaging=jar

mvn -B install:install-file \
-Dmaven.repo.local=$M2DIR \
-Dfile=$JARDIR/$AVROJAR \
-DgroupId=org.apache.avro\
-DartifactId=avro \
-Dversion=$SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS \
-Dpackaging=jar

mvn -B install:install-file \
-Dmaven.repo.local=$M2DIR \
-Dfile=$JARDIR/$ANNOTJAR \
1 change: 1 addition & 0 deletions jenkins/spark-premerge-build.sh
@@ -113,6 +113,7 @@ ci_2() {
TEST_PARALLEL=5 TEST='struct_test or time_window_test' ./integration_tests/run_pyspark_from_build.sh
TEST='not conditionals_test and not window_function_test and not struct_test and not time_window_test' \
./integration_tests/run_pyspark_from_build.sh
INCLUDE_SPARK_AVRO_JAR=true TEST='avro_test.py' ./integration_tests/run_pyspark_from_build.sh
}


24 changes: 24 additions & 0 deletions sql-plugin/pom.xml
@@ -56,6 +56,12 @@
<groupId>com.google.flatbuffers</groupId>
<artifactId>flatbuffers-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<profiles>
@@ -119,6 +125,24 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
31 changes: 31 additions & 0 deletions .../sql/rapids/shims/AvroUtils.scala
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.shims

import com.nvidia.spark.rapids.RapidsMeta

import org.apache.spark.sql.avro.AvroOptions

object AvroUtils {

  def tagSupport(
      parsedOptions: AvroOptions,
      meta: RapidsMeta[_, _, _]): Unit = {
    // Intentionally empty: no additional option checks are needed in this
    // shim (assumed intent, inferred from the empty body).
  }

}