Fix reading table with headers in kerberized environments
Previously, when a table had a non-zero value set for its header or footer
line count, Presto was unable to generate splits for it, raising:
Unable to query tables due to Can't get Master Kerberos principal for
use as renewer.

The error above was raised from the internals of FileInputFormat: getSplits
lists the input files and, on a Kerberized cluster, attempts to obtain HDFS
delegation tokens, which requires a MapReduce/YARN master principal to act
as the token renewer, something a Presto-only deployment does not configure.
This change avoids using FileInputFormat.getSplits for such tables.
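The heart of the change, as a minimal standalone sketch (the class and helper below are illustrative, not Presto code; the real logic reads the counts via getHeaderCount/getFooterCount in the split loader, see the diff below): header and footer lines can only be skipped correctly when a single reader consumes the whole file, so such files are now marked non-splittable up front instead of being routed through FileInputFormat.

import java.util.Properties;

// Illustrative sketch of the new splittability rule.
final class HeaderFooterSplitRule
{
    // Hive stores the counts as table properties under these keys.
    private static int lineCount(Properties schema, String key)
    {
        return Integer.parseInt(schema.getProperty(key, "0"));
    }

    // Splitting is only safe when there is nothing to skip at either end of the file.
    static boolean isSplittable(Properties schema)
    {
        return lineCount(schema, "skip.header.line.count") == 0
                && lineCount(schema, "skip.footer.line.count") == 0;
    }
}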
kokosing committed Jul 4, 2018
1 parent 05ea5a8 commit 67ce2ed
Showing 4 changed files with 13 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -104,7 +104,7 @@ script:
   - |
     if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT ]]; then
       presto-product-tests/bin/run_on_docker.sh \
-        singlenode-kerberos-hdfs-impersonation -g storage_formats,cli,hdfs_impersonation,authorization
+        singlenode-kerberos-hdfs-impersonation -g storage_formats,cli,hdfs_impersonation,authorization,hive_file_header
     fi
   - |
     if [[ -v PRODUCT_TESTS_SPECIFIC_ENVIRONMENT_2 ]]; then
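Assuming run_on_docker.sh accepts the same -g group selector when invoked by hand (the usage above suggests so), the new tests can be run in isolation with: presto-product-tests/bin/run_on_docker.sh singlenode-kerberos-hdfs-impersonation -g hive_file_header. This exercises the header/footer scans against exactly the Kerberized HDFS setup that triggered the bug.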
@@ -287,7 +287,6 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
         // get the configuration for the target path -- it may be a different hdfs instance
         FileSystem targetFilesystem = hdfsEnvironment.getFileSystem(hdfsContext, targetPath);
         JobConf targetJob = toJobConf(targetFilesystem.getConf());
-        handleFileHeader(schema, targetJob);
         targetJob.setInputFormat(TextInputFormat.class);
         targetInputFormat.configure(targetJob);
         FileInputFormat.setInputPaths(targetJob, targetPath);
@@ -332,12 +331,11 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)

         // To support custom input formats, we want to call getSplits()
         // on the input format to obtain file splits.
-        if (shouldUseFileSplitsFromInputFormat(inputFormat) || getHeaderCount(schema) > 0 || getFooterCount(schema) > 0) {
+        if (shouldUseFileSplitsFromInputFormat(inputFormat)) {
            if (tableBucketInfo.isPresent()) {
                throw new PrestoException(NOT_SUPPORTED, "Presto cannot read bucketed partition in an input format with UseFileSplitsFromInputFormat annotation: " + inputFormat.getClass().getSimpleName());
            }
            JobConf jobConf = toJobConf(configuration);
-            handleFileHeader(schema, jobConf);
            FileInputFormat.setInputPaths(jobConf, path);
            InputSplit[] splits = inputFormat.getSplits(jobConf, 0);

@@ -349,22 +347,11 @@ private ListenableFuture<?> loadPartition(HivePartitionMetadata partition)
             return hiveSplitSource.addToQueue(getBucketedSplits(path, fs, splitFactory, tableBucketInfo.get(), bucketConversion));
         }

-        fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory));
+        boolean splittable = getHeaderCount(schema) == 0 && getFooterCount(schema) == 0;
+        fileIterators.addLast(createInternalHiveSplitIterator(path, fs, splitFactory, splittable));
         return COMPLETED_FUTURE;
     }

-    private void handleFileHeader(Properties schema, JobConf jobConf)
-    {
-        int headerCount = getHeaderCount(schema);
-        int footerCount = getFooterCount(schema);
-        if (headerCount > 0 || footerCount > 0) {
-            // do not split file when skip.header.line.count or skip.footer.line.count is used
-            jobConf.setLong("mapreduce.input.fileinputformat.split.minsize", Long.MAX_VALUE);
-            // TODO remove this when Hadoop 1.x is not supported
-            jobConf.setLong("mapred.min.split.size", Long.MAX_VALUE);
-        }
-    }
-
     private ListenableFuture<?> addSplitsToSource(InputSplit[] targetSplits, InternalHiveSplitFactory splitFactory)
             throws IOException
     {
@@ -389,10 +376,10 @@ private static boolean shouldUseFileSplitsFromInputFormat(InputFormat<?, ?> inputFormat)
                 .anyMatch(name -> name.equals("UseFileSplitsFromInputFormat"));
     }

-    private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory)
+    private Iterator<InternalHiveSplit> createInternalHiveSplitIterator(Path path, FileSystem fileSystem, InternalHiveSplitFactory splitFactory, boolean splittable)
     {
         return Streams.stream(new HiveFileIterator(path, fileSystem, directoryLister, namenodeStats, recursiveDirWalkerEnabled ? RECURSE : IGNORED))
-                .map(splitFactory::createInternalHiveSplit)
+                .map(status -> splitFactory.createInternalHiveSplit(status, splittable))
                 .filter(Optional::isPresent)
                 .map(Optional::get)
                 .iterator();
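A note on why the deleted handleFileHeader workaround was insufficient: capping the minimum split size only changed how FileInputFormat.getSplits divided files, while the call itself still ran, and its file listing obtains HDFS delegation tokens. A paraphrased sketch of the failing path (reconstructed from Hadoop's mapred internals, not shown in this diff):

// FileInputFormat.getSplits(job, numSplits)
//     -> listStatus(job)
//         -> TokenCache.obtainTokensForNamenodes(credentials, dirs, job)
//             -> IOException: "Can't get Master Kerberos principal for use as renewer"
//                (thrown when no YARN/MapReduce master principal, e.g.
//                yarn.resourcemanager.principal, is configured, as in a Presto-only deployment)

Deriving splittable from the schema and handing it straight to the split factory removes the dependency on FileInputFormat for these tables entirely.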
@@ -86,26 +86,27 @@ public String getPartitionName()
         return partitionName;
     }

-    public Optional<InternalHiveSplit> createInternalHiveSplit(LocatedFileStatus status)
+    public Optional<InternalHiveSplit> createInternalHiveSplit(LocatedFileStatus status, boolean splittable)
     {
-        return createInternalHiveSplit(status, OptionalInt.empty());
+        return createInternalHiveSplit(status, OptionalInt.empty(), splittable);
     }

     public Optional<InternalHiveSplit> createInternalHiveSplit(LocatedFileStatus status, int bucketNumber)
     {
-        return createInternalHiveSplit(status, OptionalInt.of(bucketNumber));
+        return createInternalHiveSplit(status, OptionalInt.of(bucketNumber), false);
     }

-    private Optional<InternalHiveSplit> createInternalHiveSplit(LocatedFileStatus status, OptionalInt bucketNumber)
+    private Optional<InternalHiveSplit> createInternalHiveSplit(LocatedFileStatus status, OptionalInt bucketNumber, boolean splittable)
     {
+        splittable = splittable && isSplittable(inputFormat, fileSystem, status.getPath());
         return createInternalHiveSplit(
                 status.getPath(),
                 status.getBlockLocations(),
                 0,
                 status.getLen(),
                 status.getLen(),
                 bucketNumber,
-                isSplittable(inputFormat, fileSystem, status.getPath()));
+                splittable);
     }

     public Optional<InternalHiveSplit> createInternalHiveSplit(FileSplit split)
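Design note: the private overload now combines the caller's schema-derived verdict with the existing input-format probe (splittable && isSplittable(...)), so a header or footer table stays whole even when its input format could otherwise be split. The bucketed overload passes false outright, so bucketed files are never marked splittable by this path.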
@@ -1,4 +1,4 @@
--- database: presto; tables: table_with_header, table_with_footer, table_with_header_and_footer; groups: hive;
+-- database: presto; tables: table_with_header, table_with_footer, table_with_header_and_footer; groups: hive, hive_file_header;
 --! name: simple_scan with header
 SELECT count(*) FROM table_with_header
 --!