WIP: Delta to Iceberg
Rewritten logic for handling copying of transaction data
ericlgoodman committed Aug 23, 2022
1 parent 0c96b9c commit 9d5e53f
Showing 10 changed files with 642 additions and 324 deletions.
60 changes: 26 additions & 34 deletions api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -16,66 +16,58 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import org.apache.iceberg.Table;

/**
* An API that should be implemented by query engine integrations for providing actions.
*/
/** An API that should be implemented by query engine integrations for providing actions. */
public interface ActionsProvider {

/**
* Instantiates an action to snapshot an existing table as a new Iceberg table.
*/
/** Instantiates an action to snapshot an existing table as a new Iceberg table. */
default SnapshotTable snapshotTable(String sourceTableIdent) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement snapshotTable");
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement snapshotTable");
}

/**
* Instantiates an action to migrate an existing table to Iceberg.
*/
/** Instantiates an action to migrate an existing table to Iceberg. */
default MigrateTable migrateTable(String tableIdent) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement migrateTable");
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement migrateTable");
}

default MigrateDeltaLakeTable migrateDeltaLakeTable(String tableIdent) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement migrateDeltaLakeTable");
/** Instantiates an action to migrate a Delta Lake table to Iceberg. */
default MigrateDeltaLakeTable migrateDeltaLakeTable(String tableIdent, String deltaS3Location) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement migrateDeltaLakeTable");
}

/**
* Instantiates an action to delete orphan files.
*/
/** Instantiates an action to delete orphan files. */
default DeleteOrphanFiles deleteOrphanFiles(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement deleteOrphanFiles");
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteOrphanFiles");
}

/**
* Instantiates an action to rewrite manifests.
*/
/** Instantiates an action to rewrite manifests. */
default RewriteManifests rewriteManifests(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement rewriteManifests");
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement rewriteManifests");
}

/**
* Instantiates an action to rewrite data files.
*/
/** Instantiates an action to rewrite data files. */
default RewriteDataFiles rewriteDataFiles(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement rewriteDataFiles");
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement rewriteDataFiles");
}

/**
* Instantiates an action to expire snapshots.
*/
/** Instantiates an action to expire snapshots. */
default ExpireSnapshots expireSnapshots(Table table) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement expireSnapshots");
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement expireSnapshots");
}

/**
* Instantiates an action to delete all the files reachable from given metadata location.
*/
/** Instantiates an action to delete all the files reachable from given metadata location. */
default DeleteReachableFiles deleteReachableFiles(String metadataLocation) {
throw new UnsupportedOperationException(this.getClass().getName() + " does not implement deleteReachableFiles");
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement deleteReachableFiles");
}
}
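
Every method on ActionsProvider is a default that throws, so an engine integration only overrides the actions it supports. A minimal sketch (hypothetical class name, not part of this commit) showing the fallback behavior of the new two-argument migrateDeltaLakeTable:

import org.apache.iceberg.actions.ActionsProvider;

// Hypothetical provider that overrides nothing: every action, including
// migrateDeltaLakeTable(tableIdent, deltaS3Location), inherits the default
// method and throws UnsupportedOperationException when called.
public class NoopActionsProvider implements ActionsProvider {}

// For example, the following call would throw:
//   new NoopActionsProvider().migrateDeltaLakeTable("db.tbl", "s3://bucket/delta-table");
//   -> UnsupportedOperationException: ... does not implement migrateDeltaLakeTable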
api/src/main/java/org/apache/iceberg/actions/MigrateDeltaLakeTable.java
@@ -16,23 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

import java.util.Map;

/**
* Migrates a Delta Lake table to Iceberg in place.
*/
public interface MigrateDeltaLakeTable extends Action<MigrateDeltaLakeTable, MigrateDeltaLakeTable.Result> {
/** Migrates a Delta Lake table to Iceberg in place. */
public interface MigrateDeltaLakeTable
extends Action<MigrateDeltaLakeTable, MigrateDeltaLakeTable.Result> {

MigrateDeltaLakeTable tableProperties(Map<String, String> properties);

interface Result {

/**
* Returns the number of imported data files.
*/
/** Returns the number of imported data files. */
long importedDataFilesCount();
}
}
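
A rough caller-side sketch of how this action is meant to be used, assuming an engine integration (SparkActions is used here purely for illustration) overrides migrateDeltaLakeTable in its ActionsProvider; the table identifier and S3 location are made-up values:

import java.util.Collections;
import org.apache.iceberg.actions.MigrateDeltaLakeTable;
import org.apache.iceberg.spark.actions.SparkActions;

public class MigrateDeltaLakeExample {
  public static void main(String[] args) {
    // Hypothetical table identifier and Delta table location; the two-argument call
    // matches the updated ActionsProvider.migrateDeltaLakeTable(tableIdent, deltaS3Location).
    MigrateDeltaLakeTable.Result result =
        SparkActions.get()
            .migrateDeltaLakeTable("db.events", "s3://my-bucket/warehouse/events_delta")
            .tableProperties(Collections.singletonMap("migrated-from", "delta"))
            .execute();

    System.out.println("Imported data files: " + result.importedDataFilesCount());
  }
}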
5 changes: 5 additions & 0 deletions build.gradle
@@ -91,6 +91,11 @@ allprojects {
repositories {
mavenCentral()
mavenLocal()
// TODO: remove once Delta Lake 2.1.0 is officially released
maven {
name = 'staging-repo'
url = 'https://oss.sonatype.org/content/repositories/iodelta-1087/'
}
}
}

core/src/main/java/org/apache/iceberg/actions/BaseMigrateDeltaLakeTableActionResult.java
@@ -16,19 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.actions;

public class BaseMigrateDeltaLakeTableActionResult implements MigrateDeltaLakeTable.Result {

private final long numFilesImported;
private final long importedDataFilesCount;

public BaseMigrateDeltaLakeTableActionResult(long numFilesImported) {
this.numFilesImported = numFilesImported;
public BaseMigrateDeltaLakeTableActionResult(long importedDataFilesCount) {
this.importedDataFilesCount = importedDataFilesCount;
}

@Override
public long importedDataFilesCount() {
return numFilesImported;
return importedDataFilesCount;
}
}
141 changes: 77 additions & 64 deletions data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.data;

import java.io.IOException;
@@ -53,73 +52,92 @@ public class TableMigrationUtil {
private static final PathFilter HIDDEN_PATH_FILTER =
p -> !p.getName().startsWith("_") && !p.getName().startsWith(".");

private TableMigrationUtil() {
}
private TableMigrationUtil() {}

/**
* Returns the data files in a partition by listing the partition location.
* <p>
* For Parquet and ORC partitions, this will read metrics from the file footer. For Avro partitions,
* metrics are set to null.
* <p>
* Note: certain metrics, like NaN counts, that are only supported by iceberg file writers but not file footers,
* will not be populated.
*
* <p>For Parquet and ORC partitions, this will read metrics from the file footer. For Avro
* partitions, metrics are set to null.
*
* <p>Note: certain metrics, like NaN counts, that are only supported by iceberg file writers but
* not file footers, will not be populated.
*
* @param partition partition key, e.g., "a=1/b=2"
* @param uri partition location URI
* @param format partition format, avro, parquet or orc
* @param spec a partition spec
* @param conf a Hadoop conf
* @param uri partition location URI
* @param format partition format, avro, parquet or orc
* @param spec a partition spec
* @param conf a Hadoop conf
* @param metricsConfig a metrics conf
* @param mapping a name mapping
* @param mapping a name mapping
* @return a List of DataFile
*/
public static List<DataFile> listPartition(Map<String, String> partition, String uri, String format,
PartitionSpec spec, Configuration conf, MetricsConfig metricsConfig,
NameMapping mapping) {
public static List<DataFile> listPartition(
Map<String, String> partition,
String uri,
String format,
PartitionSpec spec,
Configuration conf,
MetricsConfig metricsConfig,
NameMapping mapping) {
return listPartition(partition, uri, format, spec, conf, metricsConfig, mapping, 1);
}

public static List<DataFile> listPartition(Map<String, String> partitionPath, String partitionUri, String format,
PartitionSpec spec, Configuration conf, MetricsConfig metricsSpec,
NameMapping mapping, int parallelism) {
public static List<DataFile> listPartition(
Map<String, String> partitionPath,
String partitionUri,
String format,
PartitionSpec spec,
Configuration conf,
MetricsConfig metricsSpec,
NameMapping mapping,
int parallelism) {
ExecutorService service = null;
try {
String partitionKey = spec.fields().stream()
String partitionKey =
spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name, partitionPath.get(name)))
.collect(Collectors.joining("/"));

Path partition = new Path(partitionUri);
FileSystem fs = partition.getFileSystem(conf);
List<FileStatus> fileStatus = Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
List<FileStatus> fileStatus =
Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.collect(Collectors.toList());
DataFile[] datafiles = new DataFile[fileStatus.size()];
Tasks.Builder<Integer> task = Tasks.range(fileStatus.size())
.stopOnFailure()
.throwFailureWhenFinished();
Tasks.Builder<Integer> task =
Tasks.range(fileStatus.size()).stopOnFailure().throwFailureWhenFinished();

if (parallelism > 1) {
service = migrationService(parallelism);
task.executeWith(service);
}

if (format.contains("avro")) {
task.run(index -> {
Metrics metrics = getAvroMetrics(fileStatus.get(index).getPath(), conf);
datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "avro");
});
task.run(
index -> {
Metrics metrics = getAvroMetrics(fileStatus.get(index).getPath(), conf);
datafiles[index] =
buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "avro");
});
} else if (format.contains("parquet")) {
task.run(index -> {
Metrics metrics = getParquetMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "parquet");
});
task.run(
index -> {
Metrics metrics =
getParquetMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
datafiles[index] =
buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "parquet");
});
} else if (format.contains("orc")) {
task.run(index -> {
Metrics metrics = getOrcMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
datafiles[index] = buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
});
task.run(
index -> {
Metrics metrics =
getOrcMetrics(fileStatus.get(index).getPath(), conf, metricsSpec, mapping);
datafiles[index] =
buildDataFile(fileStatus.get(index), partitionKey, spec, metrics, "orc");
});
} else {
throw new UnsupportedOperationException("Unknown partition format: " + format);
}
@@ -133,56 +151,51 @@ public static List<DataFile> listPartition(Map<String, String> partitionPath, St
}
}

private static Metrics getAvroMetrics(Path path, Configuration conf) {
private static Metrics getAvroMetrics(Path path, Configuration conf) {
try {
InputFile file = HadoopInputFile.fromPath(path, conf);
long rowCount = Avro.rowCount(file);
return new Metrics(rowCount, null, null, null, null);
} catch (UncheckedIOException e) {
throw new RuntimeException("Unable to read Avro file: " +
path, e);
throw new RuntimeException("Unable to read Avro file: " + path, e);
}
}

private static Metrics getParquetMetrics(Path path, Configuration conf,
MetricsConfig metricsSpec, NameMapping mapping) {
public static Metrics getParquetMetrics(
Path path, Configuration conf, MetricsConfig metricsSpec, NameMapping mapping) {
try {
InputFile file = HadoopInputFile.fromPath(path, conf);
return ParquetUtil.fileMetrics(file, metricsSpec, mapping);
} catch (UncheckedIOException e) {
throw new RuntimeException("Unable to read the metrics of the Parquet file: " +
path, e);
throw new RuntimeException("Unable to read the metrics of the Parquet file: " + path, e);
}
}

private static Metrics getOrcMetrics(Path path, Configuration conf,
MetricsConfig metricsSpec, NameMapping mapping) {
private static Metrics getOrcMetrics(
Path path, Configuration conf, MetricsConfig metricsSpec, NameMapping mapping) {
try {
return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(path, conf),
metricsSpec, mapping);
return OrcMetrics.fromInputFile(HadoopInputFile.fromPath(path, conf), metricsSpec, mapping);
} catch (UncheckedIOException e) {
throw new RuntimeException("Unable to read the metrics of the Orc file: " +
path, e);
throw new RuntimeException("Unable to read the metrics of the Orc file: " + path, e);
}
}

private static DataFile buildDataFile(FileStatus stat, String partitionKey,
PartitionSpec spec, Metrics metrics, String format) {
return DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat(format)
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build();
private static DataFile buildDataFile(
FileStatus stat, String partitionKey, PartitionSpec spec, Metrics metrics, String format) {
return DataFiles.builder(spec)
.withPath(stat.getPath().toString())
.withFormat(format)
.withFileSizeInBytes(stat.getLen())
.withMetrics(metrics)
.withPartitionPath(partitionKey)
.build();
}

private static ExecutorService migrationService(int concurrentDeletes) {
return MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(
concurrentDeletes,
new ThreadFactoryBuilder()
.setNameFormat("table-migration-%d")
.build()));
(ThreadPoolExecutor)
Executors.newFixedThreadPool(
concurrentDeletes,
new ThreadFactoryBuilder().setNameFormat("table-migration-%d").build()));
}
}
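
For reference, a small usage sketch of the widened listPartition overload, with a hypothetical schema, partition spec, and partition location; it only illustrates the call shape described in the javadoc above:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.TableMigrationUtil;
import org.apache.iceberg.types.Types;

public class ListPartitionExample {
  public static void main(String[] args) {
    // Hypothetical table layout: partitioned by a and b, stored as Parquet.
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.required(2, "a", Types.IntegerType.get()),
            Types.NestedField.required(3, "b", Types.IntegerType.get()));
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("a").identity("b").build();

    // Partition key "a=1/b=2", matching the javadoc example.
    Map<String, String> partition = new HashMap<>();
    partition.put("a", "1");
    partition.put("b", "2");

    List<DataFile> files =
        TableMigrationUtil.listPartition(
            partition,
            "s3://my-bucket/warehouse/tbl/a=1/b=2", // hypothetical partition location
            "parquet",
            spec,
            new Configuration(),
            MetricsConfig.getDefault(),
            null, // no name mapping
            4); // read Parquet footers with 4 threads

    files.forEach(f -> System.out.println(f.path() + " records=" + f.recordCount()));
  }
}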
(Diffs for the remaining changed files were not loaded.)
