[BUG] Two tables with the same key may be placed at different partitions by Table.hashPartition. #13000

Closed
firestarman opened this issue Mar 23, 2023 · 3 comments
Labels: bug (Something isn't working)

Comments

firestarman (Contributor) commented Mar 23, 2023

Describe the bug
Read the two files (hosted on Google Drive; links are posted at the end of this section) into two tables individually, and you will find that every row in both tables has the same value (100) in the first column, "byteF".

scala> spark.read.parquet("/data/tmp/d1.parquet").groupBy("byteF").count.show
+-----+------+
|byteF| count|
+-----+------+
|  100|299839|
+-----+------+

scala> spark.read.parquet("/data/tmp/d2.parquet").groupBy("byteF").count.show
+-----+------+
|byteF| count|
+-----+------+
|  100|125483|
+-----+------+

scala> spark.read.parquet("/data/tmp/d2.parquet").show(1)
+-----+------+----+--------------------+-------------+------------+--------+-------+--------------------+--------------------+----------+
|byteF|shortF|intF|               longF|       floatF|     doubleF|booleanF|   strF|               strF1|          timestampF|     dateF|
+-----+------+----+--------------------+-------------+------------+--------+-------+--------------------+--------------------+----------+
|  100|   615| 759|-3036077738068910150|-4.73701312E8|-5.2340516E8|   false|takeout|8113f26b-15bd-444...|2021-07-19 11:01:...|2021-07-21|
+-----+------+----+--------------------+-------------+------------+--------+-------+--------------------+--------------------+----------+
only showing top 1 row


scala> spark.read.parquet("/data/tmp/d1.parquet").show(1)
+-----+------+----+--------------------+------------+------------+--------+-------+--------------------+--------------------+----------+
|byteF|shortF|intF|               longF|      floatF|     doubleF|booleanF|   strF|               strF1|          timestampF|     dateF|
+-----+------+----+--------------------+------------+------------+--------+-------+--------------------+--------------------+----------+
|  100|     1| 186|-7539947440835918015|1.49581312E9|4.85257121E8|   false|takeout|1b30500b-d4fb-46c...|2021-09-03 16:01:...|2021-08-31|
+-----+------+----+--------------------+------------+------------+--------+-------+--------------------+--------------------+----------+
only showing top 1 row

When hash-partitioning the two tables by the "byteF" column with the same hash function, partition count (16), and seed, we expect all rows of both tables to land in the same partition, but they do not: one table's rows go to partition 10, while the other's go to partition 13. The resulting partition arrays are shown below.

parts1: [0,0,0,0,0,0,0,0,0,0,299839,299839,299839,299839,299839,299839]
parts2: [0,0,0,0,0,0,0,0,0,0,0,0,0,125483,125483,125483]

I was unable to reproduce this with a smaller dataset, so I have uploaded the files to Google Drive:
https://drive.google.com/file/d/1FtMlmeNDK2NM8wjjlkDuYkUR-1CyPD9p/view?usp=share_link
https://drive.google.com/file/d/1xg-JPIRgNv-a7FvIvEH4DqpjhUaZ778s/view?usp=share_link

Steps/Code to reproduce bug
Download the two Parquet files above and put them in a local folder, e.g. "/data/tmp".
Add the test below to TableTest and build the native cudf library:

  @Test
  void testPartsDeterministic() {
    final int PARTS = 16;
    int[] parts1 = null, parts2 = null;
    // Index of the first non-empty partition of each table; -1 means not found yet.
    int id1 = -1, id2 = -1;
    try (Table start = Table.readParquet(new File("/data/tmp/d2.parquet"));
         PartitionedTable out = start.onColumns(0).hashPartition(HashType.MURMUR3, PARTS, 0)) {
      parts1 = out.getPartitions();
    }
    try (Table start = Table.readParquet(new File("/data/tmp/d1.parquet"));
         PartitionedTable out = start.onColumns(0).hashPartition(HashType.MURMUR3, PARTS, 0)) {
      parts2 = out.getPartitions();
    }

    // Locate the first partition with a non-zero offset for each table.
    for (int i = 0; i < parts1.length; i++) {
      if (parts1[i] > 0 && id1 < 0) id1 = i;
    }
    for (int i = 0; i < parts2.length; i++) {
      if (parts2[i] > 0 && id2 < 0) id2 = i;
    }
    System.out.println("==> id1= " + id1 + ", id2= " + id2);
    // Every row in both tables has the same key (100), so both tables
    // should hash into the same partition.
    assertEquals(id1, id2);
  }

then run

cd <cudf_root>/java
mvn test -Dtest=ai.rapids.cudf.TableTest#testPartsDeterministic

Expected behavior
The test should pass.

Environment details
OS Information
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=20.04
DISTRIB_CODENAME=focal
DISTRIB_DESCRIPTION="Ubuntu 20.04.3 LTS"
NAME="Ubuntu"
VERSION="20.04.3 LTS (Focal Fossa)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 20.04.3 LTS"

GPU Information
 Thu Mar 23 08:55:24 2023
 +-----------------------------------------------------------------------------+
 | NVIDIA-SMI 520.61.05    Driver Version: 520.61.05    CUDA Version: 11.8     |
 |-------------------------------+----------------------+----------------------+
 | GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
 | Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
 |                               |                      |               MIG M. |
 |===============================+======================+======================|
 |   0  NVIDIA TITAN V      Off  | 00000000:01:00.0  On |                  N/A |
 | 30%   44C    P8    29W / 250W |     70MiB / 12288MiB |      0%      Default |
 |                               |                      |                  N/A |
 +-------------------------------+----------------------+----------------------+
 
 +-----------------------------------------------------------------------------+
 | Processes:                                                                  |
 |  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
 |        ID   ID                                                   Usage      |
 |=============================================================================|
 |    0   N/A  N/A       999      G   /usr/lib/xorg/Xorg                 56MiB |
 |    0   N/A  N/A      1203      G   /usr/bin/gnome-shell               11MiB |
 +-----------------------------------------------------------------------------+
firestarman added the bug (Something isn't working) and Needs Triage (Need team to review and classify) labels on Mar 23, 2023
davidwendt (Contributor) commented Mar 23, 2023

The type of the byteF column in d1.parquet is int32, while the type of the byteF column in d2.parquet is int8. So the hash values are different, since one hashes 4 bytes (MurmurHash3 = 3271117817) while the other hashes only 1 byte (MurmurHash3 = 3310390828). The different hash values produce different buckets/partitions.

Using HASH_IDENTITY instead of HASH_MURMUR3 may work if you want to ignore the type difference.
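
To illustrate why the byte width matters, here is a minimal, self-contained Java sketch of the standard (public-domain) MurmurHash3 x86_32 algorithm. It is not cudf's internal implementation, so the printed values are illustrative only; the point is that hashing the key 100 as a single int8 byte versus as a 4-byte little-endian int32 feeds different inputs to the hash and therefore yields different outputs.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class MurmurWidthDemo {
  // Reference MurmurHash3 x86_32 (Austin Appleby's public-domain algorithm).
  static int murmur3_32(byte[] data, int seed) {
    final int c1 = 0xcc9e2d51, c2 = 0x1b873593;
    int h = seed, i = 0;
    for (; i + 4 <= data.length; i += 4) { // body: 4-byte little-endian blocks
      int k = ByteBuffer.wrap(data, i, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
      k *= c1; k = Integer.rotateLeft(k, 15); k *= c2;
      h ^= k; h = Integer.rotateLeft(h, 13); h = h * 5 + 0xe6546b64;
    }
    int k = 0; // tail: remaining 1-3 bytes (intentional fall-through below)
    switch (data.length & 3) {
      case 3: k ^= (data[i + 2] & 0xff) << 16;
      case 2: k ^= (data[i + 1] & 0xff) << 8;
      case 1: k ^= (data[i] & 0xff);
              k *= c1; k = Integer.rotateLeft(k, 15); k *= c2; h ^= k;
    }
    h ^= data.length; // finalization mix
    h ^= h >>> 16; h *= 0x85ebca6b; h ^= h >>> 13; h *= 0xc2b2ae35; h ^= h >>> 16;
    return h;
  }

  public static void main(String[] args) {
    byte[] asInt8  = { 100 };          // the key stored as int8
    byte[] asInt32 = { 100, 0, 0, 0 }; // the same key as a little-endian int32
    System.out.println(Integer.toUnsignedString(murmur3_32(asInt8, 0)));
    System.out.println(Integer.toUnsignedString(murmur3_32(asInt32, 0)));
    // The two hashes differ, so hash % 16 can select different partitions.
  }
}

Because the partition index is derived from the hash, any difference in the hashed byte width propagates directly into a different partition assignment.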

firestarman (Contributor, Author) commented Mar 24, 2023

Thanks for the info.

@revans2 What's your take on this? Are we OK to use HASH_IDENTITY instead for our sub-partitioning? Or, if HASH_IDENTITY works, should we choose a hash dynamically depending on whether the keys' types differ? Or should we fall back to the non-sub-partitioning path?

This comes from bug #4039216 filed by QA.

revans2 (Contributor) commented Mar 24, 2023

@firestarman in the short term I think it would be good to use the same hashing code that the plugin uses for HashPartitioning.

https://github.com/NVIDIA/spark-rapids/blob/3dbccb16d75bfb24d38a792badeafad03e0d5a8a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioningBase.scala#L37-L49

But with a different hash to avoid collisions. We know that this code works and that all of the odd corner cases have been covered.
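
For reference, the linked plugin code follows the common hash-then-pmod pattern: hash the key columns, then take a non-negative modulo of the hash to choose a partition. Below is a hedged Java sketch of just the modulo step; the name partitionFor is illustrative, not the plugin's actual API.

// Map a (possibly negative) 32-bit hash to a partition index.
static int partitionFor(int hash, int numPartitions) {
  int p = hash % numPartitions;
  // Java's % can yield a negative result for a negative hash, so adjust
  // ("pmod") to keep the index in [0, numPartitions).
  return p < 0 ? p + numPartitions : p;
}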
