[GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg (apache#3560)

* initial commit for iceberg distcp.

* adding copy entity helper and iceberg distcp template and test case.

* Adding unit tests and refactoring method definitions for an Iceberg dataset.

* resolve conflicts after cleaning history

* update iceberg dataset and finder to include javadoc

* addressed comments on PR and aligned code check style

* renamed vars, added logging and updated javadoc

* update dataset descriptor with ternary operation and rename fs to sourceFs

* added source and target fs and update iceberg dataset finder constructor

* Update source and dest dataset methods as protected and add req args constructor

* change the order of attributes for iceberg dataset finder ctor

* update iceberg dataset methods with correct source and target fs

Co-authored-by: Meeth Gala <[email protected]>
2 people authored and phet committed Sep 22, 2022
1 parent ace81f7 commit 862f9f6
Showing 6 changed files with 553 additions and 1 deletion.
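
Taken together, the change wires a finder -> dataset -> file-set pipeline: IcebergDatasetFinder locates the table via the catalog, IcebergDataset enumerates the current snapshot's files, and IcebergTableFileSet turns them into copy entities. A minimal usage sketch (not part of this diff; the property values and the FileSystem setup are illustrative assumptions):

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import org.apache.gobblin.data.management.copy.iceberg.IcebergDataset;
import org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetFinder;

public class IcebergDistcpSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // both keys are mandatory; findDatasets() rejects blank values
    props.setProperty(IcebergDatasetFinder.ICEBERG_DB_NAME, "my_db");
    props.setProperty(IcebergDatasetFinder.ICEBERG_TABLE_NAME, "my_table");
    // optional metastore URIs, surfaced in the source/destination dataset descriptors
    props.setProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY, "thrift://source-hms:9083");
    props.setProperty(IcebergDataset.TARGET_METASTORE_URI_KEY, "thrift://target-hms:9083");

    FileSystem sourceFs = FileSystem.get(new Configuration());
    IcebergDatasetFinder finder = new IcebergDatasetFinder(sourceFs, props);
    for (IcebergDataset dataset : finder.findDatasets()) {
      System.out.println("found dataset URN: " + dataset.datasetURN()); // e.g. "my_db.my_table"
    }
  }
}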
@@ -25,6 +25,7 @@ public class DatasetConstants {
public static final String PLATFORM_HIVE = "hive";
public static final String PLATFORM_SALESFORCE = "salesforce";
public static final String PLATFORM_MYSQL = "mysql";
public static final String PLATFORM_ICEBERG = "iceberg";

/** Common metadata */
public static final String BRANCH = "branch";
@@ -0,0 +1,212 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.request_allocation.PushDownRequestor;

/**
* Iceberg dataset implementing {@link CopyableDataset}.
*/
@Slf4j
@Getter
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
private final IcebergTable icebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;

private final Optional<String> sourceMetastoreURI;
private final Optional<String> targetMetastoreURI;

/** Target metastore URI */
public static final String TARGET_METASTORE_URI_KEY =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
/** Target database name */
public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";

public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) {
this.dbName = db;
this.inputTableName = table;
this.icebergTable = icebergTbl;
this.properties = properties;
this.sourceFs = sourceFs;
this.sourceMetastoreURI =
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
this.targetMetastoreURI =
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
}

/**
* Represents a source {@link FileStatus} and a {@link Path} destination.
*/
@Data
private static class SourceAndDestination {
private final FileStatus source;
private final Path destination;
}

@Override
public String datasetURN() {
// TODO: verify!
return this.dbName + "." + this.inputTableName;
}

/**
* Finds all files read by the table and generates CopyableFiles.
* For the specific semantics see {@link #getCopyEntities}.
*/
@Override
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
return getCopyEntities(targetFs, configuration);
}

/**
* Finds all files read by the table and generates CopyableFiles.
* For the specific semantics see {@link #getCopyEntities}.
*/
@Override
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
// TODO: Implement PushDownRequestor and priority based copy entity iteration
return getCopyEntities(targetFs, configuration);
}

/**
* Finds all files read by the table and generates {@link CopyEntity}s for duplicating the table.
*/
Iterator<FileSet<CopyEntity>> getCopyEntities(FileSystem targetFs, CopyConfiguration configuration) {
FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, targetFs, configuration);
return Iterators.singletonIterator(fileSet);
}

/**
* Finds all files read by the table file set and generates {@link CopyEntity}s for duplicating the table.
*/
@VisibleForTesting
Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration configuration) throws IOException {
String fileSet = this.getInputTableName();
List<CopyEntity> copyEntities = Lists.newArrayList();
Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
log.info("{}.{} - found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());

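// build one CopyableFile per source file, tagging each with source/destination dataset descriptors for lineage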
for (CopyableFile.Builder builder : getCopyableFilesFromPaths(pathToFileStatus, configuration, targetFs)) {
CopyableFile fileEntity =
builder.fileSet(fileSet).datasetOutputPath(targetFs.getUri().getPath()).build();
fileEntity.setSourceData(getSourceDataset(this.sourceFs));
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);
}
log.info("{}.{} - generated {} copy entities", dbName, inputTableName, copyEntities.size());
return copyEntities;
}

/**
* Get builders for a {@link CopyableFile} for each file path
*/
protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, FileStatus> pathToFileStatus, CopyConfiguration configuration, FileSystem targetFs) throws IOException {

List<CopyableFile.Builder> builders = Lists.newArrayList();
List<SourceAndDestination> dataFiles = Lists.newArrayList();
Configuration defaultHadoopConfiguration = new Configuration();
FileSystem actualSourceFs;

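// compute each file's destination: the same path, qualified against the target FileSystem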
for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
dataFiles.add(new SourceAndDestination(entry.getValue(), targetFs.makeQualified(entry.getKey())));
}

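// resolve each source file's own FileSystem, since sources need not all reside on this.sourceFs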
for (SourceAndDestination sourceAndDestination : dataFiles) {
actualSourceFs = sourceAndDestination.getSource().getPath().getFileSystem(defaultHadoopConfiguration);

// TODO: Add ancestor owner and permissions in future releases
builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, sourceAndDestination.getSource(),
sourceAndDestination.getDestination(), configuration));
}
return builders;
}

/**
* Finds all files in the Iceberg table's current snapshot.
* Returns a map of {@link Path} to {@link FileStatus} for each file that needs to be copied.
*/
protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
Map<Path, FileStatus> result = Maps.newHashMap();
IcebergTable icebergTable = this.getIcebergTable();
IcebergSnapshotInfo icebergSnapshotInfo = icebergTable.getCurrentSnapshotInfo();

log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
icebergSnapshotInfo.getSnapshotId(), icebergSnapshotInfo.getMetadataPath());
List<String> pathsToCopy = icebergSnapshotInfo.getAllPaths();

for (String pathString : pathsToCopy) {
Path path = new Path(pathString);
result.put(path, this.sourceFs.getFileStatus(path));
}
return result;
}

protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
return getDatasetDescriptor(sourceMetastoreURI, sourceFs);
}

protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
return getDatasetDescriptor(targetMetastoreURI, targetFs);
}

@NotNull
private DatasetDescriptor getDatasetDescriptor(Optional<String> stringMetastoreURI, FileSystem fs) {
String currentTable = this.getDbName() + "." + this.getInputTableName();

URI hiveMetastoreURI = stringMetastoreURI.isPresent() ? URI.create(stringMetastoreURI.get()) : null;

DatasetDescriptor currentDataset =
new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, hiveMetastoreURI, currentTable);
currentDataset.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
return currentDataset;
}
}
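
Because getFilePathsToFileStatus() is protected, copy-entity generation can be exercised without a live catalog by stubbing it in a subclass, reusing the imports from the file above. A hedged sketch (the class name, path, and file sizes are hypothetical, not part of this commit):

// hypothetical test helper, not part of this commit
class StubIcebergDataset extends IcebergDataset {
  StubIcebergDataset(String db, String table, IcebergTable tbl, Properties props, FileSystem fs) {
    super(db, table, tbl, props, fs);
  }

  @Override
  protected Map<Path, FileStatus> getFilePathsToFileStatus() {
    Map<Path, FileStatus> result = Maps.newHashMap();
    Path path = new Path("/data/my_db/my_table/part-00000.orc"); // illustrative path
    result.put(path, new FileStatus(1234L, false, 1, 128L, 0L, path));
    return result;
  }
}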
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.util.HadoopUtils;

/**
* Finds {@link IcebergDataset}s. Looks for tables in a database using an {@link IcebergCatalog},
* and creates an {@link IcebergDataset} for each one.
*/
@Slf4j
@RequiredArgsConstructor
public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {

public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
public static final String ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";

protected final FileSystem sourceFs;
private final Properties properties;

/**
* Finds all {@link IcebergDataset}s in the file system using the Iceberg Catalog.
* Both the Iceberg database name and table name are mandatory in the current implementation;
* later we may explore supporting dataset patterns similar to Hive.
* @return List of {@link IcebergDataset}s in the file system.
* @throws IOException
*/
@Override
public List<IcebergDataset> findDatasets() throws IOException {
List<IcebergDataset> matchingDatasets = new ArrayList<>();
if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing",
ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
}
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);

Configuration configuration = HadoopUtils.getConfFromProperties(properties);

IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
/* Each Iceberg dataset maps to an Iceberg table.
* TODO: The user-provided database and table names need to be pre-checked and verified against the existence of a valid Iceberg table
*/
matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);

return matchingDatasets;
}

@Override
public Path commonDatasetRoot() {
return new Path("/");
}

@Override
public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
return findDatasets().iterator();
}

protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
}
}
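
The finder fails fast on missing configuration before any catalog access; a small illustration (a sketch only, using a local FileSystem purely for construction):

static void demoMissingTableName() throws IOException {
  Properties props = new Properties();
  props.setProperty(IcebergDatasetFinder.ICEBERG_DB_NAME, "my_db");
  // ICEBERG_TABLE_NAME deliberately left unset
  IcebergDatasetFinder finder =
      new IcebergDatasetFinder(FileSystem.getLocal(new Configuration()), props);
  try {
    finder.findDatasets(); // throws before IcebergCatalogFactory.create(...) is ever reached
  } catch (IllegalArgumentException expected) {
    // message names both keys: iceberg.dataset.database.name / iceberg.dataset.table.name
  }
}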
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.util.Collection;

import org.apache.hadoop.fs.FileSystem;

import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.partition.FileSet;


/**
* A {@link FileSet} for Iceberg datasets that holds the information associated with an Iceberg table
* and generates the {@link CopyEntity}s needed to copy it.
*/
public class IcebergTableFileSet extends FileSet<CopyEntity> {

private final CopyConfiguration copyConfiguration;
private final FileSystem targetFs;
private final IcebergDataset icebergDataset;

public IcebergTableFileSet(String name, IcebergDataset icebergDataset, FileSystem targetFs, CopyConfiguration configuration) {
super(name, icebergDataset);
this.copyConfiguration = configuration;
this.targetFs = targetFs;
this.icebergDataset = icebergDataset;
}

@Override
protected Collection<CopyEntity> generateCopyEntities() throws IOException {
return this.icebergDataset.generateCopyEntities(this.targetFs, this.copyConfiguration);
}
}