Support bucketing write for GPU #10957

Merged
merged 18 commits into NVIDIA:branch-24.08 from write-bucketed
Jun 24, 2024

Conversation

firestarman
Collaborator

@firestarman firestarman commented Jun 2, 2024

close #22

This PR adds the GPU support for the bucketing write.

  • Refactor the dynamic partition single writer and concurrent writer so that they share as much code as possible, and then add the bucketing write logic to both of them.
  • Update the bucket check during the plan overriding for the write commands, including InsertIntoHadoopFsRelationCommand, CreateDataSourceTableAsSelectCommand, InsertIntoHiveTable, CreateHiveTableAsSelectCommand.
  • From Spark 3.3.0 (shim 330) on, Spark also supports HiveHash to generate the bucket IDs, in addition to Murmur3Hash. So the shim object GpuBucketingUtils is introduced to handle the version-specific differences.
  • This change also adds two functions (tagForHiveBucketingWrite and tagForBucketing) to do the overriding check for the two hashing functions separately. The Hive write nodes will fall back to CPU when HiveHash is chosen, because HiveHash is not supported on GPU.
  • This change also adds the basic tests for this new feature.
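
As a rough illustration of what the description above amounts to, here is a minimal pure-Python sketch. It assumes a bucket ID is the non-negative modulo of a hash over the bucket columns. `toy_hash` is a stand-in and is not Murmur3Hash or HiveHash, and `bucket_id` and `choose_device` are hypothetical names that do not come from this PR.

```python
from zlib import crc32


def toy_hash(values):
    """Stand-in hash for illustration only; NOT Murmur3Hash or HiveHash."""
    h = 42
    for v in values:
        h = crc32(repr(v).encode("utf-8"), h)
    # Reinterpret as a signed 32-bit int, since JVM hash values can be negative.
    return h - (1 << 32) if h >= (1 << 31) else h


def pmod(a, n):
    """Positive modulo: the result is always in [0, n) for n > 0."""
    return ((a % n) + n) % n


def bucket_id(row, bucket_cols, num_buckets):
    """Hypothetical helper: map a row to a bucket via its bucket columns."""
    return pmod(toy_hash([row[c] for c in bucket_cols]), num_buckets)


def choose_device(hash_name):
    """Hypothetical mirror of the fallback rule stated in the description."""
    # Assumption: only the hash function decides; the real checks cover more.
    return "CPU" if hash_name == "HiveHash" else "GPU"
```

Rows with equal bucket-column values always land in the same bucket, which is the property a bucketed read relies on later.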

@firestarman
Collaborator Author

firestarman commented Jun 2, 2024

Marking this as a draft for early review and to run the tests on Databricks (DB) through CI.

@firestarman
Collaborator Author

build

@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

@firestarman firestarman marked this pull request as ready for review June 4, 2024 01:31
Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

@firestarman firestarman added the feature request New feature or request label Jun 11, 2024
@firestarman
Collaborator Author

build

Collaborator

@revans2 revans2 left a comment

I honestly didn't make it through the entire patch. It is very large. I'll try to find time to finish it soon.

What performance and scale testing have we done with this?

@firestarman
Collaborator Author

firestarman commented Jun 12, 2024

I honestly didn't make it through the entire patch. It is very large. I'll try to find time to finish it soon.

Yeah, it is big, thx a lot for the review.

What performance and scale testing have we done with this?

not yet, since I thought it is not necessary for feature PRs. But I am happy to run NDS with this if you prefer.

Signed-off-by: Firestarman <[email protected]>
Signed-off-by: Firestarman <[email protected]>
Signed-off-by: Firestarman <[email protected]>
Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

@revans2
Collaborator

revans2 commented Jun 12, 2024

not yet, since I thought it is not necessary for feature PRs. But I am happy to run NDS with this if you prefer.

I am not concerned much about NDS. We are not doing any bucketed writes/reads in NDS. If you think we should change that we can, but it would take some analysis to see how we wanted to do that. I am more concerned about how our performance compares to the CPU for similar situations. Especially for writes. We already know that reads should be much faster because we already are really good at joins and this should reduce the shuffle ahead of a join.
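
To illustrate the point about reads: when both sides of a join were written with the same bucket column and bucket count, matching keys land in the same bucket number, so each pair of buckets can be joined on its own without redistributing rows. The following is a minimal pure-Python sketch under that assumption. The helper names are hypothetical, the bucket function is a toy modulo on integer keys, and this is not how Spark or the plugin implement it.

```python
from collections import defaultdict

NUM_BUCKETS = 4


def write_bucketed(rows, key, num_buckets=NUM_BUCKETS):
    """Group rows into buckets by key; stands in for a bucketed table on disk."""
    buckets = defaultdict(list)
    for row in rows:
        # Toy bucket function for integer keys only (an assumption of this sketch).
        buckets[row[key] % num_buckets].append(row)
    return buckets


def bucketed_join(left, right, key, num_buckets=NUM_BUCKETS):
    """Join bucket i of `left` only with bucket i of `right`."""
    out = []
    for b in range(num_buckets):
        # Build a small lookup table from the right-side bucket.
        lookup = defaultdict(list)
        for r in right.get(b, []):
            lookup[r[key]].append(r)
        # Probe it with the left-side rows of the same bucket number.
        for l in left.get(b, []):
            for r in lookup.get(l[key], []):
                out.append({**l, **r})
    return out
```

No row ever needs to be compared against a row from a different bucket number, which is why a shuffle ahead of the join can be avoided in this setup.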

Collaborator

@revans2 revans2 left a comment

Still didn't get through everything, but I am getting closer.

Signed-off-by: Firestarman <[email protected]>
Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
revans2 previously approved these changes Jun 13, 2024
Collaborator

@revans2 revans2 left a comment

I think I have made my way through all of the code now. But this is large enough that I don't trust myself and would like at least one other person to review it too.

@firestarman
Collaborator Author

build

@firestarman firestarman requested review from res-life and jlowe June 14, 2024 01:00
@revans2
Collaborator

revans2 commented Jun 14, 2024

Looks like you had a few failures in databricks.

[2024-06-14T03:03:57.389Z] =================================== FAILURES ===================================
[2024-06-14T03:03:57.389Z] _____________________ test_buckets_write_fallback_for_map ______________________
[2024-06-14T03:03:57.389Z] [gw4] linux -- Python 3.8.10 /usr/bin/python
[2024-06-14T03:03:57.389Z] 
[2024-06-14T03:03:57.389Z] spark_tmp_path = '/tmp/pyspark_tests//0614-010744-asmgss3u-10-59-175-151-gw4-4457-1165034746/'
[2024-06-14T03:03:57.389Z] spark_tmp_table_factory = <conftest.TmpTableFactory object at 0x7f4e66d1d940>
[2024-06-14T03:03:57.389Z] 
[2024-06-14T03:03:57.389Z]     @allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec, SortExec')
[2024-06-14T03:03:57.389Z]     def test_buckets_write_fallback_for_map(spark_tmp_path, spark_tmp_table_factory):
[2024-06-14T03:03:57.389Z]         data_path = spark_tmp_path + '/ORC_DATA'
[2024-06-14T03:03:57.389Z]         gen_list = [["id", simple_string_to_string_map_gen], ["data", long_gen]]
[2024-06-14T03:03:57.389Z] >       assert_gpu_fallback_write(
[2024-06-14T03:03:57.389Z]             lambda spark, path: gen_df(spark, gen_list).selectExpr("id as b_id", "data").write
[2024-06-14T03:03:57.389Z]                 .bucketBy(4, "b_id").format('orc').mode('overwrite').option("path", path)
[2024-06-14T03:03:57.389Z]                 .saveAsTable(spark_tmp_table_factory.get()),
[2024-06-14T03:03:57.389Z]             lambda spark, path: spark.read.orc(path),
[2024-06-14T03:03:57.389Z]             data_path,
[2024-06-14T03:03:57.389Z]             'DataWritingCommandExec',
[2024-06-14T03:03:57.389Z]             conf={'spark.rapids.sql.format.orc.write.enabled': True})
[2024-06-14T03:03:57.389Z] 
[2024-06-14T03:03:57.389Z] ../../src/main/python/orc_write_test.py:237: 
[2024-06-14T03:03:57.389Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2024-06-14T03:03:57.389Z] ../../src/main/python/asserts.py:364: in assert_gpu_fallback_write
[2024-06-14T03:03:57.389Z]     with_cpu_session(lambda spark : write_func(spark, cpu_path), conf=conf)
[2024-06-14T03:03:57.389Z] ../../src/main/python/spark_session.py:147: in with_cpu_session
[2024-06-14T03:03:57.389Z]     return with_spark_session(func, conf=copy)
[2024-06-14T03:03:57.389Z] /usr/lib/python3.8/contextlib.py:75: in inner
[2024-06-14T03:03:57.389Z]     return func(*args, **kwds)
[2024-06-14T03:03:57.389Z] ../../src/main/python/spark_session.py:131: in with_spark_session
[2024-06-14T03:03:57.389Z]     ret = func(_spark)
[2024-06-14T03:03:57.389Z] ../../src/main/python/asserts.py:364: in <lambda>
[2024-06-14T03:03:57.389Z]     with_cpu_session(lambda spark : write_func(spark, cpu_path), conf=conf)
[2024-06-14T03:03:57.389Z] ../../src/main/python/orc_write_test.py:238: in <lambda>
[2024-06-14T03:03:57.389Z]     lambda spark, path: gen_df(spark, gen_list).selectExpr("id as b_id", "data").write
[2024-06-14T03:03:57.390Z] /databricks/spark/python/pyspark/instrumentation_utils.py:48: in wrapper
[2024-06-14T03:03:57.390Z]     res = func(*args, **kwargs)
[2024-06-14T03:03:57.390Z] /databricks/spark/python/pyspark/sql/readwriter.py:1520: in saveAsTable
[2024-06-14T03:03:57.390Z]     self._jwrite.saveAsTable(name)
[2024-06-14T03:03:57.390Z] /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321: in __call__
[2024-06-14T03:03:57.390Z]     return_value = get_return_value(
[2024-06-14T03:03:57.390Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z] a = ('xro639394', <py4j.clientserver.JavaClient object at 0x7f4eac089ee0>, 'o639393', 'saveAsTable')
[2024-06-14T03:03:57.390Z] kw = {}, converted = AnalysisException()
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z]     def deco(*a: Any, **kw: Any) -> Any:
[2024-06-14T03:03:57.390Z]         try:
[2024-06-14T03:03:57.390Z]             return f(*a, **kw)
[2024-06-14T03:03:57.390Z]         except Py4JJavaError as e:
[2024-06-14T03:03:57.390Z]             converted = convert_exception(e.java_exception)
[2024-06-14T03:03:57.390Z]             if not isinstance(converted, UnknownException):
[2024-06-14T03:03:57.390Z]                 # Hide where the exception came from that shows a non-Pythonic
[2024-06-14T03:03:57.390Z]                 # JVM exception message.
[2024-06-14T03:03:57.390Z] >               raise converted from None
[2024-06-14T03:03:57.390Z] E               pyspark.errors.exceptions.AnalysisException: Invalid call to exprId on unresolved object
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z] /databricks/spark/python/pyspark/errors/exceptions.py:234: AnalysisException
[2024-06-14T03:03:57.390Z] ---------------------------- Captured stderr setup -----------------------------
[2024-06-14T03:03:57.390Z] 2024-06-14 02:27:15 INFO     Running test 'src/main/python/orc_write_test.py::test_buckets_write_fallback_for_map[DATAGEN_SEED=1718328190, TZ=UTC, ALLOW_NON_GPU(DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec, SortExec)]'
[2024-06-14T03:03:57.390Z] ------------------------------ Captured log setup ------------------------------
[2024-06-14T03:03:57.390Z] INFO     __pytest_worker_logger__:spark_init_internal.py:256 Running test 'src/main/python/orc_write_test.py::test_buckets_write_fallback_for_map[DATAGEN_SEED=1718328190, TZ=UTC, ALLOW_NON_GPU(DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec, SortExec)]'
[2024-06-14T03:03:57.390Z] ----------------------------- Captured stdout call -----------------------------
[2024-06-14T03:03:57.390Z] ### CPU RUN ###
[2024-06-14T03:03:57.390Z] _____________________ test_buckets_write_fallback_for_map ______________________
[2024-06-14T03:03:57.390Z] [gw6] linux -- Python 3.8.10 /usr/bin/python
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z] spark_tmp_path = '/tmp/pyspark_tests//0614-010744-asmgss3u-10-59-175-151-gw6-4465-1724103483/'
[2024-06-14T03:03:57.390Z] spark_tmp_table_factory = <conftest.TmpTableFactory object at 0x7fccdbee9ee0>
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z]     @allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec, SortExec')
[2024-06-14T03:03:57.390Z]     def test_buckets_write_fallback_for_map(spark_tmp_path, spark_tmp_table_factory):
[2024-06-14T03:03:57.390Z]         data_path = spark_tmp_path + '/PARQUET_DATA'
[2024-06-14T03:03:57.390Z]         gen_list = [["id", simple_string_to_string_map_gen], ["data", long_gen]]
[2024-06-14T03:03:57.390Z] >       assert_gpu_fallback_write(
[2024-06-14T03:03:57.390Z]             lambda spark, path: gen_df(spark, gen_list).selectExpr("id as b_id", "data").write
[2024-06-14T03:03:57.390Z]                 .bucketBy(4, "b_id").format('parquet').mode('overwrite').option("path", path)
[2024-06-14T03:03:57.390Z]                 .saveAsTable(spark_tmp_table_factory.get()),
[2024-06-14T03:03:57.390Z]             lambda spark, path: spark.read.parquet(path),
[2024-06-14T03:03:57.390Z]             data_path,
[2024-06-14T03:03:57.390Z]             'DataWritingCommandExec',
[2024-06-14T03:03:57.390Z]             conf=writer_confs)
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z] ../../src/main/python/parquet_write_test.py:462: 
[2024-06-14T03:03:57.390Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2024-06-14T03:03:57.390Z] ../../src/main/python/asserts.py:364: in assert_gpu_fallback_write
[2024-06-14T03:03:57.390Z]     with_cpu_session(lambda spark : write_func(spark, cpu_path), conf=conf)
[2024-06-14T03:03:57.390Z] ../../src/main/python/spark_session.py:147: in with_cpu_session
[2024-06-14T03:03:57.390Z]     return with_spark_session(func, conf=copy)
[2024-06-14T03:03:57.390Z] /usr/lib/python3.8/contextlib.py:75: in inner
[2024-06-14T03:03:57.390Z]     return func(*args, **kwds)
[2024-06-14T03:03:57.390Z] ../../src/main/python/spark_session.py:131: in with_spark_session
[2024-06-14T03:03:57.390Z]     ret = func(_spark)
[2024-06-14T03:03:57.390Z] ../../src/main/python/asserts.py:364: in <lambda>
[2024-06-14T03:03:57.390Z]     with_cpu_session(lambda spark : write_func(spark, cpu_path), conf=conf)
[2024-06-14T03:03:57.390Z] ../../src/main/python/parquet_write_test.py:463: in <lambda>
[2024-06-14T03:03:57.390Z]     lambda spark, path: gen_df(spark, gen_list).selectExpr("id as b_id", "data").write
[2024-06-14T03:03:57.390Z] /databricks/spark/python/pyspark/instrumentation_utils.py:48: in wrapper
[2024-06-14T03:03:57.390Z]     res = func(*args, **kwargs)
[2024-06-14T03:03:57.390Z] /databricks/spark/python/pyspark/sql/readwriter.py:1520: in saveAsTable
[2024-06-14T03:03:57.390Z]     self._jwrite.saveAsTable(name)
[2024-06-14T03:03:57.390Z] /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321: in __call__
[2024-06-14T03:03:57.390Z]     return_value = get_return_value(
[2024-06-14T03:03:57.390Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z] a = ('xro957695', <py4j.clientserver.JavaClient object at 0x7fccf3a2aee0>, 'o957694', 'saveAsTable')
[2024-06-14T03:03:57.390Z] kw = {}, converted = AnalysisException()
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z]     def deco(*a: Any, **kw: Any) -> Any:
[2024-06-14T03:03:57.390Z]         try:
[2024-06-14T03:03:57.390Z]             return f(*a, **kw)
[2024-06-14T03:03:57.390Z]         except Py4JJavaError as e:
[2024-06-14T03:03:57.390Z]             converted = convert_exception(e.java_exception)
[2024-06-14T03:03:57.390Z]             if not isinstance(converted, UnknownException):
[2024-06-14T03:03:57.390Z]                 # Hide where the exception came from that shows a non-Pythonic
[2024-06-14T03:03:57.390Z]                 # JVM exception message.
[2024-06-14T03:03:57.390Z] >               raise converted from None
[2024-06-14T03:03:57.390Z] E               pyspark.errors.exceptions.AnalysisException: Invalid call to exprId on unresolved object
[2024-06-14T03:03:57.390Z] 
[2024-06-14T03:03:57.390Z] /databricks/spark/python/pyspark/errors/exceptions.py:234: AnalysisException
[2024-06-14T03:03:57.390Z] ---------------------------- Captured stderr setup -----------------------------
...
[2024-06-14T03:03:57.392Z] FAILED ../../src/main/python/orc_write_test.py::test_buckets_write_fallback_for_map[DATAGEN_SEED=1718328190, TZ=UTC, ALLOW_NON_GPU(DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec, SortExec)] - pyspark.errors.exceptions.AnalysisException: Invalid call to exprId on unre...
[2024-06-14T03:03:57.392Z] FAILED ../../src/main/python/parquet_write_test.py::test_buckets_write_fallback_for_map[DATAGEN_SEED=1718328190, TZ=UTC, ALLOW_NON_GPU(DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec, SortExec)] - pyspark.errors.exceptions.AnalysisException: Invalid call to exprId on unre...

@firestarman
Collaborator Author

firestarman commented Jun 17, 2024

Looks like you had a few failures in databricks.

Yeah, checking it ...

It can be fixed by changing the MapGen to BinaryGen.

This is probably not a Rapids bug. I checked the error stack and it did not even reach any GPU code.

It seems to be related to the MapGen in Python. Maybe DB is doing some optimization for map data that comes in from Python, but I am not 100% sure. There is also an existing issue related to MapGen: #10948.

Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

@firestarman
Collaborator Author

firestarman commented Jun 17, 2024

The latest failure is due to the known issue #11070.

@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <[email protected]>
@firestarman
Collaborator Author

build

@firestarman firestarman merged commit 4b44903 into NVIDIA:branch-24.08 Jun 24, 2024
45 checks passed
SurajAralihalli pushed a commit to SurajAralihalli/spark-rapids that referenced this pull request Jul 12, 2024
This PR adds the GPU support for the bucketing write.

- Refactor the code of the dynamic partition single writer and concurrent writer to reuse as much code
   as possible, and then add the bucketing write logic to both of them.
- Update the bucket check during the plan overriding for the write commands, including 
   InsertIntoHadoopFsRelationCommand, CreateDataSourceTableAsSelectCommand,
   InsertIntoHiveTable, CreateHiveTableAsSelectCommand.
- From 330, Spark also supports HiveHash to generate the bucket IDs, in addition to Murmur3Hash.
   So the shim object GpuBucketingUtils is introduced to handle the shim things.
- This change also adds two functions (tagForHiveBucketingWrite and tagForBucketing) to do the
   overriding check for the two hashing functions separately. And the Hive write nodes will fall back 
   to CPU when HiveHash is chosen, because HiveHash is not supported on GPU.


---------

Signed-off-by: Firestarman <[email protected]>
@firestarman firestarman deleted the write-bucketed branch August 6, 2024 01:55
Labels
feature request New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[FEA] Add support for bucketed writes
3 participants