diff --git a/ServerModules.cfg b/ServerModules.cfg
index 5b92a6eae1..bbf0b58776 100644
--- a/ServerModules.cfg
+++ b/ServerModules.cfg
@@ -5,6 +5,7 @@ ArraySetopsMsg
 KExtremeMsg
 ArgSortMsg
 SegmentedMsg
+DataFrameIndexingMsg
 OperatorMsg
 RandMsg
 IndexingMsg
diff --git a/arkouda/dataframe.py b/arkouda/dataframe.py
index 9e4474195f..6538478a88 100644
--- a/arkouda/dataframe.py
+++ b/arkouda/dataframe.py
@@ -4,12 +4,14 @@ from warnings import warn
 import pandas as pd  # type: ignore
 import random
+import json
+from typing import cast
 
 from arkouda.segarray import SegArray
 from arkouda.pdarrayclass import pdarray
 from arkouda.categorical import Categorical
 from arkouda.strings import Strings
-from arkouda.pdarraycreation import arange, array
+from arkouda.pdarraycreation import arange, array, create_pdarray
 from arkouda.groupbyclass import GroupBy as akGroupBy
 from arkouda.pdarraysetops import concatenate, unique, intersect1d, in1d
 from arkouda.pdarrayIO import save_all, load_all
@@ -17,7 +19,7 @@ from arkouda.dtypes import float64 as akfloat64
 from arkouda.sorting import argsort, coargsort
 from arkouda.numeric import where
-from arkouda.client import maxTransferBytes
+from arkouda.client import maxTransferBytes, generic_msg
 from arkouda.row import Row
 from arkouda.alignment import in1dmulti
 from arkouda.series import Series
@@ -422,6 +424,56 @@ def _get_head_tail(self):
         newdf._set_index(idx)
         return newdf.to_pandas(retain_index=True)
 
+    def _get_head_tail_server(self):
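+        """
+        Return a pandas DataFrame for display, gathering the head and tail
+        of every column on the server in a single "dataframe_idx" message.
+
+        A sketch of the wire format, read off the code below (the symbol
+        names are illustrative, not literal): each request element is
+        "<coltype>+<colname>+<server symbol name(s)>", e.g.
+        "Categorical+c_0+id_4+id_5", and the full request args are
+        "<ncols> <index symbol name> <JSON list of elements>". The reply is
+        a JSON list of "<coltype>+<colname>+<create message(s)>" elements,
+        which is decoded back into columns below.
+        """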
""" - prt = self._get_head_tail() + with pd.option_context("display.show_dimensions", False): retval = prt._repr_html_() retval += "

" + self._shape_str() + "

" diff --git a/benchmarks/dataframe.py b/benchmarks/dataframe.py new file mode 100644 index 0000000000..ebe37b5007 --- /dev/null +++ b/benchmarks/dataframe.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python3 + +import time, argparse +import numpy as np +import pandas as pd + +import arkouda as ak + +OPS = ['_get_head_tail_server', '_get_head_tail'] +TYPES = ('int64', 'uint64',) + + +def generate_dataframe(N, seed): + types = [ak.Categorical, ak.pdarray, ak.Strings, ak.SegArray] + + # generate random columns to build dataframe + df_dict = {} + np.random.seed(seed) + for x in range(20): # loop to create 20 random columns + key = f"c_{x}" + d = types[x % 4] + if d == ak.Categorical: + str_arr = ak.random_strings_uniform(minlen=5, maxlen=6, size=N, seed=seed) + df_dict[key] = ak.Categorical(str_arr) + elif d == ak.pdarray: + df_dict[key] = ak.array(np.random.randint(0, 2 ** 32, N)) + elif d == ak.Strings: + df_dict[key] = ak.random_strings_uniform(minlen=5, maxlen=6, size=N, seed=seed) + elif d == ak.SegArray: + df_dict[key] = ak.SegArray(ak.arange(0, N*5, 5), ak.array(np.random.randint(0, 2 ** 32, N*5))) + + return ak.DataFrame(df_dict) + + +def time_ak_df_display(N_per_locale, trials, seed): + print(">>> arkouda dataframe display") + cfg = ak.get_config() + N = N_per_locale * cfg["numLocales"] + print("numLocales = {}, N = {:,}".format(cfg["numLocales"], N)) + + pd.set_option("display.max_rows", 100) + pd.set_option("display.min_rows", 10) + pd.set_option("display.max_columns", 20) + + df = generate_dataframe(N, seed) + + timings = {op: [] for op in OPS} + results = {} + for i in range(trials): + timings = {op: [] for op in OPS} + for op in timings.keys(): + fxn = getattr(df, op) + start = time.time() + r = fxn() + end = time.time() + timings[op].append(end - start) + results[op] = r + + tavg = {op: sum(t) / trials for op, t in timings.items()} + + # calculate nbytes based on the columns + nbytes = 0 + for col in df.columns: + col_obj = df[col] + if isinstance(col_obj, ak.pdarray): + nbytes += col_obj.size * col_obj.itemsize + elif isinstance(col_obj, ak.Categorical): + nbytes += col_obj.codes.size * col_obj.codes.itemsize + elif isinstance(col_obj, ak.Strings): + nbytes += col_obj.nbytes * col_obj.entry.itemsize + elif isinstance(col_obj, ak.SegArray): + nbytes += col_obj.values.size * col_obj.values.itemsize + + for op, t in tavg.items(): + print(" {} Average time = {:.4f} sec".format(op, t)) + bytes_per_sec = nbytes / t + print(" {} Average rate = {:.2f} GiB/sec".format(op, bytes_per_sec / 2 ** 30)) + + +def check_correctness(N_per_locale, seed): + cfg = ak.get_config() + N = N_per_locale * cfg["numLocales"] + df = generate_dataframe(N, seed) + + pd.set_option("display.max_rows", 100) + pd.set_option("display.min_rows", 10) + pd.set_option("display.max_columns", 20) + + printdf = df._get_head_tail_server() # measure the pandas df returned + # Mainly want to verify shape for the print + assert(printdf.shape[0] == 101) + assert(printdf.shape[1] == 20) + + +def create_parser(): + parser = argparse.ArgumentParser(description="Run the dataframe display benchmarks: " + "_get_head_tail, _get_head_tail_server") + parser.add_argument('hostname', help='Hostname of arkouda server') + parser.add_argument('port', type=int, help='Port of arkouda server') + parser.add_argument('-n', '--size', type=int, default=10**4, help='Problem size: length of columns in dataframe.') + parser.add_argument('-t', '--trials', type=int, default=1, help='Number of times to run the benchmark') + parser.add_argument('-d', 
+    assert(printdf.shape[0] == 101)
+    assert(printdf.shape[1] == 20)
+
+
+def create_parser():
+    parser = argparse.ArgumentParser(description="Run the dataframe display benchmarks: "
+                                                 "_get_head_tail, _get_head_tail_server")
+    parser.add_argument('hostname', help='Hostname of arkouda server')
+    parser.add_argument('port', type=int, help='Port of arkouda server')
+    parser.add_argument('-n', '--size', type=int, default=10**4,
+                        help='Problem size: length of columns in dataframe.')
+    parser.add_argument('-t', '--trials', type=int, default=1, help='Number of times to run the benchmark')
+    parser.add_argument('-d', '--dtype', default='int64', help='Dtype of array ({})'.format(', '.join(TYPES)))
+    parser.add_argument('--correctness-only', default=False, action='store_true',
+                        help='Only check correctness, not performance.')
+    parser.add_argument('-s', '--seed', default=None, type=int, help='Value to initialize random number generator')
+    return parser
+
+
+if __name__ == "__main__":
+    import sys
+
+    parser = create_parser()
+    args = parser.parse_args()
+    # dtype is only validated here; generate_dataframe builds a fixed mix of column types
+    if args.dtype not in TYPES:
+        raise ValueError("Dtype must be {}, not {}".format('/'.join(TYPES), args.dtype))
+
+    ak.verbose = False
+    ak.connect(args.hostname, args.port)
+
+    if args.correctness_only:
+        for dtype in TYPES:
+            check_correctness(args.size, args.seed)
+        sys.exit(0)
+
+    print("array size = {:,}".format(args.size))
+    print("number of trials = ", args.trials)
+    time_ak_df_display(args.size, args.trials, args.seed)
+
+    sys.exit(0)
diff --git a/src/DataFrameIndexingMsg.chpl b/src/DataFrameIndexingMsg.chpl
new file mode 100644
index 0000000000..d0bbf4625c
--- /dev/null
+++ b/src/DataFrameIndexingMsg.chpl
@@ -0,0 +1,222 @@
+module DataFrameIndexingMsg
+{
+    use ServerConfig;
+    use ServerErrorStrings;
+
+    use Reflection;
+    use ServerErrors;
+    use Logging;
+    use Message;
+    use SegmentedMsg;
+    use AryUtil;
+
+    use MultiTypeSymEntry;
+    use MultiTypeSymbolTable;
+
+    private config const logLevel = ServerConfig.logLevel;
+    const dfiLogger = new Logger(logLevel);
+
+    // gather indexing by an integer index vector
+    proc dfIdxHelper(idx: borrowed SymEntry(int), columnVals: borrowed SymEntry(?t), st: borrowed SymTab, col: string, rtnName: bool=false): string throws {
+        param pn = Reflection.getRoutineName();
+        // get next symbol name
+        var rname = st.nextName();
+
+        if (columnVals.size == 0) && (idx.size == 0) {
+            var a = st.addEntry(rname, 0, t);
+            var repMsg = "pdarray+%s+created %s".format(col, st.attrib(rname));
+            return repMsg;
+        }
+        var idxMin = min reduce idx.a;
+        var idxMax = max reduce idx.a;
+        if idxMin < 0 {
+            var errorMsg = "Error: %s: OOBindex %i < 0".format(pn, idxMin);
+            dfiLogger.error(getModuleName(),getRoutineName(),getLineNumber(),errorMsg);
+            throw new owned IllegalArgumentError(errorMsg);
+        }
+        if idxMax >= columnVals.size {
+            var errorMsg = "Error: %s: OOBindex %i > %i".format(pn, idxMax, columnVals.size-1);
+            dfiLogger.error(getModuleName(),getRoutineName(),getLineNumber(),errorMsg);
+            throw new owned IllegalArgumentError(errorMsg);
+        }
+        var a = st.addEntry(rname, idx.size, t);
+        ref a2 = columnVals.a;
+        ref iva = idx.a;
+        ref aa = a.a;
+        // gather: aa[i] = a2[iva[i]]
+        forall (a1, ix) in zip(aa, iva) {
+            a1 = a2[ix];
+        }
+
+        if rtnName {
+            return rname;
+        }
+
+        var repMsg = "pdarray+%s+created %s".format(col, st.attrib(rname));
+        return repMsg;
+    }
+
+    proc df_seg_array_idx(idx: borrowed SymEntry(int), segments: borrowed SymEntry(int), values: borrowed SymEntry(?t), col: string, st: borrowed SymTab): string throws {
+        var lens: [0..#idx.size] int;
+        var orig_segs: [0..#idx.size] int = segments.a[idx.a];
+
+        // Each selected row's length comes from the *original* segments
+        // array: segments[idx[i]+1] - segments[idx[i]] (bounded by
+        // values.size for the final segment). Differencing the *selected*
+        // segment starts would miscount whenever idx is not contiguous,
+        // e.g. at the seam of a head/tail gather.
+        ref sa = segments.a;
+        ref ia = idx.a;
+        const lastSeg = segments.size - 1;
+        forall (i, os, l) in zip(orig_segs.domain, orig_segs, lens) {
+            if ia[i] == lastSeg {
+                l = values.size - os;
+            } else {
+                l = sa[ia[i]+1] - os;
+            }
+        }
+
+        var rvals: [0..#(+ reduce lens)] t;
+        var rsegs = (+ scan lens) - lens;
+
+        // copy each selected segment's values into the result
+        forall (i, rs, os, l) in zip(orig_segs.domain, rsegs, orig_segs, lens) {
+            var v = new lowLevelLocalizingSlice(values.a, os..#l);
+            for j in 0..#l {
+                rvals[rs+j] = v.ptr[j];
+            }
+        }
+
+        var s_name = st.nextName();
+        st.addEntry(s_name, new shared SymEntry(rsegs));
+        var v_name = st.nextName();
+        st.addEntry(v_name, new shared SymEntry(rvals));
+
+        return "SegArray+%s+created %s+created %s".format(col, st.attrib(s_name), st.attrib(v_name));
+    }
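+    // Handler for the "dataframe_idx" batch gather. A sketch of the
+    // request, mirroring what the client's _get_head_tail_server sends
+    // (symbol names are illustrative, not literal):
+    //     payload = '<ncols> <idx symbol> <JSON list>'
+    //     e.g.      '2 id_9 ["pdarray+c_0+id_0", "SegArray+c_1+id_1+id_2"]'
+    // The reply is a JSON list with one "<coltype>+<colname>+<create
+    // message(s)>" entry per requested column.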
+    proc dataframeBatchIndexingMsg(cmd: string, payload: string, st: borrowed SymTab): MsgTuple throws {
+        param pn = Reflection.getRoutineName();
+        var repMsg: string; // response message
+        // split request into fields
+        var (jsonsize_str, iname, json_str) = payload.splitMsgToTuple(3);
+
+        var jsonsize: int;
+        try {
+            jsonsize = jsonsize_str: int;
+        }
+        catch {
+            var errorMsg = "jsonsize could not be interpreted as an int (%s)".format(jsonsize_str);
+            dfiLogger.error(getModuleName(),getRoutineName(),getLineNumber(),errorMsg);
+            throw new owned IllegalArgumentError(errorMsg);
+        }
+
+        var eleList = jsonToPdArray(json_str, jsonsize);
+
+        var gIdx: borrowed GenSymEntry = getGenericTypedArrayEntry(iname, st);
+        var idx = toSymEntry(gIdx, int);
+
+        var repMsgList: [0..#jsonsize] string;
+
+        forall (i, rpm, ele) in zip(repMsgList.domain, repMsgList, eleList) {
+            var ele_parts = ele.split("+");
+            ref col_name = ele_parts[1];
+            select (ele_parts[0]) {
+                when ("Categorical") {
+                    dfiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),"Element at %i is Categorical".format(i));
+                    ref codes_name = ele_parts[2];
+                    ref categories_name = ele_parts[3];
+                    dfiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),"Codes Name: %s, Categories Name: %s".format(codes_name, categories_name));
+
+                    var gCode: borrowed GenSymEntry = getGenericTypedArrayEntry(codes_name, st);
+                    var code_vals = toSymEntry(gCode, int);
+                    // gather the codes, then index categories[codes[idx]] to return Strings
+                    var idxCodeName = dfIdxHelper(idx, code_vals, st, col_name, true);
+
+                    var args: [1..2] string = [categories_name, idxCodeName];
+                    var repTup = segPdarrayIndex("str", args, st);
+                    if repTup.msgType == MsgType.ERROR {
+                        throw new owned IllegalArgumentError(repTup.msg);
+                    }
+
+                    rpm = "%jt".format("Strings+%s+%s".format(col_name, repTup.msg));
+                }
+                when ("Strings") {
+                    dfiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),"Element at %i is Strings".format(i));
+                    var args: [1..2] string = [ele_parts[2], iname];
+                    var repTup = segPdarrayIndex("str", args, st);
+                    if repTup.msgType == MsgType.ERROR {
+                        throw new owned IllegalArgumentError(repTup.msg);
+                    }
+
+                    rpm = "%jt".format("Strings+%s+%s".format(col_name, repTup.msg));
+                }
+                when ("pdarray") {
+                    dfiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),"Element at %i is pdarray".format(i));
+                    var gCol: borrowed GenSymEntry = getGenericTypedArrayEntry(ele_parts[2], st);
+                    select (gCol.dtype) {
+                        when (DType.Int64) {
+                            var col_vals = toSymEntry(gCol, int);
+                            rpm = "%jt".format(dfIdxHelper(idx, col_vals, st, col_name));
+                        }
+                        when (DType.UInt64) {
+                            var col_vals = toSymEntry(gCol, uint);
+                            rpm = "%jt".format(dfIdxHelper(idx, col_vals, st, col_name));
+                        }
+                        when (DType.Bool) {
+                            var col_vals = toSymEntry(gCol, bool);
+                            rpm = "%jt".format(dfIdxHelper(idx, col_vals, st, col_name));
+                        }
+                        when (DType.Float64) {
+                            var col_vals = toSymEntry(gCol, real);
+                            rpm = "%jt".format(dfIdxHelper(idx, col_vals, st, col_name));
+                        }
+                        otherwise {
+                            var errorMsg = notImplementedError(pn, dtype2str(gCol.dtype));
+                            dfiLogger.error(getModuleName(),getRoutineName(),getLineNumber(),errorMsg);
+                            throw new owned IllegalArgumentError(errorMsg);
+                        }
+                    }
+                }
+                when ("SegArray") {
+                    dfiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),"Element at %i is SegArray".format(i));
+                    ref segments_name = ele_parts[2];
+                    ref values_name = ele_parts[3];
+                    dfiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),"Segments Name: %s, Values Name: %s".format(segments_name, values_name));
+
+                    var gSeg: borrowed GenSymEntry = getGenericTypedArrayEntry(segments_name, st);
+                    var segments = toSymEntry(gSeg, int);
+                    var gVal: borrowed GenSymEntry = getGenericTypedArrayEntry(values_name, st);
+                    select (gVal.dtype) {
+                        when (DType.Int64) {
+                            var values = toSymEntry(gVal, int);
+                            rpm = "%jt".format(df_seg_array_idx(idx, segments, values, col_name, st));
+                        }
+                        when (DType.UInt64) {
+                            var values = toSymEntry(gVal, uint);
+                            rpm = "%jt".format(df_seg_array_idx(idx, segments, values, col_name, st));
+                        }
+                        when (DType.Float64) {
+                            var values = toSymEntry(gVal, real);
+                            rpm = "%jt".format(df_seg_array_idx(idx, segments, values, col_name, st));
+                        }
+                        when (DType.Bool) {
+                            var values = toSymEntry(gVal, bool);
+                            rpm = "%jt".format(df_seg_array_idx(idx, segments, values, col_name, st));
+                        }
+                        otherwise {
+                            var errorMsg = notImplementedError(pn, dtype2str(gVal.dtype));
+                            dfiLogger.error(getModuleName(),getRoutineName(),getLineNumber(),errorMsg);
+                            throw new owned IllegalArgumentError(errorMsg);
+                        }
+                    }
+                }
+                otherwise {
+                    var errorMsg = notImplementedError(pn, ele_parts[0]);
+                    dfiLogger.error(getModuleName(),getRoutineName(),getLineNumber(),errorMsg);
+                    throw new owned IllegalArgumentError(errorMsg);
+                }
+            }
+        }
+
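+        // assemble the reply: each rpm was JSON-quoted via "%jt", so a
+        // comma join inside brackets forms the JSON list that the client
+        // decodes with json.loads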
+        repMsg = "[%s]".format(",".join(repMsgList));
+        dfiLogger.debug(getModuleName(),getRoutineName(),getLineNumber(),repMsg);
+        return new MsgTuple(repMsg, MsgType.NORMAL);
+    }
+
+    proc registerMe() {
+        use CommandMap;
+        registerFunction("dataframe_idx", dataframeBatchIndexingMsg, getModuleName());
+    }
+}
\ No newline at end of file