WIP: Adding support for Delta to Iceberg migration #5331

Closed

Conversation

ericlgoodman
Contributor

This is a WIP PR (I'm still adding unit tests and testing edge cases), but I wanted to put it out here to get initial feedback.

It adds support for migrating a Delta Lake table to an Iceberg table. Currently this only creates a new table and effectively compresses the history of the Delta Lake table into a single commit. In the future, we can add functionality to optionally carry over the history of the Delta Lake table.

It also assumes that the user's current Delta Lake table resides in what has now been configured to be an Iceberg catalog. I'm thinking of adding an optional destinationCatalog for scenarios where users are attempting to move between catalogs.

Contributor

@amogh-jahagirdar left a comment

Left some comments and questions. I'd also suggest rebasing and running spotlessApply so the discussion can focus more on the fundamental aspects and less on style nits.


private final long numFilesImported;

public BaseMigrateDeltaLakeTableActionResult(long numFilesImported) {
Contributor

Would it make sense to capture aspects like the total data size and how long the migration procedure took in the result?
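
For illustration, a minimal sketch of what such an extended result could look like; the extra fields and their names are hypothetical, not part of this PR:

// Hypothetical extension of the result class; totalDataSizeBytes and durationMs are
// illustrative names, not part of this PR.
public class BaseMigrateDeltaLakeTableActionResult {
  private final long numFilesImported;
  private final long totalDataSizeBytes; // e.g. the sum of AddFile sizes from the Delta snapshot
  private final long durationMs;         // wall-clock time the migration procedure took

  public BaseMigrateDeltaLakeTableActionResult(
      long numFilesImported, long totalDataSizeBytes, long durationMs) {
    this.numFilesImported = numFilesImported;
    this.totalDataSizeBytes = totalDataSizeBytes;
    this.durationMs = durationMs;
  }

  public long numFilesImported() {
    return numFilesImported;
  }

  public long totalDataSizeBytes() {
    return totalDataSizeBytes;
  }

  public long durationMs() {
    return durationMs;
  }
}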

@@ -293,7 +293,7 @@ private static PartitionSpec identitySpec(Schema schema, Collection<Column> colu
return identitySpec(schema, names);
}

private static PartitionSpec identitySpec(Schema schema, List<String> partitionNames) {
public static PartitionSpec identitySpec(Schema schema, List<String> partitionNames) {
Contributor

Why is the change in access modifiers?

.collect(Collectors.toList());
}

private static PartitionSpec getPartitionSpecFromDeltaSnapshot(
Contributor

I get what the function is doing but does it need to be static?

}
}

private static List<SparkTableUtil.SparkPartition> getSparkPartitionsFromDeltaSnapshot(
Contributor

same, does it need to be static?

Comment on lines 384 to 386
boolean checkDuplicateFiles,
PartitionSpec nullableSpec,
List<SparkPartition> nullablePartitions) {
Contributor

I get that we need to pass in the computed partition spec and Spark partitions, but I don't think we should change any public API signatures; we can add a new one.

Contributor

+1, avoid changing existing public methods, we can do something like:

public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
                                    String stagingDir, Map<String, String> partitionFilter,
                                    boolean checkDuplicateFiles) {
  PartitionSpec spec = SparkSchemaUtil.specForTable(spark, sourceTableIdentWithDB.unquotedString());
  ...
  importSparkTable(..., spec, sourceTablePartitions);
}

public static void importSparkTable(SparkSession spark, TableIdentifier sourceTableIdent, Table targetTable,
                                    String stagingDir, Map<String, String> partitionFilter,
                                    boolean checkDuplicateFiles,
                                    PartitionSpec spec,
                                    List<SparkPartition> partitions) {
  ...
}

Comment on lines 178 to 278
return updatedSnapshot.getAllFiles()
    .stream()
    // Map each partition to the list of files within it
    .collect(Collectors.groupingBy(AddFile::getPartitionValues))
    .entrySet()
    .stream()
    .map(entry -> {
      // We don't care which file we take since they will all have the same prefix.
      // The arbitrary file will have a path that looks like "partition1/partition2/file.parquet";
      // we're interested in the part prior to the filename.
      AddFile addFile = entry.getValue().get(0);
      String pathBeforeFileName = addFile.getPath().substring(0, addFile.getPath().lastIndexOf("/"));
      String fullPath = new Path(deltaLogPath, pathBeforeFileName).toString();

      return new SparkTableUtil.SparkPartition(
          entry.getKey(), // Map containing the names and values of the partitions
          fullPath,
          // Delta tables only support parquet
          "parquet");
    })
    .collect(Collectors.toList());
}
Contributor

Should we parallelize this computation? For example, build the grouping by partition values concurrently across files. It could be overkill, depending on the number of files in the snapshot.
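
A minimal sketch of the parallel grouping suggested here, using a parallel stream and a concurrent collector over the snapshot's files (names mirror the snippet above; this is not code from the PR):

import io.delta.standalone.actions.AddFile;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

// Sketch: group Delta AddFiles by partition values concurrently, replacing the
// sequential Collectors.groupingBy in the snippet above.
static ConcurrentMap<Map<String, String>, List<AddFile>> groupFilesByPartition(List<AddFile> allFiles) {
  return allFiles
      .parallelStream()
      .collect(Collectors.groupingByConcurrent(AddFile::getPartitionValues));
}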

@ericlgoodman
Contributor Author

Adding here my primary concern with this PR - and in general a concern going forward with Spark and using multiple tables such as Delta Lake and Iceberg.

Spark reads tables through whatever catalog is located at the first part of a table's identifier. There can only be 1 catalog per identifier, and different catalogs have different capabilities. For example, the DeltaCatalog can read Delta Lake and generic Hive tables, and the SparkSessionCatalog can read Iceberg + Hive tables.

In theory, in order to read from multiple table types in one Spark session, a user would initialize a DeltaCatalog at, say, delta and then the SparkSessionCatalog at iceberg. Then all their Delta Lake tables would be located at delta.my_delta_database.my_delta_lake_table and all their Iceberg tables at iceberg.my_iceberg_database.my_iceberg_table. Unfortunately, this doesn't work out of the box. Both of these catalog implementations are designed to be used by overriding the default Spark catalog, which is located at spark_catalog. CatalogExtension, which DeltaCatalog and SparkSessionCatalog both inherit from, contains a method setDelegateCatalog(CatalogPlugin delegate). As the Javadoc reads:

/**
 * This will be called only once by Spark to pass in the Spark built-in session catalog, after
 * {@link #initialize(String, CaseInsensitiveStringMap)} is called.
 */
void setDelegateCatalog(CatalogPlugin delegate);

A user can fix this issue by manually calling this method during Spark setup and setting the delegate to the one in the default Spark catalog. But most users presumably are not doing this, and some users might face difficulty depending on their service provider and how much abstraction/configuration has been taken away from them during setup.
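
To make the setup described above concrete, here is a sketch of the side-by-side catalog configuration; the catalog names delta and iceberg and the Hive-backed Iceberg catalog type are illustrative assumptions, and, as noted, this configuration alone is not sufficient:

import org.apache.spark.sql.SparkSession;

// Sketch: register the Delta catalog at "delta" and Iceberg's SparkSessionCatalog at
// "iceberg". This is not sufficient on its own, since both implementations expect to be
// installed as spark_catalog and to receive the built-in session catalog via
// setDelegateCatalog().
SparkSession spark = SparkSession.builder()
    .config("spark.sql.catalog.delta", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkSessionCatalog")
    .config("spark.sql.catalog.iceberg.type", "hive")  // assumes a Hive metastore backing
    .getOrCreate();

// Delta tables would then be addressed as delta.my_delta_database.my_delta_lake_table
// and Iceberg tables as iceberg.my_iceberg_database.my_iceberg_table.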

This basically means that in today's world, it doesn't seem realistic that users have a simple way to use one Spark session to read/migrate between different table types. This might make sense to implement first, as users may find that a Delta/Iceberg/Hudi table makes sense for them in one context but a different one is preferable in another.

When it comes to migration, there are basically two options:

  1. Create a more abstract Catalog implementation that can read Iceberg/Delta/Hudi/Hive tables dynamically, similar to what happens in the Trino Hive connector. The connector inspects the table properties and determines at runtime whether to redirect to another connector. Similarly, a Spark catalog could simply delegate to specific catalogs if it sees certain table type specific properties.
  2. Provide an easier method for users to not have to override the default catalog for these table type specific catalog implementations. If the Delta catalog was located at delta, and Iceberg at iceberg, then users could just keep their different table types in different catalogs and migration could take an optional parameter of the new desired catalog.

Contributor

@jackye1995 left a comment

So based on my understanding, the difference between this action and the existing migrate action for Hive tables is that it uses DeltaLog to get the partition spec, schema, and partitions of the table. However, I don't see the logic for reproducing the Delta transactions as Iceberg snapshots, which is the key value of such a migration.

I understand the challenge of doing this in the context of Spark, but I think we can actually do better and not rely on the Spark context at all. What we can do is have a BaseMigrateDeltaLakeTableAction that takes in an icebergCatalog instance and a Delta Lake table location. With those two pieces of information, we can fully replay the DeltaLog information and create an Iceberg table out of that.

Then we can add a Spark-based implementation, where the icebergCatalog can be retrieved from SparkSessionCatalog.icebergCatalog(), and the location can be retrieved from a Delta table name plus the Spark default catalog's Delta table serde path property.

What do you think?
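
For illustration, a rough sketch of the engine-agnostic action described above; the class, field, and method names are hypothetical, not the API that was eventually merged:

import io.delta.standalone.DeltaLog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical sketch: the action only needs an Iceberg Catalog, a target identifier,
// and the Delta table location; no Spark session is required to read the DeltaLog.
public class BaseMigrateDeltaLakeTableAction {
  private final Catalog icebergCatalog;
  private final TableIdentifier newTableIdentifier;
  private final String deltaTableLocation;
  private final Configuration hadoopConf;

  public BaseMigrateDeltaLakeTableAction(
      Catalog icebergCatalog,
      TableIdentifier newTableIdentifier,
      String deltaTableLocation,
      Configuration hadoopConf) {
    this.icebergCatalog = icebergCatalog;
    this.newTableIdentifier = newTableIdentifier;
    this.deltaTableLocation = deltaTableLocation;
    this.hadoopConf = hadoopConf;
  }

  public void execute() {
    // Read the Delta transaction log directly from the table location.
    DeltaLog deltaLog = DeltaLog.forTable(hadoopConf, deltaTableLocation);
    // From the DeltaLog, derive the schema, partition spec, and data files, replay the
    // Delta versions as Iceberg snapshots, and commit them to a table created through
    // icebergCatalog.createTable(newTableIdentifier, ...).
  }
}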

@@ -63,6 +63,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
implementation("org.apache.parquet:parquet-column")
implementation("org.apache.parquet:parquet-hadoop")

implementation ("io.delta:delta-standalone_${scalaVersion}")
Contributor

Can we make this compileOnly? Not everyone wants this as part of their Spark runtime.

io.delta.standalone.Snapshot updatedSnapshot,
StructType structType
) {
Type converted = SparkTypeVisitor.visit(structType, new SparkTypeToType(structType));
Contributor

we can use SparkSchemaUtil.convert
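
For reference, a sketch of the suggested call; the variable name follows the snippet above:

import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.types.StructType;

// SparkSchemaUtil.convert(StructType) wraps the Spark type visitor and returns an
// Iceberg Schema, so the explicit SparkTypeVisitor/SparkTypeToType call is not needed.
Schema icebergSchema = SparkSchemaUtil.convert(structType);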


private void renameAndBackupSourceTable() {
try {
LOG.info("Renaming {} as {} for backup", identifier, backupIdent);
Contributor

"Renaming Delta Lake table ..."

@@ -616,6 +621,11 @@ public static Dataset<Row> loadMetadataTable(SparkSession spark, Table table, Me
return Dataset.ofRows(spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
}

public static String getIcebergMetadataLocation(Table table) {
Contributor

This is hard-coded in many places; I think we can raise a separate PR just to clean that up.
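
For context, a sketch of the helper in question, assuming its body mirrors the pattern currently duplicated across the Spark actions:

import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

// Sketch of the shared helper: use the write.metadata.path table property if set,
// otherwise default to "<table location>/metadata".
public static String getIcebergMetadataLocation(Table table) {
  return table.properties().getOrDefault(
      TableProperties.WRITE_METADATA_LOCATION, table.location() + "/metadata");
}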

"Cannot find any partitions in table %s", sourceTableIdent);
importSparkPartitions(spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
importSparkPartitions(spark, partitions, targetTable, nullableSpec, stagingDir, checkDuplicateFiles);
Contributor

Prefer not to change the variable name if possible.

@ericlgoodman marked this pull request as draft August 23, 2022 16:27
@github-actions bot added the data label Aug 23, 2022
@@ -121,7 +121,7 @@ private MigrateTable.Result doExecute() {

Some<String> backupNamespace = Some.apply(backupIdent.namespace()[0]);
TableIdentifier v1BackupIdent = new TableIdentifier(backupIdent.name(), backupNamespace);
String stagingLocation = getMetadataLocation(icebergTable);
String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);
Contributor Author

Need to remove this change

@@ -124,7 +124,7 @@ private SnapshotTable.Result doExecute() {
ensureNameMappingPresent(icebergTable);

TableIdentifier v1TableIdent = v1SourceTable().identifier();
String stagingLocation = getMetadataLocation(icebergTable);
String stagingLocation = SparkTableUtil.getIcebergMetadataLocation(icebergTable);
Contributor Author

Same here

String nameMappingString = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
NameMapping nameMapping =
nameMappingString != null ? NameMappingParser.fromJson(nameMappingString) : null;
return TableMigrationUtil.getParquetMetrics(
Contributor

we should do a check to make sure the file is indeed Parquet
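
A minimal sketch of such a guard, assuming a String variable path holding the data file's location is in scope; the variable name and the extension-based check are assumptions, not code from the PR:

// Sketch: fail fast if the file is not Parquet before collecting Parquet metrics.
if (!path.toLowerCase(java.util.Locale.ROOT).endsWith(".parquet")) {
  throw new IllegalArgumentException(
      String.format("Cannot get Parquet metrics for non-Parquet file: %s", path));
}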

return spec == null ? PartitionSpec.unpartitioned() : spec;
}

private StructType getStructTypeFromDeltaSnapshot() {
Contributor

Why not just convert directly from the Delta schema to the Iceberg schema? Here we end up converting to Spark and then converting to Iceberg, and as a result you had to make a few Spark classes and methods public.

private final String deltaTableLocation;
private final Identifier newIdentifier;

MigrateDeltaLakeTableSparkAction(
Contributor

Can we have a class BaseMigrateDeltaLakeTableAction? Lots of logic could be shared when we want to extend this feature to other engines like Flink and Trino.

Contributor

And that class can live in the iceberg-core library.

@JonasJ-ap
Contributor

Hi @ericlgoodman. My name is Rushan Jiang, a CS undergrad at CMU. I am interested in learning and contributing to this migration support. I saw you did not update this PR for some time. Would you mind allowing me to continue your work?

I appreciate your time and consideration.

@ericlgoodman
Contributor Author

Hi @ericlgoodman. My name is Rushan Jiang, a CS undergrad at CMU. I am interested in learning and contributing to this migration support. I saw you did not update this PR for some time. Would you mind allowing me to continue your work?

I appreciate your time and consideration.

Followed up with you on the Iceberg Slack.

@jackye1995
Contributor

Closing since the PR is merged.

@jackye1995 closed this Mar 9, 2023