Skip to content

Commit

Permalink
Correcting issue with empty segments not being written to file for single SegArray with Strings values. (#2562)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ethan-DeBandi99 authored Jul 11, 2023
1 parent 36bf9f8 commit 0eb3008
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
27 changes: 18 additions & 9 deletions src/ArrowFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1431,18 +1431,27 @@ int cpp_writeStrListColumnToParquet(const char* filename, void* chpl_segs, void*
int64_t count = 0;
while (numLeft > 0 && count < rowGroupSize) { // ensures rowGroupSize maintained
int64_t segmentLength = segments[segIdx+1] - segments[segIdx];
for (int64_t x = 0; x < segmentLength; x++){
int16_t rep_lvl = (x == 0) ? 0 : 1;
int16_t def_lvl = 3;
parquet::ByteArray value;
value.ptr = reinterpret_cast<const uint8_t*>(&chpl_ptr[valIdx]);
value.len = offsets[offIdx+1] - offsets[offIdx] - 1;
ba_writer->WriteBatch(1, &def_lvl, &rep_lvl, &value);
offIdx++;
valIdx+=offsets[offIdx] - offsets[offIdx-1];
if (segmentLength > 0) {
for (int64_t x = 0; x < segmentLength; x++){
int16_t rep_lvl = (x == 0) ? 0 : 1;
int16_t def_lvl = 3;
parquet::ByteArray value;
value.ptr = reinterpret_cast<const uint8_t*>(&chpl_ptr[valIdx]);
value.len = offsets[offIdx+1] - offsets[offIdx] - 1;
ba_writer->WriteBatch(1, &def_lvl, &rep_lvl, &value);
offIdx++;
valIdx+=offsets[offIdx] - offsets[offIdx-1];
}
} else {
// An empty segment is written as a single null value: it is not repeated (rep_lvl 0, i.e. first of its segment) and is defined only at the list level (def_lvl 1).
segmentLength = 1; // even though segment is length=0, write null to hold the empty segment
int16_t def_lvl = 1;
int16_t rep_lvl = 0;
ba_writer->WriteBatch(segmentLength, &def_lvl, &rep_lvl, nullptr);
}
segIdx++;
numLeft--;count++;

}
}

Expand Down
2 changes: 1 addition & 1 deletion src/ParquetMsg.chpl
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ module ParquetMsg {
var entrySeg = new shared SymEntry((+ reduce listSizes), int);
var byteSizes = calcStrSizesAndOffset(entrySeg.a, filenames, listSizes, dsetname);
entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;

var entryVal = new shared SymEntry((+ reduce byteSizes), uint(8));
readListFilesByName(entryVal.a, sizes, seg_sizes, segments, filenames, byteSizes, dsetname, ty);
var stringsEntry = assembleSegStringFromParts(entrySeg, entryVal, st);
Expand Down
8 changes: 8 additions & 0 deletions tests/parquet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,14 @@ def test_segarray_string(self):
self.assertListEqual(x.values.to_list(), rd.values.to_list())
self.assertListEqual(x.to_list(), rd.to_list())

# additional testing for empty segments. See Issue #2560
a, b, c = ["one", "two", "three"], ["un", "deux", "trois"], ["uno", "dos", "tres"]
s = ak.SegArray(ak.array([0, 0, len(a), len(a), len(a), len(a) + len(c)]), ak.array(a + c))
with tempfile.TemporaryDirectory(dir=ParquetTest.par_test_base_tmp) as tmp_dirname:
s.to_parquet(f"{tmp_dirname}/segarray_test_empty")
rd_data = ak.read_parquet(f"{tmp_dirname}/segarray_test_empty_*")
self.assertListEqual(s.to_list(), rd_data.to_list())

@pytest.mark.optional_parquet
def test_against_standard_files(self):
datadir = "resources/parquet-testing"
Expand Down

0 comments on commit 0eb3008

Please sign in to comment.