Add readTail to OrcDataSource
dain committed Sep 12, 2020
1 parent 794b5e7 commit 95dedea
Showing 26 changed files with 192 additions and 97 deletions.
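The core of the change is in the presto-orc OrcDataSource interface: getSize() is renamed to getEstimatedSize(), and a new readTail(int length) method returns the last length bytes of the file, which is where the ORC postscript and footer live. A minimal caller-side sketch of the new API follows; the ReadTailSketch class and the EXPECTED_FOOTER_SIZE budget are illustrative and not part of this commit.

```java
import io.airlift.slice.Slice;
import io.prestosql.orc.OrcDataSource;

import java.io.IOException;

import static java.lang.Math.min;
import static java.lang.Math.toIntExact;

final class ReadTailSketch
{
    // Illustrative budget for the postscript + footer; the reader's real constant may differ.
    private static final int EXPECTED_FOOTER_SIZE = 16 * 1024;

    private ReadTailSketch() {}

    // Assumes a non-empty file; the commit handles empty files separately by having
    // OrcReader.createOrcReader return Optional.empty().
    static Slice readPostScript(OrcDataSource dataSource)
            throws IOException
    {
        // Never request more bytes than the (estimated) file length.
        int tailLength = toIntExact(min(dataSource.getEstimatedSize(), EXPECTED_FOOTER_SIZE));
        Slice tail = dataSource.readTail(tailLength);

        // The ORC format stores the postscript length in the last byte of the file,
        // so a single tail read is normally enough to locate and decode the footer.
        int postScriptSize = tail.getUnsignedByte(tail.length() - 1);
        return tail.slice(tail.length() - 1 - postScriptSize, postScriptSize);
    }
}
```

The default implementation added to AbstractOrcDataSource simply delegates to readFully(estimatedSize - length, length), so existing data sources keep their current behavior; a data source that can fetch a file suffix directly could override it.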
@@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import io.prestosql.memory.context.AggregatedMemoryContext;
import io.prestosql.orc.OrcColumn;
import io.prestosql.orc.OrcCorruptionException;
import io.prestosql.orc.OrcDataSource;
import io.prestosql.orc.OrcDataSourceId;
import io.prestosql.orc.OrcPredicate;
@@ -38,12 +39,14 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.collect.Maps.uniqueIndex;
import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.prestosql.orc.OrcReader.MAX_BATCH_SIZE;
import static io.prestosql.orc.OrcReader.createOrcReader;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_MISSING_DATA;
import static io.prestosql.plugin.hive.orc.OrcPageSource.handleException;
@@ -66,7 +69,7 @@ public class OrcDeleteDeltaPageSource

private boolean closed;

public OrcDeleteDeltaPageSource(
public static Optional<ConnectorPageSource> createOrcDeleteDeltaPageSource(
Path path,
long fileSize,
OrcReaderOptions options,
@@ -75,8 +78,7 @@ public OrcDeleteDeltaPageSource(
HdfsEnvironment hdfsEnvironment,
FileFormatDataSourceStats stats)
{
this.stats = requireNonNull(stats, "stats is null");

OrcDataSource orcDataSource;
try {
FileSystem fileSystem = hdfsEnvironment.getFileSystem(sessionUser, path, configuration);
FSDataInputStream inputStream = hdfsEnvironment.doAs(sessionUser, () -> fileSystem.open(path));
@@ -96,26 +98,11 @@ public OrcDeleteDeltaPageSource(
}

try {
OrcReader reader = new OrcReader(orcDataSource, options);

verifyAcidSchema(reader, path);
Map<String, OrcColumn> acidColumns = uniqueIndex(
reader.getRootColumn().getNestedColumns(),
orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH));
List<OrcColumn> rowIdColumns = ImmutableList.of(
acidColumns.get(ACID_COLUMN_ORIGINAL_TRANSACTION.toLowerCase(ENGLISH)),
acidColumns.get(ACID_COLUMN_ROW_ID.toLowerCase(ENGLISH)));

recordReader = reader.createRecordReader(
rowIdColumns,
ImmutableList.of(BIGINT, BIGINT),
OrcPredicate.TRUE,
0,
fileSize,
UTC,
systemMemoryContext,
MAX_BATCH_SIZE,
exception -> handleException(orcDataSource.getId(), exception));
Optional<OrcReader> orcReader = createOrcReader(orcDataSource, options);
if (orcReader.isPresent()) {
return Optional.of(new OrcDeleteDeltaPageSource(path, fileSize, orcReader.get(), orcDataSource, stats));
}
return Optional.empty();
}
catch (Exception e) {
try {
@@ -135,6 +122,37 @@ public OrcDeleteDeltaPageSource(
}
}

private OrcDeleteDeltaPageSource(
Path path,
long fileSize,
OrcReader reader,
OrcDataSource orcDataSource,
FileFormatDataSourceStats stats)
throws OrcCorruptionException
{
this.stats = requireNonNull(stats, "stats is null");
this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null");

verifyAcidSchema(reader, path);
Map<String, OrcColumn> acidColumns = uniqueIndex(
reader.getRootColumn().getNestedColumns(),
orcColumn -> orcColumn.getColumnName().toLowerCase(ENGLISH));
List<OrcColumn> rowIdColumns = ImmutableList.of(
acidColumns.get(ACID_COLUMN_ORIGINAL_TRANSACTION.toLowerCase(ENGLISH)),
acidColumns.get(ACID_COLUMN_ROW_ID.toLowerCase(ENGLISH)));

recordReader = reader.createRecordReader(
rowIdColumns,
ImmutableList.of(BIGINT, BIGINT),
OrcPredicate.TRUE,
0,
fileSize,
UTC,
systemMemoryContext,
MAX_BATCH_SIZE,
exception -> handleException(orcDataSource.getId(), exception));
}

@Override
public long getCompletedBytes()
{
@@ -16,9 +16,13 @@
import io.prestosql.orc.OrcReaderOptions;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.plugin.hive.HdfsEnvironment;
import io.prestosql.spi.connector.ConnectorPageSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.util.Optional;

import static io.prestosql.plugin.hive.orc.OrcDeleteDeltaPageSource.createOrcDeleteDeltaPageSource;
import static java.util.Objects.requireNonNull;

public class OrcDeleteDeltaPageSourceFactory
@@ -43,9 +47,9 @@ public OrcDeleteDeltaPageSourceFactory(
this.stats = requireNonNull(stats, "stats is null");
}

public OrcDeleteDeltaPageSource createPageSource(Path path, long fileSize)
public Optional<ConnectorPageSource> createPageSource(Path path, long fileSize)
{
return new OrcDeleteDeltaPageSource(
return createOrcDeleteDeltaPageSource(
path,
fileSize,
options,
@@ -22,6 +22,7 @@
import io.prestosql.spi.block.Block;
import io.prestosql.spi.block.DictionaryBlock;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.FixedPageSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -210,7 +211,7 @@ private Set<RowId> getDeletedRows()
FileSystem fileSystem = hdfsEnvironment.getFileSystem(sessionUser, path, configuration);
FileStatus fileStatus = hdfsEnvironment.doAs(sessionUser, () -> fileSystem.getFileStatus(path));

try (ConnectorPageSource pageSource = pageSourceFactory.createPageSource(fileStatus.getPath(), fileStatus.getLen())) {
try (ConnectorPageSource pageSource = pageSourceFactory.createPageSource(fileStatus.getPath(), fileStatus.getLen()).orElseGet(() -> new FixedPageSource(ImmutableSet.of()))) {
while (!pageSource.isFinished()) {
Page page = pageSource.getNextPage();
if (page != null) {
@@ -190,14 +190,14 @@ public Optional<ReaderPageSourceWithProjections> createPageSource(
return Optional.of(new ReaderPageSourceWithProjections(orcPageSource, projectedReaderColumns));
}

private static OrcPageSource createOrcPageSource(
private static ConnectorPageSource createOrcPageSource(
HdfsEnvironment hdfsEnvironment,
String sessionUser,
Configuration configuration,
Path path,
long start,
long length,
long fileSize,
long estimatedFileSize,
List<HiveColumnHandle> columns,
List<HiveColumnHandle> projections,
boolean useOrcColumnNames,
@@ -221,7 +221,7 @@ private static OrcPageSource createOrcPageSource(
FSDataInputStream inputStream = hdfsEnvironment.doAs(sessionUser, () -> fileSystem.open(path));
orcDataSource = new HdfsOrcDataSource(
new OrcDataSourceId(path.toString()),
fileSize,
estimatedFileSize,
options,
inputStream,
stats);
@@ -236,7 +236,11 @@ private static OrcPageSource createOrcPageSource(

AggregatedMemoryContext systemMemoryUsage = newSimpleAggregatedMemoryContext();
try {
OrcReader reader = new OrcReader(orcDataSource, options);
Optional<OrcReader> optionalOrcReader = OrcReader.createOrcReader(orcDataSource, options);
if (optionalOrcReader.isEmpty()) {
return new FixedPageSource(ImmutableList.of());
}
OrcReader reader = optionalOrcReader.get();

List<OrcColumn> fileColumns = reader.getRootColumn().getNestedColumns();
List<OrcColumn> fileReadColumns = new ArrayList<>(columns.size() + (isFullAcid ? 2 : 0));
@@ -27,6 +27,7 @@

import java.util.Collection;

import static io.prestosql.orc.OrcReader.createOrcReader;
import static io.prestosql.plugin.hive.AcidInfo.OriginalFileInfo;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;

@@ -84,10 +85,14 @@ private static Long getRowsInFile(
options,
inputStream,
stats)) {
OrcReader reader = new OrcReader(orcDataSource, options);
OrcReader reader = createOrcReader(orcDataSource, options)
.orElseThrow(() -> new PrestoException(HIVE_CANNOT_OPEN_SPLIT, "Could not read ORC footer from empty file: " + splitPath));
return reader.getFooter().getNumberOfRows();
}
}
catch (PrestoException e) {
throw e;
}
catch (Exception e) {
throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, "Could not read ORC footer from file: " + splitPath, e);
}
@@ -43,7 +43,8 @@ public TempFileReader(List<Type> types, OrcDataSource dataSource)
requireNonNull(types, "types is null");

try {
OrcReader orcReader = new OrcReader(dataSource, new OrcReaderOptions());
OrcReader orcReader = OrcReader.createOrcReader(dataSource, new OrcReaderOptions())
.orElseThrow(() -> new PrestoException(HIVE_WRITER_DATA_ERROR, "Temporary data file is empty"));
reader = orcReader.createRecordReader(
orcReader.getRootColumn().getNestedColumns(),
types,
@@ -17,6 +17,7 @@
import com.google.common.io.Resources;
import io.prestosql.orc.OrcReaderOptions;
import io.prestosql.plugin.hive.FileFormatDataSourceStats;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.testing.MaterializedResult;
import io.prestosql.testing.MaterializedRow;
import org.apache.hadoop.conf.Configuration;
@@ -45,7 +46,7 @@ public void testReadingDeletedRows()
HDFS_ENVIRONMENT,
new FileFormatDataSourceStats());

OrcDeleteDeltaPageSource pageSource = pageSourceFactory.createPageSource(new Path(deleteDeltaFile.toURI()), deleteDeltaFile.length());
ConnectorPageSource pageSource = pageSourceFactory.createPageSource(new Path(deleteDeltaFile.toURI()), deleteDeltaFile.length()).orElseThrow();
MaterializedResult materializedRows = MaterializedResult.materializeSourceDataStream(SESSION, pageSource, ImmutableList.of(BIGINT, BIGINT));

assertEquals(materializedRows.getRowCount(), 1);
@@ -254,7 +254,8 @@ private static ConnectorPageSource createOrcPageSource(
inputStream,
stats);

OrcReader reader = new OrcReader(orcDataSource, options);
OrcReader reader = OrcReader.createOrcReader(orcDataSource, options)
.orElseThrow(() -> new PrestoException(ICEBERG_BAD_DATA, "ORC file is zero length"));
List<OrcColumn> fileColumns = reader.getRootColumn().getNestedColumns();
Map<Integer, OrcColumn> fileColumnsByIcebergId = fileColumns.stream()
.filter(orcColumn -> orcColumn.getAttributes().containsKey(ORC_ICEBERG_ID_KEY))
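The other recurring change in the hunks above is that every direct new OrcReader(...) call is replaced with the static factory OrcReader.createOrcReader, which returns an Optional that is presumably empty when the file has no ORC tail to read (the new error messages refer to an "empty file"). Callers then choose between two styles depending on whether an empty file is acceptable. The sketch below isolates the two patterns; the EmptyOrcFileHandling wrapper class is illustrative and not part of the commit.

```java
import com.google.common.collect.ImmutableList;
import io.prestosql.orc.OrcDataSource;
import io.prestosql.orc.OrcReader;
import io.prestosql.orc.OrcReaderOptions;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ConnectorPageSource;
import io.prestosql.spi.connector.FixedPageSource;

import java.io.IOException;
import java.util.Optional;

import static io.prestosql.orc.OrcReader.createOrcReader;
import static io.prestosql.plugin.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;

final class EmptyOrcFileHandling
{
    private EmptyOrcFileHandling() {}

    // An empty file is legitimate (for example, a delete delta with no deleted rows):
    // fall back to a page source that produces no pages.
    static ConnectorPageSource orEmpty(Optional<ConnectorPageSource> pageSource)
    {
        return pageSource.orElseGet(() -> new FixedPageSource(ImmutableList.of()));
    }

    // An empty file is an error (for example, when the row count of an original file
    // is required): surface it as a failure to open the split.
    static OrcReader orFail(OrcDataSource dataSource, OrcReaderOptions options, String path)
            throws IOException
    {
        return createOrcReader(dataSource, options)
                .orElseThrow(() -> new PrestoException(HIVE_CANNOT_OPEN_SPLIT,
                        "Could not read ORC footer from empty file: " + path));
    }
}
```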
@@ -38,17 +38,16 @@ public abstract class AbstractOrcDataSource
implements OrcDataSource
{
private final OrcDataSourceId id;
private final long size;
private final long estimatedSize;
private final OrcReaderOptions options;
private long readTimeNanos;
private long readBytes;

public AbstractOrcDataSource(OrcDataSourceId id, long size, OrcReaderOptions options)
public AbstractOrcDataSource(OrcDataSourceId id, long estimatedSize, OrcReaderOptions options)
{
this.id = requireNonNull(id, "id is null");

this.size = size;
checkArgument(size > 0, "size must be at least 1");
this.estimatedSize = estimatedSize;
this.options = requireNonNull(options, "options is null");
}

@@ -74,9 +73,16 @@ public final long getReadTimeNanos()
}

@Override
public final long getSize()
public final long getEstimatedSize()
{
return size;
return estimatedSize;
}

@Override
public Slice readTail(int length)
throws IOException
{
return readFully(estimatedSize - length, length);
}

@Override
@@ -63,9 +63,9 @@ public long getReadTimeNanos()
}

@Override
public long getSize()
public long getEstimatedSize()
{
return dataSource.getSize();
return dataSource.getEstimatedSize();
}

@Override
@@ -87,6 +87,13 @@ void readCacheAt(long offset)
cache = dataSource.readFully(newCacheRange.getOffset(), cacheLength);
}

@Override
public Slice readTail(int length)
{
// caching data source is not used for tail reads, and would be complex to implement
throw new UnsupportedOperationException();
}

@Override
public Slice readFully(long position, int length)
throws IOException
@@ -56,7 +56,7 @@ public long getReadTimeNanos()
}

@Override
public final long getSize()
public final long getEstimatedSize()
{
return data.length();
}
@@ -67,6 +67,12 @@ public long getRetainedSize()
return data.getRetainedSize();
}

@Override
public Slice readTail(int length)
{
return readFully(data.length() - length, length);
}

@Override
public final Slice readFully(long position, int length)
{
5 changes: 4 additions & 1 deletion presto-orc/src/main/java/io/prestosql/orc/OrcDataSource.java
@@ -29,14 +29,17 @@ public interface OrcDataSource

long getReadTimeNanos();

long getSize();
long getEstimatedSize();

/**
* Gets the memory size of this data source. This only includes memory
* used for the data source and not memory assigned to a specific OrcDataReader.
*/
long getRetainedSize();

Slice readTail(int length)
throws IOException;

Slice readFully(long position, int length)
throws IOException;
