diff --git a/arkouda/io.py b/arkouda/io.py
index 7b9e11251b..0a79a1a3f4 100644
--- a/arkouda/io.py
+++ b/arkouda/io.py
@@ -734,6 +734,7 @@ def read_parquet(
     tag_data: bool = False,
     read_nested: bool = True,
     has_non_float_nulls: bool = False,
+    fixed_len: int = -1,
 ) -> Union[
     Mapping[
         str,
@@ -783,6 +784,10 @@ def read_parquet(
     has_non_float_nulls: bool
         Default False. This flag must be set to True to read non-float parquet
        columns that contain null values.
+    fixed_len: int
+        Default -1. Set this when reading Parquet string columns whose strings
+        all have the same known length. Supplying the length skips the byte-size
+        calculation pass, which can improve read performance.
 
     Returns
     -------
@@ -847,6 +852,7 @@ def read_parquet(
                 tag_data=tag_data,
                 read_nested=read_nested,
                 has_non_float_nulls=has_non_float_nulls,
+                fixed_len=fixed_len,
             )[dset]
             for dset in datasets
         }
@@ -862,6 +868,7 @@ def read_parquet(
             "filenames": filenames,
             "tag_data": tag_data,
             "has_non_float_nulls": has_non_float_nulls,
+            "fixed_len": fixed_len,
         },
     )
     rep = json.loads(rep_msg)  # See GenSymIO._buildReadAllMsgJson for json structure
@@ -1795,6 +1802,7 @@ def read(
     column_delim: str = ",",
     read_nested: bool = True,
     has_non_float_nulls: bool = False,
+    fixed_len: int = -1,
 ) -> Union[
     Mapping[
         str,
@@ -1849,6 +1857,10 @@ def read(
     has_non_float_nulls: bool
         Default False. This flag must be set to True to read non-float parquet
         columns that contain null values.
+    fixed_len: int
+        Default -1. Set this when reading Parquet string columns whose strings
+        all have the same known length. Supplying the length skips the byte-size
+        calculation pass, which can improve read performance.
 
     Returns
     -------
@@ -1913,6 +1925,7 @@ def read(
             allow_errors=allow_errors,
             read_nested=read_nested,
             has_non_float_nulls=has_non_float_nulls,
+            fixed_len=fixed_len,
         )
     elif ftype.lower() == "csv":
         return read_csv(
diff --git a/benchmarks/IO.py b/benchmarks/IO.py
index ef3be69444..3112aa343e 100644
--- a/benchmarks/IO.py
+++ b/benchmarks/IO.py
@@ -87,7 +87,7 @@ def time_ak_write(N_per_locale, numfiles, trials, dtype, path, seed, fileFormat,
     else:
         raise ValueError("Invalid file format")
 
-    nb = a.size * a.itemsize * numfiles
+    nb = a.size * a.itemsize * numfiles if dtype != 'str' else a.nbytes * numfiles
     for key in times.keys():
         print("write Average time {} = {:.4f} sec".format(key, times[key]))
         print("write Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))
@@ -140,7 +140,7 @@ def time_ak_read(N_per_locale, numfiles, trials, dtype, path, fileFormat, comps=
     else:
         raise ValueError("Invalid file format")
 
-    nb = a.size * a.itemsize
+    nb = a.size * a.itemsize * numfiles if dtype != 'str' else a.nbytes * numfiles
     for key in times.keys():
         print("read Average time {} = {:.4f} sec".format(key, times[key]))
         print("read Average rate {} = {:.4f} GiB/sec".format(key, nb / 2**30 / times[key]))
diff --git a/src/ParquetMsg.chpl b/src/ParquetMsg.chpl
index 1c13bfafa3..51a9d3bd70 100644
--- a/src/ParquetMsg.chpl
+++ b/src/ParquetMsg.chpl
@@ -814,6 +814,8 @@ module ParquetMsg {
     var tagData: bool = msgArgs.get("tag_data").getBoolValue();
     var strictTypes: bool = msgArgs.get("strict_types").getBoolValue();
 
+    var fixedLen = msgArgs.get("fixed_len").getIntValue() + 1;
+
     var allowErrors: bool = msgArgs.get("allow_errors").getBoolValue(); // default is false
     var hasNonFloatNulls: bool = msgArgs.get("has_non_float_nulls").getBoolValue();
     if allowErrors {
@@ -983,57 +985,20 @@ module ParquetMsg {
           }
           rnames.pushBack((dsetname, ObjType.PDARRAY, valName));
         } else if ty == ArrowTypes.stringArr {
-          /*
-            1. create a block distributed files array (locale owner reads file)
-            2. get number of row groups so we know how much data we have to store
-            3. create array to store data (2D array with same distribution dist files)
-            4. go distributed and create readers for each file
-          */
-          extern proc c_getNumRowGroups(readerIdx): c_int;
-          extern proc c_openFile(filename, idx);
-          extern proc c_createRowGroupReader(rowGroup, readerIdx);
-          extern proc c_createColumnReader(colname, readerIdx);
-          extern proc c_freeMapValues(rowToFree);
-          extern proc c_readParquetColumnChunks(filename, batchSize,
-                                                numElems, readerIdx, numRead,
-                                                externalData, defLevels, errMsg): int;
           var entrySeg = createSymEntry(len, int);
-          var distFiles = makeDistArray(filenames);
-          var numRowGroups: [distFiles.domain] int;
-
-          var maxRowGroups = getRowGroupNums(distFiles, numRowGroups);
-          var externalData: [distFiles.domain] [0..#maxRowGroups] c_ptr(void);
-          var containsNulls: [distFiles.domain] [0..#maxRowGroups] bool;
-          var valsRead: [distFiles.domain] [0..#maxRowGroups] int;
-          var bytesPerRG: [distFiles.domain] [0..#maxRowGroups] int;
-          var startIdxs: [distFiles.domain] [0..#maxRowGroups] int; // correspond to starting idx in entrySeg
-
-          fillSegmentsAndPersistData(distFiles, entrySeg, externalData, containsNulls, valsRead, dsetname, sizes, len, numRowGroups, bytesPerRG, startIdxs);
-
-          var (rgSubdomains, totalBytes) = getRGSubdomains(bytesPerRG, maxRowGroups);
-
-          var entryVal;
-
-          if containsNulls[0][0] {
+          // Calculate byte sizes of each file by reading, or use the given fixed length
+          if fixedLen < 2 {
             byteSizes = calcStrSizesAndOffset(entrySeg.a, filenames, sizes, dsetname);
-            entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
-            entryVal = createSymEntry((+ reduce byteSizes), uint(8));
-            readStrFilesByName(entryVal.a, whereNull, filenames, byteSizes, dsetname, ty);
           } else {
-            entryVal = createSymEntry(totalBytes, uint(8));
-            entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
-            copyValuesFromC(entryVal, distFiles, externalData, valsRead, numRowGroups, rgSubdomains, maxRowGroups, sizes, entrySeg.a, startIdxs);
+            entrySeg.a = fixedLen;
+            byteSizes = fixedLen*sizes;
           }
+          entrySeg.a = (+ scan entrySeg.a) - entrySeg.a;
 
-          for i in externalData.domain {
-            for j in externalData[i].domain {
-              if valsRead[i][j] > 0 then
-                on externalData[i][j] do
-                  c_freeMapValues(externalData[i][j]);
-            }
-          }
+          // Read the string data into a distributed byte array
+          var entryVal = createSymEntry((+ reduce byteSizes), uint(8));
+          readStrFilesByName(entryVal.a, whereNull, filenames, byteSizes, dsetname, ty);
 
           var stringsEntry = assembleSegStringFromParts(entrySeg, entryVal, st);
           rnames.pushBack((dsetname, ObjType.STRINGS, "%s+%?".doFormat(stringsEntry.name, stringsEntry.nBytes)));
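
For reference, the server-side branch above turns per-string byte lengths into segment offsets with `(+ scan entrySeg.a) - entrySeg.a`, an inclusive prefix sum minus the element, i.e. an exclusive scan. A minimal NumPy sketch of that step, using hypothetical lengths:

    import numpy as np

    # Per-string byte lengths: with fixed_len=3 the server stores 3 + 1 bytes
    # per string (the extra byte is the null terminator accounted for by
    # `getIntValue() + 1` in ParquetMsg.chpl).
    lengths = np.full(4, 3 + 1)

    # Inclusive prefix sum minus the element = exclusive scan, giving the
    # starting offset of each string in the flat byte array.
    offsets = np.cumsum(lengths) - lengths  # -> [0, 4, 8, 12]

Note the `fixedLen < 2` guard: after the `+ 1` for the null terminator, any user-supplied `fixed_len` below 1 (including the default -1) falls back to computing byte sizes by reading the column.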
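
A minimal usage sketch of the new parameter, assuming a running Arkouda server and a hypothetical file strings.parquet whose "id" column holds fixed-width strings:

    import arkouda as ak

    ak.connect()  # assumes arkouda_server is running on the default host/port

    # Every string in the "id" column is assumed to be exactly 8 characters
    # long, so the server can skip the byte-size calculation pass.
    ids = ak.read_parquet("strings.parquet", datasets="id", fixed_len=8)

    # Default behavior: fixed_len=-1 computes per-string byte sizes first.
    ids_default = ak.read_parquet("strings.parquet", datasets="id")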