
HIVE-28518: Iceberg: Fix ClassCastException during in-place migration to Iceberg tables with timestamp columns #5590

Open · wants to merge 1 commit into master

Conversation

@ggangadharan (Contributor)

What changes were proposed in this pull request?

This fix resolves a ClassCastException when reading in-place migrated Iceberg tables with timestamp columns, improving their stability and reliability.

Why are the changes needed?

The issue was caused by incorrect type casting in the timestamp handling logic, which made the fetch task fail on migrated Iceberg tables.
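
As a rough sketch of the defensive handling this implies (a hypothetical helper, not the actual patch; it assumes, as the discussion below establishes, that a reader may return either a java.time.LocalDateTime or an OffsetDateTime for the same logical timestamp column):

import java.time.LocalDateTime;
import java.time.OffsetDateTime;

// Hypothetical helper illustrating the kind of conversion described above;
// the actual change lives in Hive's Iceberg timestamp handling code.
public final class TimestampCoercion {
    static LocalDateTime toLocalDateTime(Object value) {
        if (value instanceof LocalDateTime) {
            return (LocalDateTime) value;                      // ORC readers return LocalDateTime
        }
        if (value instanceof OffsetDateTime) {
            return ((OffsetDateTime) value).toLocalDateTime(); // Parquet readers return OffsetDateTime
        }
        throw new IllegalArgumentException("Unexpected timestamp type: " + value.getClass());
    }
}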

Does this PR introduce any user-facing change?

No

Is the change a dependency upgrade?

No

How was this patch tested?

Qtest - iceberg_inplace_migration_with_timestamp_column.q

@Aggarwal-Raghav (Contributor)

@ggangadharan, thanks for the PR. I have one question, though.
I ran the q file without the patch against both Parquet and ORC and see a difference in behaviour: with ORC it works even without the patch. Is it because of the way ORC and Parquet store date/time types?
Attaching screenshots of both runs.
With ORC: [screenshot]
With Parquet: [screenshot]

@ggangadharan (Author)

Hi @Aggarwal-Raghav

Thank you for raising this question.

Upon investigation, it appears that the issue stems from how the IcebergRecordReader interprets the timestamp column for different file formats:

  • For ORC tables, the timestamp column is read as LocalDateTime.
  • For Parquet tables, the same column is read as OffsetDateTime.

Due to this discrepancy, we encounter a ClassCastException when working with Parquet tables, as sketched below.
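
A minimal standalone demonstration of the failure mode (hypothetical demo code, not Hive code; the unconditional cast stands in for what the fetch path effectively does):

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Standalone demo (hypothetical), not taken from the Hive codebase.
public class TimestampCastDemo {
    public static void main(String[] args) {
        // ORC path: the reader materializes the timestamp as a LocalDateTime.
        Object fromOrc = LocalDateTime.now();
        LocalDateTime ok = (LocalDateTime) fromOrc;        // succeeds

        // Parquet path: the same logical column arrives as an OffsetDateTime.
        Object fromParquet = OffsetDateTime.now(ZoneOffset.UTC);
        // The unconditional cast reproduces the ClassCastException.
        LocalDateTime fails = (LocalDateTime) fromParquet; // throws ClassCastException
    }
}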

As you mentioned, I also believe the root cause lies at the underlying file-format/Iceberg level.

I’ve attached a screenshot for reference.

[screenshot: HIVE-28518_DEBUG]

Please let me know if you need further details or if we should take any additional steps to address this.

If it looks okay, please review the PR.

@okumin (Contributor) · Dec 27, 2024

I wonder why everything is ok when we directly create an Iceberg + Parquet table.

> CREATE TABLE test3(`id` int,`name` string,`dt` timestamp) stored by iceberg stored as parquet
> insert into test3 values (1, "test name" , cast('2024-08-09 14:08:26.326107' as timestamp));
> select * from test3;

@ggangadharan (Author)

Hi @okumin,

Thank you for taking the time to review the pull request.

In the Iceberg Parquet table, the timestamp column is read as LocalDateTime. I’ve attached a screenshot for reference.

[screenshot]

There is a notable difference in how the timestamp column is stored at the Parquet file-format level. Specifically:

  • In Iceberg Parquet tables, the timestamp column is stored as INT64 with logical type TIMESTAMP(MICROS, isAdjustedToUTC=false).
  • In standard Parquet tables, the timestamp column is stored as INT96.

For clarity, I’ve also included the metadata from parquet-tools for reference.

As an Iceberg Parquet table:

file schema: table
------------------------------------------------------------------------------------------------------------------------------------------------------------
id:          OPTIONAL INT32 R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
dt:          OPTIONAL INT64 L:TIMESTAMP(MICROS,false) R:0 D:1

row group 1: RC:1 TS:112 OFFSET:4
------------------------------------------------------------------------------------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:35/33/0.94 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 1, num_nulls: 0]
name:         BINARY SNAPPY DO:0 FPO:39 SZ:44/42/0.95 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: test name, max: test name, num_nulls: 0]
dt:           INT64 SNAPPY DO:0 FPO:83 SZ:39/37/0.95 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 2024-08-09T14:08:26.326107, max: 2024-08-09T14:08:26.326107, num_nulls: 0]

As a standard Parquet table:

file schema: hive_schema
------------------------------------------------------------------------------------------------------------------------------------------------------------
id:          OPTIONAL INT32 R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
dt:          OPTIONAL INT96 R:0 D:1

row group 1: RC:1 TS:137 OFFSET:4
------------------------------------------------------------------------------------------------------------------------------------------------------------
id:           INT32 UNCOMPRESSED DO:0 FPO:4 SZ:33/33/1.00 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 1, num_nulls: 0]
name:         BINARY UNCOMPRESSED DO:0 FPO:37 SZ:42/42/1.00 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: test name, max: test name, num_nulls: 0]
dt:           INT96 UNCOMPRESSED DO:79 FPO:110 SZ:62/62/1.00 VC:1 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[min: 0x78037C8D4C2E0000748B2500, max: 0x78037C8D4C2E0000748B2500, num_nulls: 0]

@okumin (Contributor) · Dec 28, 2024

Thanks. I also remember that Hive didn’t follow the regular convention for encoding TIMESTAMP. I don’t have an immediate idea of how to fix it.

@ayushtkn (Member)

We should try to fix the regular convention for encoding TIMESTAMP, but that might not fix the case of existing tables. For those, the fix in the current PR seems OK to me.

@ggangadharan (Author)

@ayushtkn Thank you for the feedback.

Based on this, I believe the changes are in a good state to proceed unless there are further concerns.

@Aggarwal-Raghav @okumin Could you kindly review the code and share your feedback? Your insights would be greatly appreciated to help move this forward. If there are any questions or blockers, feel free to let me know.

Thank you for your time and support!

@okumin (Contributor) · Jan 2, 2025

Thank you! That makes it clear. So the remaining problem is to verify that INT96 is compatible with Iceberg’s TIMESTAMP, which means verifying that other query engines or tools can read it as a timestamp. I am trying to check that.

@ggangadharan (Author) · Jan 2, 2025

@okumin Thanks for the update.

I successfully read the migrated Iceberg table (previously migrated from Hive) using spark.sql, and it worked as expected. Spark reads the timestamp column as TimestampNTZType.

As per the documentation, TimestampNTZType is a timestamp without time zone (TIMESTAMP_NTZ). It represents values comprising the fields year, month, day, hour, minute, and second; all operations are performed without taking any time zone into account.

Ref - https://spark.apache.org/docs/latest/sql-ref-datatypes.html

Attaching the spark3-shell output for reference.

scala> spark.sql("DESCRIBE  TABLE formatted default.hive_28518_test").show(false)
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |comment|
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|id                          |int                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |null   |
|name                        |string                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |null   |
|dt                          |timestamp_ntz                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |null   |
|                            |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |       |
|# Metadata Columns          |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |       |
|_spec_id                    |int                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |       |
|_partition                  |struct<>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             |       |
|_file                       |string                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |       |
|_pos                        |bigint                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |       |
|_deleted                    |boolean                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |       |
|                            |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |       |
|# Detailed Table Information|                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                     |       |
|Name                        |spark_catalog.default.hive_28518_test                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                |       |
|Type                        |MANAGED                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |       |
|Location                    |hdfs://ns1/warehouse/tablespace/external/hive/hive_28518_test                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                        |       |
|Provider                    |iceberg                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              |       |
|Table Properties            |[EXTERNAL=TRUE,MIGRATED_TO_ICEBERG=true,OBJCAPABILITIES=EXTREAD,EXTWRITE,current-snapshot-id=4868016704679265240,engine.hive.enabled=true,format=iceberg/parquet,format-version=2,iceberg.orc.files.only=false,last_modified_by=hive,last_modified_time=1735822103,schema.name-mapping.default=[ {\n  "field-id" : 1,\n  "names" : [ "id" ]\n}, {\n  "field-id" : 2,\n  "names" : [ "name" ]\n}, {\n  "field-id" : 3,\n  "names" : [ "dt" ]\n} ],storage_handler=org.apache.iceberg.mr.hive.HiveIcebergStorageHandler,table_type=ICEBERG,write.delete.mode=merge-on-read,write.format.default=parquet,write.merge.mode=merge-on-read,write.update.mode=merge-on-read]|       |
+----------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+


scala> spark.sql("select dt from default.hive_28518_test").show(10,false)
+--------------------------+
|dt                        |
+--------------------------+
|2024-08-09 14:08:26.326107|
+--------------------------+

FYI

While reading the string column name, I encountered an error that has been reported here. Since it is related to a Spark/Iceberg issue, we can ignore it for now.

scala> spark.sql("select name from default.hive_28518_test").show()
25/01/02 12:59:29 WARN  scheduler.TaskSetManager: [task-result-getter-3]: Lost task 0.0 in stage 5.0 (TID 11) (ccycloud-2.nightly7310-ec.root.comops.site executor 2): java.lang.UnsupportedOperationException: Unsupported type: UTF8String
	at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:81)
	at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:574)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1530)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

25/01/02 12:59:29 ERROR scheduler.TaskSetManager: [task-result-getter-2]: Task 0 in stage 5.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 14) (ccycloud-2.nightly7310-ec.root.comops.site executor 2): java.lang.UnsupportedOperationException: Unsupported type: UTF8String
	at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:81)
	at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:138)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:574)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1530)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2300)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2319)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4183)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3167)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4173)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:527)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4171)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4171)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:3167)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:3388)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:290)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:329)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:815)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:774)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:783)
  ... 47 elided
Caused by: java.lang.UnsupportedOperationException: Unsupported type: UTF8String
  at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:81)
  at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:138)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:574)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1530)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:577)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
