Wip parquet write compression (apache#122)
* Revert "set default parquet compression codec to SNAPPy (apache#117)"

This reverts commit 71cad74.

* allow writing parquet with compression

Signed-off-by: Yuan Zhou <[email protected]>
zhouyuan authored Dec 5, 2022
1 parent 2a65e46 commit 7deeef2
Showing 8 changed files with 34 additions and 15 deletions.
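
In short, the patch reverts the library-wide SNAPPY default and instead threads a codec name from the Java writer API through JNI into ParquetFileFormat. As a rough illustration only (a hypothetical sketch assuming a build that contains this patch; the column name "x" is arbitrary), the C++ side of the flow looks like:

#include <cassert>
#include <memory>

#include "arrow/dataset/file_parquet.h"
#include "arrow/util/compression.h"
#include "parquet/properties.h"
#include "parquet/schema.h"

int main() {
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
  // New in this patch: pick the codec by name; unknown names fall back to
  // UNCOMPRESSED (see ParquetFileFormat::SetCodec in file_parquet.cc below).
  format->SetCodec("gzip");
  auto options = std::static_pointer_cast<arrow::dataset::ParquetFileWriteOptions>(
      format->DefaultWriteOptions());
  // The chosen codec reaches every column through parquet::WriterProperties.
  assert(options->writer_properties->compression(
             parquet::schema::ColumnPath::FromDotString("x")) ==
         arrow::Compression::GZIP);
  return 0;
}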
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
@@ -204,6 +204,9 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileFormat>
 
   /// \brief Get default write options for this format.
   virtual std::shared_ptr<FileWriteOptions> DefaultWriteOptions() = 0;
+
+  /// \brief Set the compression codec used when writing; a no-op by default.
+  virtual void SetCodec(std::string codec) {}
 };
 
 /// \brief A Fragment that is stored in a file with a known format
27 changes: 19 additions & 8 deletions cpp/src/arrow/dataset/file_parquet.cc
@@ -51,7 +51,6 @@ using parquet::arrow::SchemaField;
 using parquet::arrow::SchemaManifest;
 using parquet::arrow::StatisticsAsScalars;
 
-
 /// \brief A ScanTask backed by a parquet file and a RowGroup within a parquet file.
 class ParquetScanTask : public ScanTask {
  public:
@@ -406,10 +405,21 @@ Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
 // ParquetFileWriter, ParquetFileWriteOptions
 //
 
+void ParquetFileFormat::SetCodec(std::string codec) {
+  // Unrecognized codec names fall back to writing uncompressed data.
+  auto maybe_compression = util::Codec::GetCompressionType(codec);
+  if (maybe_compression.ok()) {
+    codec_ = maybe_compression.ValueOrDie();
+  } else {
+    codec_ = Compression::UNCOMPRESSED;
+  }
+}
+
 std::shared_ptr<FileWriteOptions> ParquetFileFormat::DefaultWriteOptions() {
   std::shared_ptr<ParquetFileWriteOptions> options(
       new ParquetFileWriteOptions(shared_from_this()));
-  options->writer_properties = parquet::default_writer_properties();
+  options->writer_properties =
+      parquet::WriterProperties::Builder().compression(codec_)->build();
   options->arrow_writer_properties = parquet::default_arrow_writer_properties();
   return options;
 }
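
SetCodec relies on arrow::util::Codec::GetCompressionType for the name-to-enum mapping. Below is a small standalone sketch of that fallback behavior (assuming only stock Arrow C++; the name list is illustrative, not exhaustive):

#include <iostream>
#include <string>

#include "arrow/util/compression.h"

int main() {
  for (const std::string& name : {"snappy", "zstd", "not-a-codec"}) {
    auto maybe = arrow::util::Codec::GetCompressionType(name);
    // Mirror SetCodec above: unrecognized names degrade to UNCOMPRESSED.
    arrow::Compression::type codec =
        maybe.ok() ? maybe.ValueOrDie() : arrow::Compression::UNCOMPRESSED;
    std::cout << name << " -> " << static_cast<int>(codec) << std::endl;
  }
  return 0;
}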
@@ -499,15 +509,16 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
       r_start = leading_cc->dictionary_page_offset();
     }
     int64_t r_bytes = 0L;
-    for (int col_id = 0; col_id < parquet_reader->RowGroup(i)
-                         ->metadata()->num_columns();
-         col_id++) {
-      r_bytes += parquet_reader->
-          RowGroup(i)->metadata()->ColumnChunk(col_id)->total_compressed_size();
+    for (int col_id = 0;
+         col_id < parquet_reader->RowGroup(i)->metadata()->num_columns(); col_id++) {
+      r_bytes += parquet_reader->RowGroup(i)
+                     ->metadata()
+                     ->ColumnChunk(col_id)
+                     ->total_compressed_size();
     }
     int64_t midpoint = r_start + r_bytes / 2;
-    if (midpoint >= source.start_offset()
-        && midpoint < (source.start_offset() + source.length())) {
+    if (midpoint >= source.start_offset() &&
+        midpoint < (source.start_offset() + source.length())) {
       random_read_selected_row_groups.push_back(i);
     }
   }
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -120,6 +120,9 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
       std::shared_ptr<FileWriteOptions> options) const override;
 
   std::shared_ptr<FileWriteOptions> DefaultWriteOptions() override;
+  void SetCodec(std::string codec) override;
+ private:
+  Compression::type codec_ = Compression::UNCOMPRESSED;  // never read uninitialized
 };
 
 /// \brief A FileFragment with parquet logic.
3 changes: 2 additions & 1 deletion cpp/src/jni/dataset/jni_wrapper.cc
@@ -728,7 +728,7 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
 JNIEXPORT void JNICALL
 Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
     JNIEnv* env, jobject, jobject itr, jbyteArray schema_bytes, jlong file_format_id,
-    jstring uri, jobjectArray partition_columns, jint max_partitions,
+    jstring codec, jstring uri, jobjectArray partition_columns, jint max_partitions,
     jstring base_name_template) {
   JNI_METHOD_START
   JavaVM* vm;
@@ -745,6 +745,7 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
       arrow::fs::FileSystemFromUri(JStringToCString(env, uri), &output_path));
   std::vector<std::string> partition_column_vector =
       ToStringVector(env, partition_columns);
+  file_format->SetCodec(JStringToCString(env, codec));
   options.file_write_options = file_format->DefaultWriteOptions();
   options.filesystem = filesystem;
   options.base_dir = output_path;
2 changes: 1 addition & 1 deletion cpp/src/parquet/properties.h
@@ -111,7 +111,7 @@ static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
 static constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
 static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
 static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
-static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::SNAPPY;
+static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
 
 class PARQUET_EXPORT ColumnProperties {
  public:
java/dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java
@@ -40,14 +40,14 @@ public class DatasetFileWriter {
    * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
    *                         ID around all written files.
    */
-  public static void write(Scanner scanner, FileFormat format, String uri,
+  public static void write(Scanner scanner, FileFormat format, String codec, String uri,
       String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
     final NativeScannerAdaptorImpl adaptor = new NativeScannerAdaptorImpl(scanner);
     final NativeSerializedRecordBatchIterator itr = adaptor.scan();
     RuntimeException throwableWrapper = null;
     try {
       JniWrapper.get().writeFromScannerToFile(itr, SchemaUtility.serialize(scanner.schema()),
-          format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate);
+          format.id(), codec, uri, partitionColumns, maxPartitions, baseNameTemplate);
     } catch (Throwable t) {
       throwableWrapper = new RuntimeException(t);
       throw throwableWrapper;
@@ -70,6 +70,7 @@ public static void write(Scanner scanner, FileFormat format, String uri,
    * @param uri target file uri
    */
   public static void write(Scanner scanner, FileFormat format, String uri) {
-    write(scanner, format, uri, new String[0], 1024, "dat_{i}");
+    // TODO: default to UNCOMPRESSED
+    write(scanner, format, "snappy", uri, new String[0], 1024, "dat_{i}");
   }
 }
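
With the widened signature, callers pass the codec name explicitly, e.g. DatasetFileWriter.write(scanner, format, "gzip", uri, partitionColumns, maxPartitions, "dat_{i}"); the three-argument convenience overload above still hardcodes "snappy" until its TODO to default to UNCOMPRESSED is addressed.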
java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -79,7 +79,7 @@ public native long newJniMethodReference(String classSignature, String methodName,
    * ID around all written files.
    */
   public native void writeFromScannerToFile(NativeSerializedRecordBatchIterator itr, byte[] schema,
-      long fileFormat, String uri, String[] partitionColumns, int maxPartitions,
+      long fileFormat, String codec, String uri, String[] partitionColumns, int maxPartitions,
       String baseNameTemplate);
java/dataset/src/test/java/org/apache/arrow/dataset/file/TestDatasetFileWriter.java
@@ -86,7 +86,7 @@ public void testParquetWriteWithPartitions() throws Exception {
     final File writtenFolder = TMP.newFolder();
     final String writtenParquet = writtenFolder.toURI().toString();
     try {
-      DatasetFileWriter.write(scanner, ParquetFileFormat.createDefault(), writtenParquet, new String[]{"id", "name"}, 100, "dat_{i}");
+      DatasetFileWriter.write(scanner, ParquetFileFormat.createDefault(), "snappy", writtenParquet, new String[]{"id", "name"}, 100, "dat_{i}");
       final Set<String> expectedOutputFiles = new HashSet<>(
           Arrays.asList("id=1/name=a/dat_0", "id=2/name=b/dat_1", "id=3/name=c/dat_2", "id=2/name=d/dat_3"));
       final Set<String> outputFiles = FileUtils.listFiles(writtenFolder, null, true)
