Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use C++ to parse and filter parquet footers. #5310

Merged
merged 11 commits into from
May 11, 2022
10 changes: 9 additions & 1 deletion build/buildall
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ function print_usage() {
echo " E.g., --module=integration_tests"
echo " -P=N, --parallel=N"
echo " Build in parallel, N (4 by default) is passed via -P to xargs"
echo " --install"
echo " Intall the resulting jar instead of just building it"
}

function bloopInstall() {
Expand Down Expand Up @@ -85,6 +87,8 @@ function bloopInstall() {
)
}

FINAL_OP="package"

while [[ "$1" != "" ]] ; do

case "$1" in
Expand Down Expand Up @@ -118,6 +122,10 @@ case "$1" in
SKIP_CLEAN="0"
;;

--install)
FINAL_OP="install"
;;

*)
echo >&2 "Unknown arg: $1"
print_usage
Expand Down Expand Up @@ -262,7 +270,7 @@ time (
# a negligible increase of the build time by ~2 seconds.
joinShimBuildFrom="aggregator"
echo "Resuming from $joinShimBuildFrom build only using $BASE_VER"
mvn package -rf $joinShimBuildFrom $MODULE_OPT $MVN_PROFILE_OPT $INCLUDED_BUILDVERS_OPT \
mvn $FINAL_OP -rf $joinShimBuildFrom $MODULE_OPT $MVN_PROFILE_OPT $INCLUDED_BUILDVERS_OPT \
-Dbuildver="$BASE_VER" \
-DskipTests -Dskip -Dmaven.javadoc.skip
)
1 change: 1 addition & 0 deletions docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Name | Description | Default Value
<a name="sql.format.parquet.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type|2147483647
<a name="sql.format.parquet.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type.|20
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.reader.footer.type"></a>spark.rapids.sql.format.parquet.reader.footer.type|In some cases reading the footer of the file is very expensive. Typically this happens when there are a large number of columns and relatively few of them are being read on a large number of files. This provides the ability to use a different path to parse and filter the footer. JAVA is the default and should match closely with Apache Spark. NATIVE will parse and filter the footer using C++. In the worst case this can be slower than JAVA, but not by much if anything. This is still a very experimental feature and there are known bugs and limitations. It should work for most cases when reading data that complies with the latest Parquet standard, but can run into issues for older data that does not fully comply with it.|JAVA
<a name="sql.format.parquet.reader.type"></a>spark.rapids.sql.format.parquet.reader.type|Sets the parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.parquet.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.format.parquet.writer.int96.enabled"></a>spark.rapids.sql.format.parquet.writer.int96.enabled|When set to false, disables accelerated parquet write if the spark.sql.parquet.outputTimestampType is set to INT96|true
Expand Down
15 changes: 14 additions & 1 deletion integration_tests/src/main/python/parquet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,20 @@ def read_parquet_sql(data_path):
original_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'}
multithreaded_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED'}
coalesce_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'}
native_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE',
'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE'}
native_multithreaded_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED',
'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE'}
native_coalesce_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING',
'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE'}

# For now the native configs are not compatible with spark.sql.parquet.writeLegacyFormat written files
# for nested types
reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf, native_parquet_file_reader_conf,
native_multithreaded_parquet_file_reader_conf, native_coalesce_parquet_file_reader_conf]

reader_opt_confs_no_native = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf,
coalesce_parquet_file_reader_conf]

@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
Expand Down Expand Up @@ -222,7 +235,7 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase,
[ArrayGen(decimal_gen_32bit, max_length=10)],
[StructGen([['child0', decimal_gen_32bit]])]], ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.avro.file.SeekableInput
import org.apache.avro.io.{BinaryData, BinaryDecoder, DecoderFactory}
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}

private class SeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput {
private class AvroSeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput {
var oneByte = new Array[Byte](1)

override def read(): Int = {
Expand Down Expand Up @@ -132,7 +132,7 @@ case class BlockInfo(blockStart: Long, blockLength: Long, blockDataSize: Long, c
* AvroDataFileReader parses the Avro file to get the header and all block information
*/
class AvroDataFileReader(si: SeekableInput) extends AutoCloseable {
private val sin = new SeekableInputStream(si)
private val sin = new AvroSeekableInputStream(si)
sin.seek(0) // seek to the start of file and get some meta info.
private var vin: BinaryDecoder = DecoderFactory.get.binaryDecoder(sin, vin);

Expand Down
Loading