PARQUET-2415: Reuse hadoop file status and footer in ParquetRecordReader #1242

Open: wants to merge 9 commits into base: master

Changes from all commits
@@ -24,6 +24,7 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -64,13 +65,19 @@ public int run() throws IOException {
return 0;
}

abstract class MixIn {
@JsonIgnore
abstract int getInputFile();
Author: #1242 (comment)

I'm sorry, the UT failed and I don't know why.

Member: You mean this is a workaround to get rid of the test failure at the cost of a new dependency?

Author: I mean the UT will fail if we only annotate the getInputFile method; creating a MixIn class here (in the parquet-cli module) works around that.

The Parquet project already depends on the jackson-annotations library in some other modules, so I don't think this PR adds a new dependency to the parquet-hadoop module.

}

private String readFooter(InputFile inputFile) throws JsonProcessingException, IOException {
String json;
try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
ParquetMetadata footer = reader.getFooter();
ObjectMapper mapper = RawUtils.createObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, Visibility.NONE);
mapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
mapper.addMixIn(ParquetMetadata.class, MixIn.class);
json = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(footer);
}
return json;
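As a side note for readers, here is a minimal, self-contained sketch of the Jackson mix-in technique discussed above: a mix-in class attaches annotations (here @JsonIgnore) to a target type without modifying the target class itself. The Footer class, the MixInDemo name, and all values below are hypothetical, purely for illustration.

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;

public class MixInDemo {
    // Hypothetical target class that we cannot (or do not want to) annotate directly.
    public static class Footer {
        public String getSchema() { return "message root {}"; }
        public String getInputFile() { return "file:///tmp/example.parquet"; }
    }

    // The mix-in declares a member with the same name and signature and carries
    // the annotation; Jackson then treats Footer as if it were annotated.
    abstract static class FooterMixIn {
        @JsonIgnore
        abstract String getInputFile();
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        mapper.addMixIn(Footer.class, FooterMixIn.class);
        // Prints {"schema":"message root {}"}; inputFile is suppressed by the mix-in.
        System.out.println(mapper.writeValueAsString(new Footer()));
    }
}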
parquet-hadoop/pom.xml: 5 additions & 0 deletions
@@ -95,6 +95,11 @@
<artifactId>jackson-databind</artifactId>
<version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>${jackson.groupId}</groupId>
Contributor: Can we avoid adding a dependency?

Author: The jackson-annotations dependency is used in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java so that the InputFile field is not serialized to JSON, keeping the same behavior as before. I'm sorry, I'm not familiar with the Jackson library and I'm not sure whether there is another way to do this.

Member: I happened to find that we have a parquet-jackson module which shades jackson-core and jackson-databind, but parquet-hadoop (and other modules) also explicitly depends on parquet-jackson and the jackson-xxx artifacts at the same time. I'm not familiar with this history; do you know why? @gszadovszky @Fokko @shangxinli

Contributor: @wgtmac, the README of parquet-jackson describes how it works. This is only for doing the shading once (and having one shaded jar) instead of in every module that requires Jackson.

Member: Thanks! Sorry for missing that.

<artifactId>jackson-annotations</artifactId>
<version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
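For context on the shading discussion above, here is a rough sketch of the "shade once" pattern: a single helper module relocates Jackson with the maven-shade-plugin, and downstream modules depend on that one shaded artifact instead of each shading Jackson themselves. The relocation pattern and plugin configuration below are assumptions for illustration, not the actual parquet-jackson configuration.

<!-- Hypothetical build section of the single shading module's pom.xml -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <artifactSet>
          <includes>
            <include>com.fasterxml.jackson.core:jackson-core</include>
            <include>com.fasterxml.jackson.core:jackson-databind</include>
            <include>com.fasterxml.jackson.core:jackson-annotations</include>
          </includes>
        </artifactSet>
        <relocations>
          <relocation>
            <pattern>com.fasterxml.jackson</pattern>
            <shadedPattern>shaded.example.com.fasterxml.jackson</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>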
@@ -613,8 +613,10 @@ private static final ParquetMetadata readFooter(

// Regular file, or encrypted file with plaintext footer
if (!encryptedFooterMode) {
- return converter.readParquetMetadata(
-     footerBytesStream, options.getMetadataFilter(), fileDecryptor, false, fileMetadataLength);
+ ParquetMetadata parquetMetadata = converter.readParquetMetadata(
+     footerBytesStream, options.getMetadataFilter(), fileDecryptor, false, fileMetadataLength);
+ parquetMetadata.setInputFile(file);
+ return parquetMetadata;
}

// Encrypted file with encrypted footer
@@ -625,7 +627,10 @@
fileDecryptor.setFileCryptoMetaData(
fileCryptoMetaData.getEncryption_algorithm(), true, fileCryptoMetaData.getKey_metadata());
// footer length is required only for signed plaintext footers
- return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
+ ParquetMetadata parquetMetadata =
+     converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
+ parquetMetadata.setInputFile(file);
+ return parquetMetadata;
}

/**
@@ -824,12 +829,19 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer)
}

public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
this(file, options, null);
}

public ParquetFileReader(InputFile file, ParquetReadOptions options, ParquetMetadata footer) throws IOException {
this.converter = new ParquetMetadataConverter(options);
this.file = file;
this.f = file.newStream();
this.options = options;
try {
- this.footer = readFooter(file, options, f, converter);
+ if (footer == null) {
+     footer = readFooter(file, options, f, converter);
+ }
+ this.footer = footer;
} catch (Exception e) {
// In case that reading footer throws an exception in the constructor, the new stream
// should be closed. Otherwise, there's no way to close this outside.
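To make the constructor change above concrete, here is a minimal usage sketch, assuming the new three-argument constructor introduced by this PR; the helper class and method names are invented for illustration.

import java.io.IOException;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.InputFile;

public class FooterReuseDemo {
    // Open a reader that reuses an already-parsed footer instead of re-reading it.
    public static ParquetFileReader openReusingFooter(InputFile file, ParquetMetadata cachedFooter)
            throws IOException {
        ParquetReadOptions options = ParquetReadOptions.builder().build();
        // When the footer argument is non-null, the constructor above skips
        // readFooter(...) and the extra I/O it implies.
        return new ParquetFileReader(file, options, cachedFooter);
    }
}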
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop;

import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
@@ -36,6 +37,7 @@
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

@@ -55,6 +57,9 @@ public class ParquetInputSplit extends FileSplit implements Writable {
private long end;
private long[] rowGroupOffsets;

@JsonIgnore
private volatile ParquetMetadata footer;

/**
* Writables must have a parameterless constructor
*/
@@ -222,6 +227,14 @@ public long[] getRowGroupOffsets() {
return rowGroupOffsets;
}

public ParquetMetadata getFooter() {
return footer;
}

public void setFooter(ParquetMetadata footer) {
Contributor (@ConeyLiu, Jan 15, 2024): The ParquetInputSplit is marked as deprecated, and the recommended replacement is FileSplit. How does Spark set the footer after ParquetInputSplit is removed?

Author: Currently Parquet converts the input split to a ParquetInputSplit and builds the reader with it. If ParquetInputSplit were removed from the ParquetFileReader class, Spark would need a shim class to work with different Parquet versions. That would be a big change.

Member: I think this PR can be a good reason to push the Spark community to migrate. Or could we fix this only in Spark 4.x?

Author: I've already filed a WIP ticket, apache/spark#44853, for Spark 4 and will discuss this change on the Spark side in that PR after this PR is merged.

Member: IIUC, other comments have suggested that we should not build on a deprecated interface, so I don't expect this PR to be merged as is. It would be good to figure out the final solution on the Spark side before taking any action here.

this.footer = footer;
}
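A sketch of the end-to-end flow these accessors enable, as described in this PR. The planner-side code is hypothetical; inputFile and split are assumed to be in scope, and ParquetInputSplit remains deprecated as noted above.

// Planning side: read the footer once and cache it on the split.
ParquetMetadata footer;
try (ParquetFileReader reader = ParquetFileReader.open(inputFile)) {
    footer = reader.getFooter();
}
split.setFooter(footer);

// Task side (e.g., in ParquetRecordReader): reuse the cached footer rather
// than re-reading it from storage.
ParquetMetadata cachedFooter = split.getFooter();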

@Override
public String toString() {
String hosts;
@@ -46,6 +46,7 @@
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -155,8 +156,13 @@ private void initializeInternalReader(ParquetInputSplit split, Configuration con
}

// open a reader with the metadata filter
- ParquetFileReader reader =
-     ParquetFileReader.open(HadoopInputFile.fromPath(path, configuration), optionsBuilder.build());
+ InputFile inputFile;
+ if (split.getFooter() != null && split.getFooter().getInputFile() != null) {
+     inputFile = split.getFooter().getInputFile();
+ } else {
+     inputFile = HadoopInputFile.fromPath(path, configuration);
Contributor: If the FileStatus (or at least the file length) can get down here, then it becomes possible to skip a HEAD request when opening a file against cloud storage. The API you need is in Hadoop 3.3.0 and is not very reflection-friendly; we could add something to assist there. What is key is to get as much info as possible into HadoopInputFile, especially the expected length.

+ }
+ ParquetFileReader reader = new ParquetFileReader(inputFile, optionsBuilder.build(), split.getFooter());

if (rowGroupOffsets != null) {
// verify a row group was found for each offset
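Returning to the reviewer's point above about skipping a HEAD request: a sketch of one way to carry the file length into HadoopInputFile, assuming the planner already holds a FileStatus (fs, path, and configuration are assumed to be in scope).

// Reuse the FileStatus fetched at planning time instead of a bare Path.
FileStatus status = fs.getFileStatus(path);
InputFile inputFile = HadoopInputFile.fromStatus(status, configuration);
// On an object store, fromPath(...) triggers a fresh getFileStatus (a HEAD
// request); fromStatus(...) reuses the metadata the planner already has.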
@@ -18,11 +18,13 @@
*/
package org.apache.parquet.hadoop.metadata;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.List;
import org.apache.parquet.io.InputFile;

/**
* Meta Data block stored in the footer of the file
@@ -84,6 +86,9 @@ public static ParquetMetadata fromJSON(String json) {
private final FileMetaData fileMetaData;
private final List<BlockMetaData> blocks;

@JsonIgnore
Member: Why is this annotation required?

Author: With this annotation, the Jackson mapper will not serialize this field to JSON, which keeps the same behavior as before.

private volatile InputFile inputFile;

/**
* @param fileMetaData file level metadata
* @param blocks block level metadata
@@ -107,6 +112,22 @@ public FileMetaData getFileMetaData() {
return fileMetaData;
}

/**
* Reuse the inputFile in ParquetFileReader if it is not null.
*
* @return the cached input file, or null if it has not been set
*/
public InputFile getInputFile() {
return inputFile;
}

/**
* @param inputFile the input file cached in the readFooter method, to be reused in ParquetFileReader
*/
public void setInputFile(InputFile inputFile) {
this.inputFile = inputFile;
}
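A small sketch of what the @JsonIgnore annotation means for these accessors in practice (footer and inputFile assumed in scope):

footer.setInputFile(inputFile);
// Because the inputFile field is annotated with @JsonIgnore, the serialized
// form is unchanged: no "inputFile" key appears in the JSON output.
String json = ParquetMetadata.toJSON(footer);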

@Override
public String toString() {
return "ParquetMetaData{" + fileMetaData + ", blocks: " + blocks + "}";