Closes #3334, #3351: Simplify server-side string code and add fixed length #3335

Merged 6 commits on Jun 25, 2024

Changes from 5 commits
13 changes: 13 additions & 0 deletions arkouda/io.py
@@ -734,6 +734,7 @@ def read_parquet(
tag_data: bool = False,
read_nested: bool = True,
has_non_float_nulls: bool = False,
fixed_len: int = -1
) -> Union[
Mapping[
str,
@@ -783,6 +784,10 @@
has_non_float_nulls: bool
Default False. This flag must be set to True to read non-float parquet columns
that contain null values.
fixed_len: int
Default -1. Set this when reading Parquet string columns whose string
lengths are all the same and known in advance. This allows the byte-size
calculation to be skipped, which can improve read performance.

Returns
-------
@@ -847,6 +852,7 @@
tag_data=tag_data,
read_nested=read_nested,
has_non_float_nulls=has_non_float_nulls,
fixed_len=fixed_len,
)[dset]
for dset in datasets
}
@@ -862,6 +868,7 @@
"filenames": filenames,
"tag_data": tag_data,
"has_non_float_nulls": has_non_float_nulls,
"fixed_len": fixed_len,
},
)
rep = json.loads(rep_msg) # See GenSymIO._buildReadAllMsgJson for json structure
@@ -1795,6 +1802,7 @@ def read(
column_delim: str = ",",
read_nested: bool = True,
has_non_float_nulls: bool = False,
fixed_len: int = -1,
) -> Union[
Mapping[
str,
@@ -1849,6 +1857,10 @@
has_non_float_nulls: bool
Default False. This flag must be set to True to read non-float parquet columns
that contain null values.
fixed_len: int
Default -1. Set this when reading Parquet string columns whose string
lengths are all the same and known in advance. This allows the byte-size
calculation to be skipped, which can improve read performance.

Returns
-------
@@ -1913,6 +1925,7 @@
allow_errors=allow_errors,
read_nested=read_nested,
has_non_float_nulls=has_non_float_nulls,
fixed_len=fixed_len,
)
elif ftype.lower() == "csv":
return read_csv(
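A minimal usage sketch of the new parameter (hypothetical file and dataset names; assumes every string in the column has the same length, here 8 characters):

```python
import arkouda as ak

ak.connect()

# Hypothetical dataset: a Parquet string column where every value is
# exactly 8 characters long. fixed_len lets the server skip the
# byte-size calculation pass over the column.
names = ak.read_parquet("data/fixed_width_*.parquet", datasets=["name"], fixed_len=8)
```

Leaving `fixed_len` at its default of -1 keeps the old behavior: string byte sizes are computed by scanning the column.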
4 changes: 2 additions & 2 deletions benchmarks/IO.py
@@ -87,7 +87,7 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, fileFormat,
else:
raise ValueError("Invalid file format")

-nb = a.size * a.itemsize * numfiles
+nb = a.size * a.itemsize * numfiles if dtype != 'str' else a.nbytes * numfiles
for key in times.keys():
print("write Average time {} = {:.4f} sec".format(key, times[key]))
print("write Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))
@@ -140,7 +140,7 @@ def time_ak_read(N_per_locale, numfiles, trials, dtype, path, fileFormat, comps=
else:
raise ValueError("Invalid file format")

-nb = a.size * a.itemsize
+nb = a.size * a.itemsize * numfiles if dtype != 'str' else a.nbytes * numfiles
for key in times.keys():
print("read Average time {} = {:.4f} sec".format(key, times[key]))
print("read Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))
53 changes: 9 additions & 44 deletions src/ParquetMsg.chpl
@@ -814,6 +814,8 @@ module ParquetMsg {
var tagData: bool = msgArgs.get("tag_data").getBoolValue();
var strictTypes: bool = msgArgs.get("strict_types").getBoolValue();

var fixedLen = msgArgs.get('fixed_len').getIntValue() + 1;

var allowErrors: bool = msgArgs.get("allow_errors").getBoolValue(); // default is false
var hasNonFloatNulls: bool = msgArgs.get("has_non_float_nulls").getBoolValue();
if allowErrors {
@@ -983,57 +985,20 @@
}
rnames.pushBack((dsetname, ObjType.PDARRAY, valName));
} else if ty == ArrowTypes.stringArr {
/*
1. create a block distributed files array (locale owner reads file)
2. get number of row groups so we know how much data we have to store
3. create array to store data (2D array with same distribution dist files)
4. go distributed and create readers for each file
*/
extern proc c_getNumRowGroups(readerIdx): c_int;
extern proc c_openFile(filename, idx);
extern proc c_createRowGroupReader(rowGroup, readerIdx);
extern proc c_createColumnReader(colname, readerIdx);
extern proc c_freeMapValues(rowToFree);
extern proc c_readParquetColumnChunks(filename, batchSize,
numElems, readerIdx, numRead,
externalData, defLevels, errMsg): int;

var entrySeg = createSymEntry(len, int);

var distFiles = makeDistArray(filenames);
var numRowGroups: [distFiles.domain] int;

var maxRowGroups = getRowGroupNums(distFiles, numRowGroups);
var externalData: [distFiles.domain] [0..#maxRowGroups] c_ptr(void);
var containsNulls: [distFiles.domain] [0..#maxRowGroups] bool;
var valsRead: [distFiles.domain] [0..#maxRowGroups] int;
var bytesPerRG: [distFiles.domain] [0..#maxRowGroups] int;
var startIdxs: [distFiles.domain] [0..#maxRowGroups] int; // correspond to starting idx in entrySeg

fillSegmentsAndPersistData(distFiles, entrySeg, externalData, containsNulls, valsRead, dsetname, sizes, len, numRowGroups, bytesPerRG, startIdxs);

var (rgSubdomains, totalBytes) = getRGSubdomains(bytesPerRG, maxRowGroups);

var entryVal;

if containsNulls[0][0] {
// Calculate byte sizes by reading or fixed length
if fixedLen < 2 {
byteSizes = calcStrSizesAndOffset(entrySeg.a, filenames, sizes, dsetname);
entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
entryVal = createSymEntry((+ reduce byteSizes), uint(8));
readStrFilesByName(entryVal.a, whereNull, filenames, byteSizes, dsetname, ty);
} else {
entryVal = createSymEntry(totalBytes, uint(8));
entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
copyValuesFromC(entryVal, distFiles, externalData, valsRead, numRowGroups, rgSubdomains, maxRowGroups, sizes, entrySeg.a, startIdxs);
entrySeg.a = fixedLen;
byteSizes = fixedLen*len;
}

for i in externalData.domain {
for j in externalData[i].domain {
if valsRead[i][j] > 0 then
on externalData[i][j] do
c_freeMapValues(externalData[i][j]);
}
}
// Read into distributed array
var entryVal = new shared SymEntry((+ reduce byteSizes), uint(8));
readStrFilesByName(entryVal.a, whereNull, filenames, byteSizes, dsetname, ty);

var stringsEntry = assembleSegStringFromParts(entrySeg, entryVal, st);
rnames.pushBack((dsetname, ObjType.STRINGS, "%s+%?".doFormat(stringsEntry.name, stringsEntry.nBytes)));
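On the server side, both branches reduce the segments array to byte offsets with an exclusive scan; the fixed-length branch just fills the segments with a constant first, with no file scan needed. A NumPy sketch of that branch (assumption: the server's `fixedLen` is the user's `fixed_len` plus one, which appears to reserve the null terminator arkouda appends to each stored string — hence the `fixedLen < 2` guard meaning "not set"):

```python
import numpy as np

def fixed_len_offsets(fixed_len_bytes: int, num_strings: int):
    # Every string occupies the same number of bytes, so the segments
    # can be filled with a constant instead of being read from the files.
    seg = np.full(num_strings, fixed_len_bytes, dtype=np.int64)
    # Chapel's (+ scan seg) - seg is an exclusive prefix sum.
    offsets = np.cumsum(seg) - seg
    total_bytes = fixed_len_bytes * num_strings
    return offsets, total_bytes

offsets, nbytes = fixed_len_offsets(9, 4)  # e.g. 8-char strings + 1 null byte
# offsets == [0, 9, 18, 27]; nbytes == 36
```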