
Add default-reader-version and default-writer-version delta config properties
krvikash authored and ebyhr committed Feb 24, 2023
1 parent 82b6735 commit 5416843
Showing 7 changed files with 103 additions and 20 deletions.
10 changes: 10 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
@@ -157,6 +157,16 @@ values. Typical usage does not require you to configure them.
* - ``delta.register-table-procedure.enabled``
- Enable to allow users to call the ``register_table`` procedure
- ``false``
* - ``delta.default-reader-version``
- The default reader version used by new tables.
The value can be overridden for a specific table with the
``reader_version`` table property.
- ``1``
* - ``delta.default-writer-version``
- The default writer version used by new tables.
The value can be overridden for a specific table with the
``writer_version`` table property.
- ``2``

The following table describes performance tuning catalog properties for the
connector.
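
For orientation only (not part of this commit): a minimal sketch of how the two properties added above (``delta.default-reader-version`` and ``delta.default-writer-version``) would typically be set, assuming a standard Delta Lake catalog properties file. The per-table ``reader_version`` and ``writer_version`` table properties mentioned above take precedence over these defaults, and the values must stay within the range the connector can write (reader 1-2, writer 2-4).

# example Delta Lake catalog configuration (illustrative values)
delta.default-reader-version=2
delta.default-writer-version=4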
@@ -25,6 +25,7 @@

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@@ -48,6 +49,12 @@ public class DeltaLakeConfig
@VisibleForTesting
static final DataSize DEFAULT_DATA_FILE_CACHE_SIZE = DataSize.succinctBytes(Math.floorDiv(Runtime.getRuntime().maxMemory(), 10L));

public static final int MIN_READER_VERSION = 1;
public static final int MIN_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports writing to
public static final int MAX_READER_VERSION = 2;
public static final int MAX_WRITER_VERSION = 4;

private Duration metadataCacheTtl = new Duration(5, TimeUnit.MINUTES);
private long metadataCacheMaxSize = 1000;
private DataSize dataFileCacheSize = DEFAULT_DATA_FILE_CACHE_SIZE;
@@ -77,6 +84,8 @@ public class DeltaLakeConfig
private boolean uniqueTableLocation = true;
private boolean legacyCreateTableWithExistingLocationEnabled;
private boolean registerTableProcedureEnabled;
private int defaultReaderVersion = MIN_READER_VERSION;
private int defaultWriterVersion = MIN_WRITER_VERSION;

public Duration getMetadataCacheTtl()
{
@@ -475,4 +484,34 @@ public DeltaLakeConfig setRegisterTableProcedureEnabled(boolean registerTablePro
this.registerTableProcedureEnabled = registerTableProcedureEnabled;
return this;
}

@Min(value = MIN_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION)
@Max(value = MAX_READER_VERSION, message = "Must be in between " + MIN_READER_VERSION + " and " + MAX_READER_VERSION)
public int getDefaultReaderVersion()
{
return defaultReaderVersion;
}

@Config("delta.default-reader-version")
@ConfigDescription("The default reader version used by new tables")
public DeltaLakeConfig setDefaultReaderVersion(int defaultReaderVersion)
{
this.defaultReaderVersion = defaultReaderVersion;
return this;
}

@Min(value = MIN_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION)
@Max(value = MAX_WRITER_VERSION, message = "Must be in between " + MIN_WRITER_VERSION + " and " + MAX_WRITER_VERSION)
public int getDefaultWriterVersion()
{
return defaultWriterVersion;
}

@Config("delta.default-writer-version")
@ConfigDescription("The default writer version used by new tables")
public DeltaLakeConfig setDefaultWriterVersion(int defaultWriterVersion)
{
this.defaultWriterVersion = defaultWriterVersion;
return this;
}
}
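
A hedged usage sketch of the accessors added above (not part of the commit): the @Min/@Max annotations bound the reader default to 1-2 and the writer default to 2-4, so an out-of-range value is rejected by airlift's configuration validation rather than silently accepted.

// illustrative only; uses the fluent setters defined in DeltaLakeConfig above
DeltaLakeConfig config = new DeltaLakeConfig()
        .setDefaultReaderVersion(2)   // allowed range: MIN_READER_VERSION (1) to MAX_READER_VERSION (2)
        .setDefaultWriterVersion(4);  // allowed range: MIN_WRITER_VERSION (2) to MAX_WRITER_VERSION (4)
// config.getDefaultReaderVersion() == 2 and config.getDefaultWriterVersion() == 4;
// a writer version of 5 would violate the @Max constraint when the configuration is validated.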
@@ -171,6 +171,7 @@
import static io.trino.plugin.deltalake.DeltaLakeColumnType.PARTITION_KEY;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.SYNTHESIZED;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_BAD_WRITE;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getHiveCatalogName;
@@ -286,13 +287,6 @@ public class DeltaLakeMetadata
public static final String CHANGE_COLUMN_OPERATION = "CHANGE COLUMN";
public static final String ISOLATION_LEVEL = "WriteSerializable";

// The required reader and writer versions used by tables created by Trino
public static final int MIN_READER_VERSION = 1;
public static final int MIN_WRITER_VERSION = 2;
// The highest reader and writer versions Trino supports writing to
public static final int MAX_READER_VERSION = 2;
public static final int MAX_WRITER_VERSION = 4;

private static final int CDF_SUPPORTED_WRITER_VERSION = 4;

// Matches the dummy column Databricks stores in the metastore
@@ -326,6 +320,8 @@ public class DeltaLakeMetadata
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
private final boolean allowManagedTableRename;
private final int defaultReaderVersion;
private final int defaultWriterVersion;

public DeltaLakeMetadata(
DeltaLakeMetastore metastore,
@@ -346,7 +342,9 @@ public DeltaLakeMetadata(
DeltaLakeRedirectionsProvider deltaLakeRedirectionsProvider,
ExtendedStatisticsAccess statisticsAccess,
boolean useUniqueTableLocation,
boolean allowManagedTableRename)
boolean allowManagedTableRename,
int defaultReaderVersion,
int defaultWriterVersion)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
@@ -368,6 +366,8 @@ public DeltaLakeMetadata(
this.deleteSchemaLocationsFallback = deleteSchemaLocationsFallback;
this.useUniqueTableLocation = useUniqueTableLocation;
this.allowManagedTableRename = allowManagedTableRename;
this.defaultReaderVersion = defaultReaderVersion;
this.defaultWriterVersion = defaultWriterVersion;
}

@Override
@@ -1776,8 +1776,8 @@ private ProtocolEntry getProtocolEntry(Map<String, Object> properties)
}
}
return new ProtocolEntry(
readerVersion.orElse(MIN_READER_VERSION),
writerVersion.orElse(MIN_WRITER_VERSION));
readerVersion.orElse(defaultReaderVersion),
writerVersion.orElse(defaultWriterVersion));
}

private void writeCheckpointIfNeeded(ConnectorSession session, SchemaTableName table, Optional<Long> checkpointInterval, long newVersion)
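In effect, the getProtocolEntry change above makes the protocol resolution "explicit table property if present, otherwise catalog default". A simplified, hypothetical sketch of that precedence (standalone method, not the commit's code):

// readerVersion/writerVersion are the optional reader_version/writer_version table properties
static ProtocolEntry resolveProtocol(
        Optional<Integer> readerVersion,
        Optional<Integer> writerVersion,
        int defaultReaderVersion,
        int defaultWriterVersion)
{
    return new ProtocolEntry(
            readerVersion.orElse(defaultReaderVersion),
            writerVersion.orElse(defaultWriterVersion));
}
// With neither table property set and default catalog configuration, this yields ProtocolEntry(1, 2).
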
@@ -58,6 +58,8 @@ public class DeltaLakeMetadataFactory
private final long perTransactionMetastoreCacheMaximumSize;
private final boolean deleteSchemaLocationsFallback;
private final boolean useUniqueTableLocation;
private final int defaultReaderVersion;
private final int defaultWriterVersion;

private final boolean allowManagedTableRename;
private final String trinoVersion;
@@ -100,6 +102,8 @@ public DeltaLakeMetadataFactory(
this.perTransactionMetastoreCacheMaximumSize = deltaLakeConfig.getPerTransactionMetastoreCacheMaximumSize();
this.deleteSchemaLocationsFallback = deltaLakeConfig.isDeleteSchemaLocationsFallback();
this.useUniqueTableLocation = deltaLakeConfig.isUniqueTableLocation();
this.defaultReaderVersion = deltaLakeConfig.getDefaultReaderVersion();
this.defaultWriterVersion = deltaLakeConfig.getDefaultWriterVersion();
this.allowManagedTableRename = allowManagedTableRename;
this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString();
}
@@ -141,6 +145,8 @@ public DeltaLakeMetadata create(ConnectorIdentity identity)
deltaLakeRedirectionsProvider,
statisticsAccess,
useUniqueTableLocation,
allowManagedTableRename);
allowManagedTableRename,
defaultReaderVersion,
defaultWriterVersion);
}
}
@@ -26,10 +26,10 @@
import java.util.Optional;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeConfig.MIN_WRITER_VERSION;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
@@ -19,13 +19,18 @@
import io.trino.plugin.hive.HiveCompressionCodec;
import org.testng.annotations.Test;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import java.lang.annotation.Annotation;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.testing.ValidationAssertions.assertFailsValidation;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.trino.plugin.hive.util.TestHiveUtil.nonDefaultTimeZone;
import static java.util.concurrent.TimeUnit.DAYS;
@@ -67,7 +72,9 @@ public void testDefaults()
.setTargetMaxFileSize(DataSize.of(1, GIGABYTE))
.setUniqueTableLocation(true)
.setLegacyCreateTableWithExistingLocationEnabled(false)
.setRegisterTableProcedureEnabled(false));
.setRegisterTableProcedureEnabled(false)
.setDefaultReaderVersion(1)
.setDefaultWriterVersion(2));
}

@Test
@@ -103,6 +110,8 @@ public void testExplicitPropertyMappings()
.put("delta.unique-table-location", "false")
.put("delta.legacy-create-table-with-existing-location.enabled", "true")
.put("delta.register-table-procedure.enabled", "true")
.put("delta.default-reader-version", "2")
.put("delta.default-writer-version", "3")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
@@ -134,8 +143,29 @@ public void testExplicitPropertyMappings()
.setTargetMaxFileSize(DataSize.of(2, GIGABYTE))
.setUniqueTableLocation(false)
.setLegacyCreateTableWithExistingLocationEnabled(true)
.setRegisterTableProcedureEnabled(true);
.setRegisterTableProcedureEnabled(true)
.setDefaultReaderVersion(2)
.setDefaultWriterVersion(3);

assertFullMapping(properties, expected);
}

@Test
public void testValidation()
{
assertFailsReaderVersionValidation(new DeltaLakeConfig().setDefaultReaderVersion(0), Min.class);
assertFailsReaderVersionValidation(new DeltaLakeConfig().setDefaultReaderVersion(3), Max.class);
assertFailsWriterVersionValidation(new DeltaLakeConfig().setDefaultWriterVersion(1), Min.class);
assertFailsWriterVersionValidation(new DeltaLakeConfig().setDefaultWriterVersion(5), Max.class);
}

private void assertFailsReaderVersionValidation(DeltaLakeConfig config, Class<? extends Annotation> annotation)
{
assertFailsValidation(config, "defaultReaderVersion", "Must be in between 1 and 2", annotation);
}

private void assertFailsWriterVersionValidation(DeltaLakeConfig config, Class<? extends Annotation> annotation)
{
assertFailsValidation(config, "defaultWriterVersion", "Must be in between 2 and 4", annotation);
}
}
@@ -53,8 +53,6 @@
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.trino.plugin.deltalake.DeltaLakeColumnType.REGULAR;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_READER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MIN_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaTestingConnectorSession.SESSION;
import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static io.trino.spi.type.BigintType.BIGINT;
@@ -163,7 +161,7 @@ private static ConnectorPageSink createPageSink(Path outputPath, DeltaLakeWriter
true,
Optional.empty(),
Optional.of(false),
new ProtocolEntry(MIN_READER_VERSION, MIN_WRITER_VERSION));
new ProtocolEntry(deltaLakeConfig.getDefaultReaderVersion(), deltaLakeConfig.getDefaultWriterVersion()));

DeltaLakePageSinkProvider provider = new DeltaLakePageSinkProvider(
new GroupByHashPageIndexerFactory(new JoinCompiler(new TypeOperators()), new BlockTypeOperators()),
