Commit
Merge pull request trinodb#3 from yihua/hudi-plugin-multiple-splits
Add splits to split source
yihua authored Dec 22, 2021
2 parents 20c5f00 + c4728b0 commit dd52c4d
Showing 10 changed files with 311 additions and 116 deletions.
4 changes: 4 additions & 0 deletions plugin/trino-hudi/pom.xml
@@ -65,6 +65,10 @@
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
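The new io.airlift:units dependency supplies the DataSize type used by the max split size configuration introduced below.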
plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java
@@ -16,6 +16,7 @@

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.units.DataSize;
import org.apache.hudi.common.model.HoodieFileFormat;

import javax.validation.constraints.NotNull;
@@ -26,6 +27,8 @@ public class HudiConfig
{
private HoodieFileFormat fileFormat = PARQUET;
private boolean metadataEnabled;
private boolean splitInSource;
private DataSize maxSplitSize = DataSize.ofBytes(128 * 1024 * 1024);

@NotNull
public HoodieFileFormat getFileFormat()
@@ -53,4 +56,31 @@ public boolean isMetadataEnabled()
{
return this.metadataEnabled;
}

@Config("hudi.max_split_size")
public HudiConfig setMaxSplitSize(DataSize size)
{
this.maxSplitSize = size;
return this;
}

@NotNull
public DataSize getMaxSplitSize()
{
return this.maxSplitSize;
}

@Config("hudi.split_in_source")
@ConfigDescription("Whether to split files in the HudiSplitSource. If false, done in HudiSplitManager.")
public HudiConfig setSplitInSource(boolean splitInSource)
{
this.splitInSource = splitInSource;
return this;
}

@NotNull
public boolean isSplitInSource()
{
return this.splitInSource;
}
}
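For orientation, here is a minimal sketch of how the new options could be set programmatically. The fluent style mirrors the setters added above; the 64 MB value is purely illustrative.

    import io.airlift.units.DataSize;

    import static io.airlift.units.DataSize.Unit.MEGABYTE;

    // Defaults from this change: splitInSource = false, maxSplitSize = 128 MB.
    HudiConfig config = new HudiConfig()
            .setSplitInSource(true)                       // let HudiSplitSource generate the splits
            .setMaxSplitSize(DataSize.of(64, MEGABYTE));  // cap each generated split at 64 MB

    // The accessors exposed to the rest of the connector:
    boolean splitInSource = config.isSplitInSource();
    long maxSplitBytes = config.getMaxSplitSize().toBytes();

In a catalog properties file the same settings would be spelled hudi.split_in_source=true and hudi.max_split_size=64MB, assuming airlift's standard DataSize parsing of the value.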
plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java
@@ -15,6 +15,7 @@
package io.trino.plugin.hudi;

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;
@@ -24,6 +25,7 @@

import java.util.List;

import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;

@@ -32,6 +34,8 @@ public class HudiSessionProperties
{
private static final String FILE_FORMAT = "file_format";
private static final String METADATA_ENABLED = "metadata_enabled";
private static final String MAX_SPLIT_SIZE = "max_split_size";
private static final String SPLIT_IN_SOURCE = "split_in_source";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -49,7 +53,17 @@ public HudiSessionProperties(HudiConfig hudiConfig)
METADATA_ENABLED,
"For Hudi tables prefer to fetch the list of files from its metadata",
hudiConfig.isMetadataEnabled(),
false));
false),
booleanProperty(
SPLIT_IN_SOURCE,
"Whether to split files in the HudiSplitSource. If false, done in HudiSplitManager.",
hudiConfig.isSplitInSource(),
false),
dataSizeProperty(
MAX_SPLIT_SIZE,
"Max split size",
hudiConfig.getMaxSplitSize(),
true));
}

@Override
@@ -67,4 +81,14 @@ public static boolean isHudiMetadataEnabled(ConnectorSession session)
{
return session.getProperty(METADATA_ENABLED, Boolean.class);
}

public static DataSize getMaxSplitSize(ConnectorSession session)
{
return session.getProperty(MAX_SPLIT_SIZE, DataSize.class);
}

public static boolean isSplitInSource(ConnectorSession session)
{
return session.getProperty(SPLIT_IN_SOURCE, Boolean.class);
}
}
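For illustration, this is roughly how the rest of the connector would read the two new session properties. Only the two static getters come from this change; the surrounding lines are hypothetical.

    import static io.trino.plugin.hudi.HudiSessionProperties.getMaxSplitSize;
    import static io.trino.plugin.hudi.HudiSessionProperties.isSplitInSource;

    // session is the ConnectorSession handed to the split manager / split source.
    long maxSplitBytes = getMaxSplitSize(session).toBytes();
    if (isSplitInSource(session)) {
        // defer splitting of data files to HudiSplitSource, bounded by maxSplitBytes
    }
    else {
        // split files eagerly in HudiSplitManager
    }

Because these are catalog session properties, a user can also override the configured defaults per session, e.g. SET SESSION hudi.split_in_source = true in a catalog named hudi.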
plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitManager.java
@@ -36,34 +36,32 @@
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;

import javax.inject.Inject;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.regex.Pattern;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hive.util.ConfigurationUtils.toJobConf;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeys;
import static io.trino.plugin.hudi.HudiSessionProperties.isHudiMetadataEnabled;
import static io.trino.plugin.hudi.HudiUtil.getMetaClient;
import static io.trino.plugin.hudi.HudiUtil.getPartitionSchema;
import static io.trino.plugin.hudi.HudiUtil.isHudiParquetInputFormat;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_LOCATION;
import static org.apache.hudi.common.table.timeline.TimelineUtils.getPartitionsWritten;

public class HudiSplitManager
implements ConnectorSplitManager
{
public static final Pattern HOODIE_CONSUME_MODE_PATTERN_STRING = Pattern.compile("hoodie\\.(.*)\\.consume\\.mode");
private static final Logger log = Logger.get(HudiSplitManager.class);

private final HudiTransactionManager transactionManager;
@@ -93,16 +91,16 @@ public ConnectorSplitSource getSplits(
Table table = metastore.getTable(identity, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));
HdfsEnvironment.HdfsContext context = new HdfsEnvironment.HdfsContext(session);
FileSystem fs = null;
try {
fs = hdfsEnvironment.getFileSystem(context, new Path(table.getStorage().getLocation()));
}
catch (IOException e) {
e.printStackTrace();
}
Configuration conf = hdfsEnvironment.getConfiguration(context, new Path(table.getStorage().getLocation()));
String tablePath = table.getStorage().getLocation();
Configuration conf = hdfsEnvironment.getConfiguration(context, new Path(tablePath));
Map<String, String> valByRegex = conf.getValByRegex(HOODIE_CONSUME_MODE_PATTERN_STRING.pattern());
log.debug("Hoodie consume mode: " + valByRegex);
HoodieTableMetaClient metaClient = hudiTable.getMetaClient().orElseGet(() -> getMetaClient(conf, hudiTable.getBasePath()));
List<String> partitionValues = getPartitionsWritten(metaClient.getActiveTimeline());
log.debug("HudiSplitManager ref: " + this.toString());
log.debug("Table ref: " + table.toString());
log.debug("HoodieTableMetaClient ref: " + metaClient.toString());
log.debug("HoodieTableMetaClient base path: " + metaClient.getBasePath());
log.warn("Fetched partitions from Hudi: " + partitionValues);
hudiTable.getPartitions().ifPresent(p -> p.forEach(p1 -> log.warn("Partitions from TableHandle: " + p1)));

@@ -111,8 +109,7 @@
.collect(toImmutableList());
log.warn("Column Names: " + columnNames);
HudiSplitSource splitSource;
String tablePath = table.getStorage().getLocation();
Optional<FileStatus[]> fileStatuses = Optional.empty();
Map<String, List<HivePartitionKey>> partitionMap = new HashMap<>();
if (!columnNames.isEmpty()) {
List<List<String>> partitionNames = metastore.getPartitionNamesByFilter(identity, tableName.getSchemaName(), tableName.getTableName(), columnNames, TupleDomain.all())
.orElseThrow(() -> new TableNotFoundException(hudiTable.getSchemaTableName()))
@@ -121,49 +118,32 @@
.collect(toImmutableList());
log.warn("Partition Names: " + partitionNames);

Optional<Partition> partition = metastore.getPartition(identity, table, partitionNames.get(0));

log.warn("Fetched partitions from Metastore: " + partition.get());
Properties schema = getPartitionSchema(table, partition);
String dataDir = schema.getProperty(META_TABLE_LOCATION);
log.warn("Partition schema: " + schema);

List<HivePartitionKey> partitionKeys = getPartitionKeys(table, partition);
partitionKeys.forEach(p -> log.warn("Fetched partitions from HiveUtil: " + p));

fileStatuses = getFileStatuses(fs, conf, dataDir, fileStatuses, schema);
splitSource = new HudiSplitSource(hudiTable, conf, partitionKeys, isHudiMetadataEnabled(session), fileStatuses, tablePath, dataDir);
for (List<String> partitionName : partitionNames) {
Optional<Partition> partition1 = metastore.getPartition(identity, table, partitionName);
Properties schema1 = getPartitionSchema(table, partition1);
String dataDir1 = schema1.getProperty(META_TABLE_LOCATION);
log.warn(">>> basePath: %s, dataDir1: %s", tablePath, dataDir1);
String relativePartitionPath = FSUtils.getRelativePartitionPath(new Path(tablePath), new Path(dataDir1));
List<HivePartitionKey> partitionKeys1 = getPartitionKeys(table, partition1);
partitionMap.putIfAbsent(relativePartitionPath, partitionKeys1);
partitionKeys1.forEach(p -> log.warn(">>> Fetched partitions from HiveUtil: " + p));
}
}
else {
// no partitions, so data dir is same as table path
Properties schema = getPartitionSchema(table, Optional.empty());
fileStatuses = getFileStatuses(fs, conf, tablePath, fileStatuses, schema);
splitSource = new HudiSplitSource(hudiTable, conf, ImmutableList.of(), isHudiMetadataEnabled(session), fileStatuses, tablePath, tablePath);
partitionMap.put("", ImmutableList.of());
}

log.debug("Partition map: " + partitionMap);
splitSource = new HudiSplitSource(session, hudiTable, conf, partitionMap,
isHudiMetadataEnabled(session), dynamicFilter);
return new ClassLoaderSafeConnectorSplitSource(splitSource, Thread.currentThread().getContextClassLoader());
}

    // Removed in this commit (no longer called from getSplits):
    private static Optional<FileStatus[]> getFileStatuses(FileSystem fs, Configuration conf, String tablePath, Optional<FileStatus[]> fileStatuses, Properties schema)
    {
        InputFormat inputFormat = HiveUtil.getInputFormat(conf, schema, false);
        log.warn(">>> Check for inputFormat: " + isHudiParquetInputFormat(inputFormat));

        try {
            if (isHudiParquetInputFormat(inputFormat)) {
                fileStatuses = Optional.of(((HoodieParquetInputFormat) inputFormat).listStatus(toJobConf(conf)));
            }
            if (fileStatuses.isPresent()) {
                log.warn(">>> Total Files: " + fileStatuses.get().length);
                if (fileStatuses.get().length == 0 && fs != null) {
                    fileStatuses = Optional.of(fs.listStatus(new Path(tablePath)));
                    log.warn(">>> Total Files: " + fileStatuses.get().length);
                }
            }
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        return fileStatuses;
    }

    // Added in this commit: debug helper that logs every entry of the Hadoop Configuration.
    void printConf(Configuration conf)
    {
        for (Map.Entry<String, String> entry : conf) {
            log.warn("%s=%s\n", entry.getKey(), entry.getValue());
        }
    }
}
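The reworked HudiSplitSource is among the changed files not rendered above. As a rough sketch of what generating multiple splits per file, bounded by the new max split size, might look like there, consider the following; the createSplits name and the HudiSplit constructor shape are assumptions for illustration, not code from this commit (FileStatus, HivePartitionKey, and ImmutableList are the types already used in this plugin).

    // Hypothetical helper: carve one data file into byte-range splits of at most maxSplitSizeBytes.
    private static List<HudiSplit> createSplits(FileStatus file, List<HivePartitionKey> partitionKeys, long maxSplitSizeBytes)
    {
        ImmutableList.Builder<HudiSplit> splits = ImmutableList.builder();
        long fileSize = file.getLen();
        long start = 0;
        while (start < fileSize) {
            long length = Math.min(maxSplitSizeBytes, fileSize - start);
            // Assumed HudiSplit fields: path, start, length, total file size, partition keys.
            splits.add(new HudiSplit(file.getPath().toString(), start, length, fileSize, partitionKeys));
            start += length;
        }
        return splits.build();
    }

The partition-to-keys map built in getSplits above would presumably supply the partitionKeys argument for every file under the corresponding partition directory.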