[GOBBLIN-1709] Create Iceberg Datasets Finder, Iceberg Dataset and FileSet to generate Copy Entities to support Distcp for Iceberg #3560

Merged: 12 commits, Sep 22, 2022
DatasetConstants.java
@@ -25,6 +25,7 @@ public class DatasetConstants {
public static final String PLATFORM_HIVE = "hive";
public static final String PLATFORM_SALESFORCE = "salesforce";
public static final String PLATFORM_MYSQL = "mysql";
public static final String PLATFORM_ICEBERG = "iceberg";

/** Common metadata */
public static final String BRANCH = "branch";
IcebergDataset.java
@@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.jetbrains.annotations.NotNull;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.request_allocation.PushDownRequestor;

/**
* Iceberg dataset implementing {@link CopyableDataset}.
*/
@Slf4j
@Getter
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
private final IcebergTable icebergTable;
protected final Properties properties;
protected final FileSystem fs;

private final Optional<String> sourceMetastoreURI;
private final Optional<String> targetMetastoreURI;

/** Target metastore URI */
public static final String TARGET_METASTORE_URI_KEY =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.metastore.uri";
/** Target database name */
public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";

public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem fs) {
this.dbName = db;
this.inputTableName = table;
this.icebergTable = icebergTbl;
this.properties = properties;
this.fs = fs;
this.sourceMetastoreURI =
Optional.fromNullable(this.properties.getProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY));
this.targetMetastoreURI =
Optional.fromNullable(this.properties.getProperty(TARGET_METASTORE_URI_KEY));
}

/**
* Represents a source {@link FileStatus} and a {@link Path} destination.
*/
@Data
private static class SourceAndDestination {
private final FileStatus source;
private final Path destination;
}

@Override
public String datasetURN() {
// TODO: verify!
return this.dbName + "." + this.inputTableName;
}

/**
* Finds all files read by the table and generates CopyableFiles.
* For the specific semantics see {@link #getCopyEntities}.
*/
@Override
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration) {
return getCopyEntities(configuration);
}

/**
* Finds all files read by the table and generates CopyableFiles.
* For the specific semantics see {@link #getCopyEntities}.
*/
@Override
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, CopyConfiguration configuration,
Comparator<FileSet<CopyEntity>> prioritizer, PushDownRequestor<FileSet<CopyEntity>> requestor) {
// TODO: Implement PushDownRequestor and priority based copy entity iteration
return getCopyEntities(configuration);
}

/**
* Finds all files read by the table and generates {@link CopyEntity}s for duplicating the table.
*/
Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration configuration) {
FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getInputTableName(), this, configuration);
return Iterators.singletonIterator(fileSet);
}

/**
* Finds all files read by the table file set and generates {@link CopyEntity}s for duplicating the table.
*/
@VisibleForTesting
Collection<CopyEntity> generateCopyEntities(CopyConfiguration configuration) throws IOException {
String fileSet = this.getInputTableName();
List<CopyEntity> copyEntities = Lists.newArrayList();
Map<Path, FileStatus> pathToFileStatus = getFilePathsToFileStatus();
log.info("{}.{} - found {} candidate source paths", dbName, inputTableName, pathToFileStatus.size());

for (CopyableFile.Builder builder : getCopyableFilesFromPaths(pathToFileStatus, configuration)) {
CopyableFile fileEntity =
builder.fileSet(fileSet).datasetOutputPath(this.fs.getUri().getPath()).build();
fileEntity.setSourceData(getSourceDataset());
fileEntity.setDestinationData(getDestinationDataset());
copyEntities.add(fileEntity);
}
log.info("{}.{} - generated {} copy entities", dbName, inputTableName, copyEntities.size());
return copyEntities;
}

/**
* Gets a {@link CopyableFile.Builder} for each file path
*/
protected List<CopyableFile.Builder> getCopyableFilesFromPaths(Map<Path, FileStatus> pathToFileStatus, CopyConfiguration configuration) throws IOException {

List<CopyableFile.Builder> builders = Lists.newArrayList();
List<SourceAndDestination> dataFiles = Lists.newArrayList();
Configuration defaultHadoopConfiguration = new Configuration();
FileSystem actualSourceFs;

for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
dataFiles.add(new SourceAndDestination(entry.getValue(), this.fs.makeQualified(entry.getKey())));
}

for (SourceAndDestination sourceAndDestination : dataFiles) {
actualSourceFs = sourceAndDestination.getSource().getPath().getFileSystem(defaultHadoopConfiguration);

// TODO: Add ancestor owner and permissions in future releases
builders.add(CopyableFile.fromOriginAndDestination(actualSourceFs, sourceAndDestination.getSource(),
sourceAndDestination.getDestination(), configuration));
}
return builders;
}

/**
* Finds all files in the Iceberg table's current snapshot.
* @return a map of each file {@link Path} to its {@link FileStatus}, for every file that needs to be copied
*/
protected Map<Path, FileStatus> getFilePathsToFileStatus() throws IOException {
Map<Path, FileStatus> result = Maps.newHashMap();
IcebergTable icebergTable = this.getIcebergTable();
IcebergSnapshotInfo icebergSnapshotInfo = icebergTable.getCurrentSnapshotInfo();

log.info("{}.{} - loaded snapshot '{}' from metadata path: '{}'", dbName, inputTableName,
icebergSnapshotInfo.getSnapshotId(), icebergSnapshotInfo.getMetadataPath());
List<String> pathsToCopy = icebergSnapshotInfo.getAllPaths();

for (String pathString : pathsToCopy) {
Path path = new Path(pathString);
result.put(path, this.fs.getFileStatus(path));
}
return result;
}

DatasetDescriptor getSourceDataset() {
return getDatasetDescriptor(sourceMetastoreURI);
}

DatasetDescriptor getDestinationDataset() {
return getDatasetDescriptor(targetMetastoreURI);
}

@NotNull
private DatasetDescriptor getDatasetDescriptor(Optional<String> stringMetastoreURI) {
String tableName = this.getDbName() + "." + this.getInputTableName();

URI hiveMetastoreURI = null;
if (stringMetastoreURI.isPresent()) {
hiveMetastoreURI = URI.create(stringMetastoreURI.get());
}

DatasetDescriptor descriptor =
new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, hiveMetastoreURI, tableName);
descriptor.addMetadata(DatasetConstants.FS_URI, this.getFs().getUri().toString());
return descriptor;
}
}
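For context on the lineage wiring just above: the following fragment mirrors the DatasetDescriptor construction in getDatasetDescriptor, as a hedged sketch rather than code from this PR. The table name, metastore URI, and filesystem URI are hypothetical placeholders.

// Sketch only: mirrors the DatasetDescriptor construction in getDatasetDescriptor above.
// "my_db.my_table", the thrift URI, and the HDFS URI are hypothetical placeholders.
URI hiveMetastoreURI = URI.create("thrift://source-metastore:9083");
DatasetDescriptor descriptor =
new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, hiveMetastoreURI, "my_db.my_table");
descriptor.addMetadata(DatasetConstants.FS_URI, "hdfs://namenode:8020");
// Result: platform "iceberg", the metastore URI as the platform instance,
// "db.table" as the dataset name, and the filesystem URI attached as metadata.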
IcebergDatasetFinder.java
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.util.HadoopUtils;

/**
* Finds {@link IcebergDataset}s. Looks for tables in a database using an {@link IcebergCatalog}
* and creates an {@link IcebergDataset} for each one.
*/
@Slf4j
@AllArgsConstructor
public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {

public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
public static final String ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY = ICEBERG_DATASET_PREFIX + ".hive.metastore.uri";
public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";

private String dbName;
private String tblName;
private final Properties properties;
protected final FileSystem fs;

/**
* Finds all {@link IcebergDataset}s in the file system using the Iceberg Catalog.
* Both the Iceberg database name and table name are mandatory in the current implementation;
* later we may explore supporting dataset patterns similar to Hive.
* @return List of {@link IcebergDataset}s in the file system.
* @throws IOException
*/
@Override
public List<IcebergDataset> findDatasets() throws IOException {
List<IcebergDataset> matchingDatasets = new ArrayList<>();
if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
throw new IllegalArgumentException(String.format("Iceberg database name: {%s} or Iceberg table name: {%s} is missing",
ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
}
this.dbName = properties.getProperty(ICEBERG_DB_NAME);
this.tblName = properties.getProperty(ICEBERG_TABLE_NAME);

Configuration configuration = HadoopUtils.getConfFromProperties(properties);

IcebergCatalog icebergCatalog = IcebergCatalogFactory.create(configuration);
/* Each Iceberg dataset maps to an Iceberg table.
* TODO: The user-provided database and table names need to be pre-checked and verified against the existence of a valid Iceberg table
*/
matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, fs));
log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName);

return matchingDatasets;
}

@Override
public Path commonDatasetRoot() {
return new Path("/");
}

@Override
public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
return findDatasets().iterator();
}

protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
}
}
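To show how a job would drive this finder, here is a hedged fragment; the property keys are the constants defined above, while all values are placeholders. Since findDatasets() populates dbName and tblName from the properties, the corresponding constructor arguments can be left null in this sketch.

// Fragment, not part of this PR: exercising IcebergDatasetFinder with the keys above.
Properties props = new Properties();
props.setProperty(IcebergDatasetFinder.ICEBERG_DB_NAME, "my_db"); // placeholder
props.setProperty(IcebergDatasetFinder.ICEBERG_TABLE_NAME, "my_table"); // placeholder
props.setProperty(IcebergDatasetFinder.ICEBERG_HIVE_CATALOG_METASTORE_URI_KEY,
"thrift://source-metastore:9083"); // placeholder
FileSystem fs = FileSystem.get(new Configuration());
IcebergDatasetFinder finder = new IcebergDatasetFinder(null, null, props, fs);
// The current implementation returns exactly one dataset: the single configured table.
List<IcebergDataset> datasets = finder.findDatasets();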
IcebergTableFileSet.java
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.util.Collection;

import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.partition.FileSet;


/**
* A {@link FileSet} for Iceberg datasets that holds information associated with an Iceberg table and generates {@link CopyEntity}s
*/
public class IcebergTableFileSet extends FileSet<CopyEntity> {

private final CopyConfiguration copyConfiguration;
private final IcebergDataset icebergDataset;

public IcebergTableFileSet(String name, IcebergDataset icebergDataset, CopyConfiguration configuration) {
super(name, icebergDataset);
this.copyConfiguration = configuration;
this.icebergDataset = icebergDataset;
}

@Override
protected Collection<CopyEntity> generateCopyEntities() throws IOException {
return this.icebergDataset.generateCopyEntities(this.copyConfiguration);
}
}
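Worth noting: entity generation is deferred. Constructing the file set is cheap, and the snapshot listing in IcebergDataset.generateCopyEntities only runs when the FileSet's entities are first materialized. A hedged fragment, assuming FileSet exposes its entities through a getFiles()-style accessor as Gobblin's other FileSet-based copy flows do:

// Fragment: lazy generation; nothing touches the snapshot until the entities are read.
IcebergTableFileSet tableFileSet =
new IcebergTableFileSet("my_table", icebergDataset, copyConfiguration); // placeholders
// Assumption: getFiles() (or equivalent accessor) triggers generateCopyEntities() on first access.
List<CopyEntity> entities = tableFileSet.getFiles();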