Add a metadata table showing commits in Hudi connector #16181

Merged · 2 commits · Mar 20, 2023
49 changes: 49 additions & 0 deletions docs/src/main/sphinx/connector/hudi.rst
@@ -157,3 +157,52 @@ Here are some sample queries:
------------+--------+
2018-08-31 | 99 |
(1 rows)

.. _hudi-metadata-tables:

Metadata tables
---------------

The connector exposes a metadata table for each Hudi table.
The metadata table contains information about the internal structure
of the Hudi table. You can query each metadata table by appending the
metadata table name to the table name::

SELECT * FROM "test_table$timeline"

``$timeline`` table
^^^^^^^^^^^^^^^^^^^^

The ``$timeline`` table provides a detailed view of metadata instants
in the Hudi table. Instants are specific points in time.

You can retrieve the information about the timeline of the Hudi table
``test_table`` by using the following query::

SELECT * FROM "test_table$timeline"

.. code-block:: text

timestamp | action | state
--------------------+---------+-----------
8667764846443717831 | commit | COMPLETED
7860805980949777961 | commit | COMPLETED

The output of the query has the following columns:

.. list-table:: Timeline columns
:widths: 20, 30, 50
:header-rows: 1

* - Name
- Type
- Description
* - ``timestamp``
- ``varchar``
- Instant time, typically a timestamp corresponding to when the action was performed
* - ``action``
- ``varchar``
- `Type of action <https://hudi.apache.org/docs/concepts/#timeline>`_ performed on the table
* - ``state``
- ``varchar``
- Current state of the instant
plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java
@@ -18,6 +18,7 @@
import io.airlift.log.Logger;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.base.classloader.ClassLoaderSafeSystemTable;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hive.metastore.Column;
import io.trino.plugin.hive.metastore.HiveMetastore;
@@ -33,6 +34,7 @@
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.connector.TableColumnsMetadata;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
@@ -116,6 +118,35 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
TupleDomain.all());
}

@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
// wrap so that system table methods run with the plugin class loader
return getRawSystemTable(session, tableName)
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName tableName)
{
HudiTableName name = HudiTableName.from(tableName.getTableName());
if (name.getTableType() == TableType.DATA) {
return Optional.empty();
}

Optional<Table> tableOptional = metastore.getTable(tableName.getSchemaName(), name.getTableName());
if (tableOptional.isEmpty()) {
return Optional.empty();
}
switch (name.getTableType()) {
case DATA:
// DATA is short-circuited above; there is nothing to expose as a system table
break;
case TIMELINE:
SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), name.getTableNameWithType());
Configuration configuration = hdfsEnvironment.getConfiguration(new HdfsContext(session), new Path(tableOptional.get().getStorage().getLocation()));
return Optional.of(new TimelineTable(configuration, systemTableName, tableOptional.get()));
}
return Optional.empty();
}

@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java
@@ -47,6 +47,7 @@
import static io.trino.plugin.hudi.HudiSessionProperties.getStandardSplitWeightSize;
import static io.trino.plugin.hudi.HudiSessionProperties.isHudiMetadataEnabled;
import static io.trino.plugin.hudi.HudiSessionProperties.isSizeBasedSplitWeightsEnabled;
import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static java.util.stream.Collectors.toList;

@@ -128,13 +129,6 @@ public boolean isFinished()
return queue.isFinished();
}

private static HoodieTableMetaClient buildTableMetaClient(Configuration configuration, String basePath)
{
HoodieTableMetaClient client = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(basePath).build();
client.getTableConfig().setValue("hoodie.bootstrap.index.enable", "false");
return client;
}

private static HudiSplitWeightProvider createSplitWeightProvider(ConnectorSession session)
{
if (isSizeBasedSplitWeightsEnabled(session)) {
84 changes: 84 additions & 0 deletions plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableName.java
@@ -0,0 +1,84 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hudi;

import io.trino.spi.TrinoException;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class HudiTableName
{
// matches a bare table name, or a name followed by "$<metadata table type>"; '$' and '@' are reserved separators
private static final Pattern TABLE_PATTERN = Pattern.compile("" +
"(?<table>[^$@]+)" +
"(?:\\$(?<type>[^@]+))?");

private final String tableName;
private final TableType tableType;

public HudiTableName(String tableName, TableType tableType)
{
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableType = requireNonNull(tableType, "tableType is null");
}

public String getTableName()
{
return tableName;
}

public TableType getTableType()
{
return tableType;
}

public String getTableNameWithType()
{
return tableName + "$" + tableType.name().toLowerCase(ENGLISH);
}

@Override
public String toString()
{
return getTableNameWithType();
}

public static HudiTableName from(String name)
{
Matcher match = TABLE_PATTERN.matcher(name);
if (!match.matches()) {
throw new TrinoException(NOT_SUPPORTED, "Invalid Hudi table name: " + name);
}

String table = match.group("table");
String typeString = match.group("type");

TableType type = TableType.DATA;
if (typeString != null) {
try {
type = TableType.valueOf(typeString.toUpperCase(ENGLISH));
}
catch (IllegalArgumentException e) {
throw new TrinoException(NOT_SUPPORTED, format("Invalid Hudi table name (unknown type '%s'): %s", typeString, name));
}
}

return new HudiTableName(table, type);
}
}
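
For illustration, a minimal sketch of how HudiTableName resolves plain and suffixed names (not part of this PR; the "orders" table name is hypothetical):

HudiTableName data = HudiTableName.from("orders");
// data.getTableType() == TableType.DATA; data.getTableName() returns "orders"

HudiTableName timeline = HudiTableName.from("orders$timeline");
// timeline.getTableType() == TableType.TIMELINE
// timeline.getTableNameWithType() returns "orders$timeline"

HudiTableName.from("orders$foo");
// throws TrinoException: Invalid Hudi table name (unknown type 'foo'): orders$foo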
10 changes: 10 additions & 0 deletions plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiUtil.java
@@ -27,11 +27,13 @@
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;

@@ -152,6 +154,14 @@ public static List<HivePartitionKey> buildPartitionKeys(List<Column> keys, List<
return partitionKeys.build();
}

public static HoodieTableMetaClient buildTableMetaClient(Configuration configuration, String basePath)
{
HoodieTableMetaClient client = HoodieTableMetaClient.builder().setConf(configuration).setBasePath(basePath).build();
// Do not load the bootstrap index; the connector will not read bootstrap base data or the mapping index
client.getTableConfig().setValue("hoodie.bootstrap.index.enable", "false");
return client;
}
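
For context, a hedged sketch of reading the commits timeline through a meta client built this way (not in the PR; the Configuration and base path are hypothetical). TimelineTable below does the same thing to populate its rows:

Configuration conf = new Configuration();
HoodieTableMetaClient client = buildTableMetaClient(conf, "s3://bucket/warehouse/orders");
// getCommitsTimeline() returns the commit instants; getInstants() streams them
client.getCommitsTimeline().getInstants()
        .forEach(instant -> System.out.println(
                instant.getTimestamp() + " | " + instant.getAction() + " | " + instant.getState()));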

public static FileStatus getFileStatus(HoodieBaseFile baseFile)
{
try {
20 changes: 20 additions & 0 deletions plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TableType.java
@@ -0,0 +1,20 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hudi;

public enum TableType
{
DATA,
TIMELINE;
}
91 changes: 91 additions & 0 deletions plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/TimelineTable.java
@@ -0,0 +1,91 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.hudi;

import com.google.common.collect.ImmutableList;
import io.trino.plugin.hive.metastore.Table;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.Type;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import java.util.ArrayList;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class TimelineTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final List<Type> types;
private final Configuration configuration;
private final String location;

public TimelineTable(Configuration configuration, SchemaTableName tableName, Table hudiTable)
{
this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("timestamp", VARCHAR))
.add(new ColumnMetadata("action", VARCHAR))
.add(new ColumnMetadata("state", VARCHAR))
.build());
this.types = tableMetadata.getColumns().stream().map(ColumnMetadata::getType).collect(toImmutableList());
this.configuration = requireNonNull(configuration, "configuration is null");
this.location = requireNonNull(hudiTable.getStorage().getLocation(), "location is null");
}

@Override
public Distribution getDistribution()
{
return Distribution.SINGLE_COORDINATOR;
}

@Override
public ConnectorTableMetadata getTableMetadata()
{
return tableMetadata;
}

@Override
public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
HoodieTableMetaClient metaClient = buildTableMetaClient(configuration, location);
Iterable<List<Object>> records = () -> metaClient.getCommitsTimeline().getInstants().map(this::getRecord).iterator();
return new InMemoryRecordSet(types, records).cursor();
}

private List<Object> getRecord(HoodieInstant hudiInstant)
{
List<Object> columns = new ArrayList<>();
columns.add(hudiInstant.getTimestamp());
columns.add(hudiInstant.getAction());
columns.add(hudiInstant.getState().toString());
checkArgument(columns.size() == types.size(), "Expected %s types in row, but got %s values", types.size(), columns.size());
return columns;
}
}
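
A test-style sketch of reading rows through the cursor (not in the PR; configuration, hudiTable, and session stand in for objects the engine supplies):

TimelineTable timelineTable = new TimelineTable(configuration, new SchemaTableName("default", "orders$timeline"), hudiTable);
RecordCursor cursor = timelineTable.cursor(null, session, TupleDomain.all());
while (cursor.advanceNextPosition()) {
    // columns are timestamp, action, state, all VARCHAR
    System.out.println(cursor.getSlice(0).toStringUtf8()
            + " | " + cursor.getSlice(1).toStringUtf8()
            + " | " + cursor.getSlice(2).toStringUtf8());
}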