Skip to content

Commit

Permalink
Return empty PageSource when there's unsupported Serde property
Browse files Browse the repository at this point in the history
  • Loading branch information
linlin-s authored and rmarrowstone committed Dec 18, 2024
1 parent 3871c1f commit 0d593c2
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.amazon.ion.IonReader;
import com.amazon.ion.system.IonReaderBuilder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.io.CountingInputStream;
import com.google.inject.Inject;
import io.trino.filesystem.Location;
Expand Down Expand Up @@ -43,15 +45,31 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.regex.Pattern;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.hive.formats.HiveClassNames.ION_SERDE_CLASS;
import static io.trino.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static io.trino.plugin.hive.HivePageSourceProvider.projectBaseColumns;
import static io.trino.plugin.hive.ReaderPageSource.noProjectionAdaptation;
import static io.trino.plugin.hive.ion.IonReaderOptions.FAIL_ON_OVERFLOW_PROPERTY;
import static io.trino.plugin.hive.ion.IonReaderOptions.FAIL_ON_OVERFLOW_PROPERTY_DEFAULT;
import static io.trino.plugin.hive.ion.IonReaderOptions.FAIL_ON_OVERFLOW_PROPERTY_WITH_COLUMN;
import static io.trino.plugin.hive.ion.IonReaderOptions.IGNORE_MALFORMED;
import static io.trino.plugin.hive.ion.IonReaderOptions.IGNORE_MALFORMED_DEFAULT;
import static io.trino.plugin.hive.ion.IonReaderOptions.PATH_EXTRACTION_CASE_SENSITIVITY;
import static io.trino.plugin.hive.ion.IonReaderOptions.PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT;
import static io.trino.plugin.hive.ion.IonReaderOptions.PATH_EXTRACTOR_PROPERTY;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_SERIALIZATION_AS_NULL_DEFAULT;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_SERIALIZATION_AS_NULL_PROPERTY;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_SERIALIZATION_AS_PROPERTY;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_TIMESTAMP_OFFSET_DEFAULT;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_TIMESTAMP_OFFSET_PROPERTY;
import static io.trino.plugin.hive.util.HiveUtil.splitError;

public class IonPageSourceFactory
Expand All @@ -61,6 +79,18 @@ public class IonPageSourceFactory
// this is used as a feature flag to enable Ion native trino integration
private final boolean nativeTrinoEnabled;

// Table-level Ion serde properties mapped to their default values.
// isUnsupportedProperty treats a property listed here as unsupported
// when its configured value differs from the default.
private static final Map<String, String> TABLE_PROPERTIES = ImmutableMap.of(
FAIL_ON_OVERFLOW_PROPERTY, FAIL_ON_OVERFLOW_PROPERTY_DEFAULT,
IGNORE_MALFORMED, IGNORE_MALFORMED_DEFAULT,
PATH_EXTRACTION_CASE_SENSITIVITY, PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT,
ION_TIMESTAMP_OFFSET_PROPERTY, ION_TIMESTAMP_OFFSET_DEFAULT,
ION_SERIALIZATION_AS_NULL_PROPERTY, ION_SERIALIZATION_AS_NULL_DEFAULT);

// Regular expressions for column-specific Ion serde property keys.
// isUnsupportedProperty reports any key that fully matches one of these
// patterns as unsupported, whatever its value.
private static final Set<Pattern> COLUMN_PROPERTIES = ImmutableSet.of(
Pattern.compile(FAIL_ON_OVERFLOW_PROPERTY_WITH_COLUMN),
Pattern.compile(ION_SERIALIZATION_AS_PROPERTY),
Pattern.compile(PATH_EXTRACTOR_PROPERTY));

@Inject
public IonPageSourceFactory(TrinoFileSystemFactory trinoFileSystemFactory, HiveConfig hiveConfig)
{
Expand Down Expand Up @@ -89,6 +119,11 @@ public Optional<ReaderPageSource> createPageSource(
// on their use case
return Optional.empty();
}

if (schema.serdeProperties().entrySet().stream().filter(entry -> entry.getKey().startsWith("ion.")).anyMatch(this::isUnsupportedProperty)) {
return Optional.empty();
}

if (!ION_SERDE_CLASS.equals(schema.serializationLibraryName())) {
return Optional.empty();
}
Expand Down Expand Up @@ -147,4 +182,20 @@ public Optional<ReaderPageSource> createPageSource(
throw new TrinoException(HIVE_CANNOT_OPEN_SPLIT, splitError(e, path, start, length), e);
}
}

// Decides whether a single serde property prevents this factory from producing a page source.
// A table-level property is unsupported when its value is not the default; a column-specific
// property is unsupported whenever it is present. Any other key is considered supported.
private boolean isUnsupportedProperty(Map.Entry<String, String> entry)
{
    String name = entry.getKey();
    String expectedDefault = TABLE_PROPERTIES.get(name);

    if (expectedDefault == null) {
        // Not a known table-level property. Column-specific properties have no default
        // to compare against, so for now their mere presence results in an empty PageSource.
        for (Pattern columnProperty : COLUMN_PROPERTIES) {
            if (columnProperty.matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }

    // Known table-level property: only the default value is supported
    return !expectedDefault.equals(entry.getValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ public final class IonReaderOptions
{
public static final String STRICT_PATH_TYPING_PROPERTY = "ion.path_extractor.strict";
public static final String STRICT_PATH_TYPING_DEFAULT = "false";
// Regular expression (not a literal key) for column-specific path extractor properties,
// e.g. "ion.foo.path_extractor". Dots are escaped so they only match a literal '.'.
public static final String PATH_EXTRACTOR_PROPERTY = "ion\\.\\w+\\.path_extractor";
public static final String PATH_EXTRACTION_CASE_SENSITIVITY = "ion.path_extractor.case_sensitive";
public static final String PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT = "false";
// Regular expression (not a literal key) for column-specific overflow properties,
// e.g. "ion.foo.fail_on_overflow". Dots are escaped so they only match a literal '.'.
public static final String FAIL_ON_OVERFLOW_PROPERTY_WITH_COLUMN = "ion\\.\\w+\\.fail_on_overflow";
public static final String FAIL_ON_OVERFLOW_PROPERTY = "ion.fail_on_overflow";
public static final String FAIL_ON_OVERFLOW_PROPERTY_DEFAULT = "true";
public static final String IGNORE_MALFORMED = "ion.ignore_malformed";
public static final String IGNORE_MALFORMED_DEFAULT = "false";

private IonReaderOptions() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@
public final class IonWriterOptions
{
public static final String ION_ENCODING_PROPERTY = "ion.encoding";
public static final String ION_TIMESTAMP_OFFSET_PROPERTY = "ion.timestamp.serialization_offset";
public static final String ION_TIMESTAMP_OFFSET_DEFAULT = "Z";
public static final String ION_SERIALIZATION_AS_NULL_PROPERTY = "ion.serialize_null";
public static final String ION_SERIALIZATION_AS_NULL_DEFAULT = "OMIT";
// Regular expression (not a literal key) for column-specific serialization properties,
// e.g. "ion.foo.serialize_as". Dots are escaped so they only match a literal '.'.
public static final String ION_SERIALIZATION_AS_PROPERTY = "ion\\.\\w+\\.serialize_as";

public static final String TEXT_ENCODING = "text";
public static final String BINARY_ENCODING = "binary";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,40 @@
import io.trino.testing.MaterializedResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.trino.plugin.hive.HivePageSourceProvider.ColumnMapping.buildColumnMappings;
import static io.trino.plugin.hive.HiveStorageFormat.ION;
import static io.trino.plugin.hive.HiveTestUtils.getHiveSession;
import static io.trino.plugin.hive.HiveTestUtils.projectedColumn;
import static io.trino.plugin.hive.HiveTestUtils.toHiveBaseColumnHandle;
import static io.trino.plugin.hive.acid.AcidTransaction.NO_ACID_TRANSACTION;
import static io.trino.plugin.hive.ion.IonReaderOptions.FAIL_ON_OVERFLOW_PROPERTY;
import static io.trino.plugin.hive.ion.IonReaderOptions.FAIL_ON_OVERFLOW_PROPERTY_DEFAULT;
import static io.trino.plugin.hive.ion.IonReaderOptions.IGNORE_MALFORMED;
import static io.trino.plugin.hive.ion.IonReaderOptions.IGNORE_MALFORMED_DEFAULT;
import static io.trino.plugin.hive.ion.IonReaderOptions.PATH_EXTRACTION_CASE_SENSITIVITY;
import static io.trino.plugin.hive.ion.IonReaderOptions.PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT;
import static io.trino.plugin.hive.ion.IonWriterOptions.BINARY_ENCODING;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_ENCODING_PROPERTY;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_SERIALIZATION_AS_NULL_DEFAULT;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_SERIALIZATION_AS_NULL_PROPERTY;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_TIMESTAMP_OFFSET_DEFAULT;
import static io.trino.plugin.hive.ion.IonWriterOptions.ION_TIMESTAMP_OFFSET_PROPERTY;
import static io.trino.plugin.hive.ion.IonWriterOptions.TEXT_ENCODING;
import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS;
import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMN_TYPES;
Expand Down Expand Up @@ -161,6 +175,60 @@ public void testPageSourceWithNativeTrinoDisabled()
Assertions.assertTrue(connectorPageSource.isEmpty(), "Expected empty page source when native Trino is disabled");
}

// Supplies each table-level Ion serde property paired with its default value.
private static Stream<Map.Entry<String, String>> propertiesWithDefaults()
{
    List<Map.Entry<String, String>> defaults = List.of(
            entry(FAIL_ON_OVERFLOW_PROPERTY, FAIL_ON_OVERFLOW_PROPERTY_DEFAULT),
            entry(PATH_EXTRACTION_CASE_SENSITIVITY, PATH_EXTRACTION_CASE_SENSITIVITY_DEFAULT),
            entry(IGNORE_MALFORMED, IGNORE_MALFORMED_DEFAULT),
            entry(ION_TIMESTAMP_OFFSET_PROPERTY, ION_TIMESTAMP_OFFSET_DEFAULT),
            entry(ION_SERIALIZATION_AS_NULL_PROPERTY, ION_SERIALIZATION_AS_NULL_DEFAULT));
    return defaults.stream();
}

// Supplies serde properties for which an empty page source is expected.
private static Stream<Map.Entry<String, String>> propertiesWithValues()
{
    // Table-level properties set to something other than their default value
    Stream<Map.Entry<String, String>> nonDefaultTableProperties = Stream.of(
            entry(FAIL_ON_OVERFLOW_PROPERTY, "false"),
            entry(PATH_EXTRACTION_CASE_SENSITIVITY, "true"),
            entry(IGNORE_MALFORMED, "true"),
            entry(ION_TIMESTAMP_OFFSET_PROPERTY, "01:00"),
            entry(ION_SERIALIZATION_AS_NULL_PROPERTY, "TYPED"));

    // Column-specific properties are not supported: their mere presence in the schema
    // results in an empty PageSource, regardless of the assigned value.
    Stream<Map.Entry<String, String>> columnProperties = Stream.of(
            entry("ion.foo.fail_on_overflow", "property_value"),
            entry("ion.foo.serialize_as", "property_value"),
            entry("ion.foo.path_extractor", "property_value"));

    return Stream.concat(nonDefaultTableProperties, columnProperties);
}

// Shorthand for building a key/value pair used by the parameterized property tests.
private static Map.Entry<String, String> entry(String key, String value)
{
    Map.Entry<String, String> pair = new AbstractMap.SimpleEntry<String, String>(key, value);
    return pair;
}

// Each unsupported serde property must cause the factory to return no page source.
@ParameterizedTest
@MethodSource("propertiesWithValues")
void testPropertiesWithValues(Map.Entry<String, String> property)
        throws IOException
{
    TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS);
    fixture.withSerdeProperties(property);
    fixture.writeIonTextFile("{ foo: 31, bar: baz } { foo: 31, bar: \"baz\" }");

    boolean noPageSource = fixture.getOptionalPageSource().isEmpty();
    Assertions.assertTrue(noPageSource, "Expected empty page source when there are unsupported Serde properties");
}

// A serde property explicitly set to its default value must not prevent reading:
// both rows of the input are still expected.
@ParameterizedTest
@MethodSource("propertiesWithDefaults")
void testPropertiesWithDefaults(Map.Entry<String, String> propertyEntry)
        throws IOException
{
    TestFixture fixture = new TestFixture(FOO_BAR_COLUMNS);
    fixture.withSerdeProperties(propertyEntry);

    String twoRows = "{ foo: 31, bar: baz } { foo: 31, bar: \"baz\" }";
    fixture.assertRowCount(twoRows, 2);
}

@Test
public void testTextEncoding()
throws IOException
Expand Down Expand Up @@ -267,6 +335,13 @@ TestFixture withStrictPathTyping(String strict)
return this;
}

// Adds the given key/value pair to the fixture's table properties and returns this fixture for chaining.
TestFixture withSerdeProperties(Map.Entry<String, String> propertyEntry)
{
    String name = propertyEntry.getKey();
    String value = propertyEntry.getValue();
    tableProperties.put(name, value);
    return this;
}

Optional<ConnectorPageSource> getOptionalPageSource()
throws IOException
{
Expand Down

0 comments on commit 0d593c2

Please sign in to comment.