
[Bug]: Snapshot validation fails when running table compaction #9969

Open
gpathak128 opened this issue Nov 21, 2024 · 7 comments · May be fixed by #10033

Comments

@gpathak128

### What happened

I am running table compaction using Spark actions. My Spark action code is:

sparkActions
            .rewriteDataFiles(table)
            .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
            .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "200")
            .option(RewriteDataFiles.USE_STARTING_SEQUENCE_NUMBER, "false")
            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_DESC.orderName())
            .option(RewriteDataFiles.TARGET_FILE_SIZE_BYTES, (1024 * 1024 * 1024).toString) // 1 GB
            .filter(Expressions.lessThan("watermark", "2024-11-17T00:00:00.000+00:00"))
            .filter(Expressions.greaterThanOrEqual("watermark", "2024-11-16T00:00:00.000+00:00"))
            .zOrder("watermark", "user_id")
            .execute()

The job runs fine; however, none of the file groups are able to commit. The commits fail with the following error:

org.apache.iceberg.exceptions.ValidationException: Cannot determine history between starting snapshot 8855497291943185445 and the last known ancestor 1637454130654249874
	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49) ~[__app__.jar:?]
	at org.apache.iceberg.MergingSnapshotProducer.validationHistory(MergingSnapshotProducer.java:805) ~[__app__.jar:?]
	at org.apache.iceberg.MergingSnapshotProducer.addedDeleteFiles(MergingSnapshotProducer.java:563) ~[__app__.jar:?]
	at org.apache.iceberg.MergingSnapshotProducer.validateNoNewDeletesForDataFiles(MergingSnapshotProducer.java:479) ~[__app__.jar:?]
	at org.apache.iceberg.MergingSnapshotProducer.validateNoNewDeletesForDataFiles(MergingSnapshotProducer.java:425) ~[__app__.jar:?]
	at org.apache.iceberg.BaseRewriteFiles.validate(BaseRewriteFiles.java:140) ~[__app__.jar:?]
	at org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:252) ~[__app__.jar:?]
	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:403) ~[__app__.jar:?]
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) ~[__app__.jar:?]
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) ~[__app__.jar:?]
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) ~[__app__.jar:?]
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) ~[__app__.jar:?]
	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:401) ~[__app__.jar:?]
	at org.apache.iceberg.actions.RewriteDataFilesCommitManager.commitFileGroups(RewriteDataFilesCommitManager.java:92) ~[__app__.jar:?]
	at org.apache.iceberg.actions.RewriteDataFilesCommitManager.commitOrClean(RewriteDataFilesCommitManager.java:114) ~[__app__.jar:?]
	at org.apache.iceberg.actions.RewriteDataFilesCommitManager$CommitService.commitOrClean(RewriteDataFilesCommitManager.java:152) ~[__app__.jar:?]
	at org.apache.iceberg.actions.BaseCommitService.commitReadyCommitGroups(BaseCommitService.java:229) ~[__app__.jar:?]
	at org.apache.iceberg.actions.BaseCommitService.offer(BaseCommitService.java:151) ~[__app__.jar:?]
	at org.apache.iceberg.actions.RewriteDataFilesCommitManager$CommitService.offer(RewriteDataFilesCommitManager.java:144) ~[__app__.jar:?]
	at org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.lambda$doExecuteWithPartialProgress$4(RewriteDataFilesSparkAction.java:347) ~[__app__.jar:?]
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) ~[__app__.jar:?]
	at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:315) ~[__app__.jar:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
	at java.lang.Thread.run(Thread.java:829) ~[?:?]
2024-11-21T15:02:04,134 INFO [Driver] actions.BaseCommitService: Closing commit service for copilot.copilot.messages waiting for all commits to finish
2024-11-21T15:02:04,228 ERROR [Driver] actions.RewriteDataFilesSparkAction: partial-progress.enabled is true but no rewrite commits succeeded. Check the logs to determine why the individual commits failed. If this is persistent it may help to increase partial-progress.max-commits which will break the rewrite operation into smaller commits.



### How to reproduce it

Run compaction against an Iceberg table using Spark actions.
Iceberg version: 1.6.0
Catalog: Nessie 0.99.0

### Nessie server type (docker/uber-jar/built from source) and version

docker

### Client type (Ex: UI/Spark/pynessie ...) and version

Spark

### Additional information

The Nessie catalog had 3 branches. I dropped 2 of the dev branches to ensure there was only 1 branch during compaction.
@snazy
Member

snazy commented Nov 22, 2024

I tried to reproduce this, also by adding delete files into the mix in our Spark integration tests, which are executed against Nessie (with and without Iceberg REST), but no luck so far. See #9974

Can you provide a full reproducer, i.e. all the statements that are necessary? “Run compaction against an Iceberg table using Spark actions” is not sufficient as a reproducer.

@dimas-b
Member

dimas-b commented Nov 22, 2024

@snazy as far as I understand (based on Zulip discussion) the key point is inserting via the Iceberg REST API, but compacting via Nessie API.
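If that is the trigger, a minimal sketch of the mixed-API setup might look like the following. This is an assumption on my part, not taken from the reporter's environment: catalog names, host, and port are hypothetical; the point is that both Spark catalogs resolve to the same Nessie server, one through Nessie's Iceberg REST endpoint and one through the native Nessie API.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical sketch: two Spark catalogs backed by the same Nessie server.
// The insert path goes through Iceberg REST, while the compaction job uses
// the native Nessie catalog — the combination described in this issue.
val spark = SparkSession.builder()
  // Catalog used by the writer, via Nessie's Iceberg REST endpoint
  .config("spark.sql.catalog.rest_cat", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.rest_cat.type", "rest")
  .config("spark.sql.catalog.rest_cat.uri", "http://nessie:19120/iceberg/main")
  // Catalog used by the compaction job, via the native Nessie API
  .config("spark.sql.catalog.nessie_cat", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.nessie_cat.type", "nessie")
  .config("spark.sql.catalog.nessie_cat.uri", "http://nessie:19120/api/v2")
  .config("spark.sql.catalog.nessie_cat.ref", "main")
  .getOrCreate()
```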

@gpathak128
Author

The full Spark action command is:

sparkActions
            .rewriteDataFiles(table)
            .option("partial-progress.enabled", "true")
            .option("target-file-size-bytes", (1024 * 1024 * 1024).toString)
            .filter(Expressions.greaterThanOrEqual("created_at", startMillis))
            .filter(Expressions.lessThanOrEqual("created_at", endMillis))
            .zOrder("watermark", "user_id")
            .execute()

@gpathak128
Author

The Spark settings are as follows:

SparkSession.builder()
      .config("spark.sql.session.timeZone", "UTC")
      .config(s"spark.sql.catalog.$catalogName", "org.apache.iceberg.spark.SparkCatalog")
      .config(s"spark.sql.catalog.$catalogName.type", "nessie")
      .config(s"spark.sql.catalog.$catalogName.ref", "main")
      .config(s"spark.sql.catalog.$catalogName.io-impl", "org.apache.iceberg.azure.adlsv2.ADLSFileIO")
      .config(s"spark.sql.catalog.$catalogName.uri", config.getString(s"spark.sql.$env.catalog.url"))
      .config(s"spark.sql.catalog.$catalogName.warehouse", config.getString(s"spark.sql.$env.catalog.warehouse-path"))
      .config(s"spark.sql.catalog.$catalogName.authentication.type", config.getString(s"spark.sql.$env.catalog.authType"))
      .config("spark.sql.defaultCatalog", config.getString("spark.sql.defaultCatalog"))
      .config("spark.sql.defaultSchema", config.getString("spark.sql.defaultSchema"))
      .config(s"spark.sql.catalog.$catalogName.adls.auth.shared-key.account.name", config.getString(s"spark.sql.$env.catalog.blob-access.account"))
      .config(s"spark.sql.catalog.$catalogName.adls.auth.shared-key.account.key", getAccessToken(env))
      .config(s"spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .getOrCreate()

@snazy
Member

snazy commented Nov 22, 2024

@gpathak128 we need the whole scenario - from CREATE TABLE via INSERT/UPDATE/DELETE to compaction.

@gpathak128
Author

The table was created via Trino (set up using the REST catalog settings):

CREATE TABLE corp.user_data (
    contains_pii boolean,
    watermark timestamp(3) WITH TIME ZONE,
    created_at bigint,
    <snipped other cols>
)
WITH (
    partitioning = ARRAY['hour(watermark)']
)

Partitioning was changed a few times to add/remove user_id buckets.
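For reference, that kind of partition-spec evolution can be sketched through the Iceberg table API roughly as follows. This is a hypothetical illustration only: the bucket width (16) and the `table` handle are assumptions, not values from this issue.

```scala
import org.apache.iceberg.Table
import org.apache.iceberg.expressions.Expressions

// Hypothetical sketch of adding and removing a user_id bucket partition
// field, as described above. Bucket width 16 is an assumption.
def addUserIdBuckets(table: Table): Unit =
  table.updateSpec()
    .addField(Expressions.bucket("user_id", 16))
    .commit()

def removeUserIdBuckets(table: Table): Unit =
  table.updateSpec()
    .removeField(Expressions.bucket("user_id", 16))
    .commit()
```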

There is a Spark streaming job that is writing to the table continuously:

df
            .writeStream
            .format("iceberg")
            .outputMode("append")
            .option("fanout-enabled", "true")
            .trigger(Trigger.ProcessingTime(triggerIntervalValue))
            .option("checkpointLocation", outputCheckpointPath)
            .options(optionsValue)
            .toTable(s"${catalogName.get}.$schemaNameValue.$tableNameValue")

I am running a "rewriteDataFiles" compaction job using the Spark actions described above.

@gpathak128
Author

I was able to run compaction after stopping the streaming Spark job that was writing to the same table. Looks like this is a multiple-writer issue.
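For anyone hitting the same validation failure with a concurrent writer, one thing that may be worth trying (an assumption on my side, not verified against this exact setup) is leaving `use-starting-sequence-number` at its default of `true` rather than forcing it to `false` as in the original action, so rewritten files keep the starting sequence number and commits that land concurrently can still be validated:

```scala
import org.apache.iceberg.actions.RewriteDataFiles

// Same action as above, but without USE_STARTING_SEQUENCE_NUMBER=false;
// the default (true) is generally friendlier to concurrent commits.
sparkActions
  .rewriteDataFiles(table)
  .option(RewriteDataFiles.PARTIAL_PROGRESS_ENABLED, "true")
  .option(RewriteDataFiles.PARTIAL_PROGRESS_MAX_COMMITS, "200")
  .execute()
```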

snazy added a commit to snazy/nessie that referenced this issue Dec 3, 2024
The current behavior of Nessie's Iceberg REST is to return only the most recent Iceberg snapshot. However, this conflicts with some Iceberg operations, which are not only maintenance operations but also related to "merge on read" / (equality) deletes.

This change alters Nessie's behavior by returning older snapshots from load-table and update-table operations.

The register-table operation, however, does not change, because only the latest snapshot is actually imported. Its behavior does change by returning an error if the table to be registered has more than one snapshot.

Fixes projectnessie#10013
Fixes projectnessie#9969