Skip to content

Commit

Permalink
Merge pull request #127 from realratchet/master
Browse files Browse the repository at this point in the history
Fix issue with fail table column order and more granular tqdm
  • Loading branch information
realratchet authored Jan 25, 2024
2 parents d8e4662 + 412f365 commit ff339f8
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 71 deletions.
9 changes: 6 additions & 3 deletions tablite/_nimlite/funcs/column_selector.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ when isMainModule and appType != "lib":

proc columnSelect(table: nimpy.PyObject, cols: nimpy.PyObject, tqdm: nimpy.PyObject, dir_pid: Path, TaskManager: nimpy.PyObject): (nimpy.PyObject, nimpy.PyObject) =
# this is nim-only implementation, the library build doesn't need it because we need TaskManager to be used for slices
var (columns, page_count, is_correct_type, desired_column_map, passed_column_data, failed_column_data, res_cols_pass, res_cols_fail, column_names, reject_reason_name) = collectColumnSelectInfo(table, cols, string dir_pid)
var pbar = tqdm!(total: 100, desc: "column select")
var (columns, page_count, is_correct_type, desired_column_map, passed_column_data, failed_column_data, res_cols_pass, res_cols_fail, column_names, reject_reason_name) = collectColumnSelectInfo(table, cols, string dir_pid, pbar)

if toSeq(is_correct_type.values).all(proc (x: bool): bool = x):
let tbl_pass_columns = collect(initTable()):
Expand Down Expand Up @@ -55,13 +56,13 @@ when isMainModule and appType != "lib":
(el, res_cols_pass[i], res_cols_fail[i])

var page_size = tabliteConfig().Config.PAGE_SIZE.to(int)
var pbar = tqdm!(total: task_list_inp.len, desc: "column select")
var converted = newSeqOfCap[(seq[(string, nimpy.PyObject)], seq[(string, nimpy.PyObject)])](task_list_inp.len)
var pbarStep = 45 / max(task_list_inp.len - 1, 1)

for (columns, res_pass, res_fail) in task_list_inp:
converted.add(doSliceConvert(dir_pid, page_size, columns, reject_reason_name, res_pass, res_fail, desired_column_map, column_names, is_correct_type))

discard pbar.update(1)
discard pbar.update(pbarStep)

proc extendTable(table: var nimpy.PyObject, columns: seq[(string, nimpy.PyObject)]): void {.inline.} =
for (col_name, pg) in columns:
Expand All @@ -73,6 +74,8 @@ when isMainModule and appType != "lib":
tbl_pass.extendTable(pg_pass)
tbl_fail.extendTable(pg_fail)

discard pbar.update(pbar.total.to(float) - pbar.n.to(float))

return (tbl_pass, tbl_fail)

proc newColumnSelectorInfo(column: string, `type`: string, allow_empty: bool, rename: opt.Option[string]): nimpy.PyObject =
Expand Down
27 changes: 20 additions & 7 deletions tablite/_nimlite/funcs/column_selector/collectinfo.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ proc toPageType(name: string): KindObjectND =
of "datetime": return KindObjectND.K_DATETIME
else: raise newException(FieldDefect, "unsupported page type: '" & name & "'")

proc collectColumnSelectInfo*(table: nimpy.PyObject, cols: nimpy.PyObject, dirPid: string): (
proc collectColumnSelectInfo*(table: nimpy.PyObject, cols: nimpy.PyObject, dirPid: string, pbar: nimpy.PyObject): (
Table[string, seq[string]], int, Table[string, bool], OrderedTable[string, DesiredColumnInfo], seq[string], seq[string], seq[ColInfo], seq[ColInfo], seq[string], string
) =
var desiredColumnMap = initOrderedTable[string, DesiredColumnInfo]()
Expand Down Expand Up @@ -62,9 +62,15 @@ proc collectColumnSelectInfo*(table: nimpy.PyObject, cols: nimpy.PyObject, dirPi
allowEmpty: c.get("allow_empty", builtins().False).to(bool)
)

discard pbar.update(3)
discard pbar.display()

######################################################
# 2. Converting types to user specified
######################################################
# Registry of data
var passedColumnData = newSeq[string]()
var failedColumnData = newSeq[string]()
let columns = collect(initTable()):
for pyColName in table.columns:
let colName = pyColName.to(string)
Expand All @@ -73,6 +79,8 @@ proc collectColumnSelectInfo*(table: nimpy.PyObject, cols: nimpy.PyObject, dirPi
for pyPage in pyColPages:
builtins().str(pyPage.path.absolute()).to(string)

failedColumnData.add(colName)

{colName: pages}

let columnNames = collect: (for k in columns.keys: k)
Expand All @@ -93,10 +101,6 @@ proc collectColumnSelectInfo*(table: nimpy.PyObject, cols: nimpy.PyObject, dirPi

let pageCount = layoutSet[0][1]

# Registry of data
var passedColumnData = newSeq[string]()
var failedColumnData = newSeq[string]()

var cols = initTable[string, seq[string]]()

var resColsPass = newSeqOfCap[ColInfo](max(pageCount - 1, 0))
Expand All @@ -110,6 +114,11 @@ proc collectColumnSelectInfo*(table: nimpy.PyObject, cols: nimpy.PyObject, dirPi

proc genpage(dirpid: string): ColSliceInfo {.inline.} = (dir_pid, tabliteBase().SimplePage.next_id(dir_pid).to(int))

discard pbar.update(5)
discard pbar.display()

let colStepSize = (40 / desiredColumnMap.len - 1)

for (desiredNameNonUnique, colDesired) in desiredColumnMap.pairs():
let keys = toSeq(passedColumnData)
let desiredName = uniqueName(desiredNameNonUnique, keys)
Expand Down Expand Up @@ -151,9 +160,10 @@ proc collectColumnSelectInfo*(table: nimpy.PyObject, cols: nimpy.PyObject, dirPi
for i in 0..<pageCount:
resColsPass[i][desiredName] = genpage(dir_pid)

for desiredName in columns.keys:
failedColumnData.add(desiredName)
discard pbar.update(colStepSize)
discard pbar.display()

for desiredName in columns.keys:
for i in 0..<pageCount:
resColsFail[i][desiredName] = genpage(dir_pid)

Expand All @@ -164,4 +174,7 @@ proc collectColumnSelectInfo*(table: nimpy.PyObject, cols: nimpy.PyObject, dirPi

failedColumnData.add(rejectReasonName)

discard pbar.update(2)
discard pbar.display()

return (columns, pageCount, isCorrectType, desiredColumnMap, passedColumnData, failedColumnData, resColsPass, resColsFail, columnNames, rejectReasonName)
4 changes: 2 additions & 2 deletions tablite/_nimlite/nimlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ when isLib:
# -------- COLUMN SELECTOR --------
import funcs/column_selector as column_selector

proc collect_column_select_info*(table: PyObject, cols: PyObject, dir_pid: string): (
proc collect_column_select_info*(table: PyObject, cols: PyObject, dir_pid: string, pbar: PyObject): (
Table[string, seq[string]], int, Table[string, bool], PyObject, seq[string], seq[string], seq[column_selector.ColInfo], seq[column_selector.ColInfo], seq[string], string
) {.exportpy.} =
try:
var (columns, page_count, is_correct_type, desired_column_map, passed_column_data, failed_column_data, res_cols_pass, res_cols_fail, column_names, reject_reason_name) = column_selector.collectColumnSelectInfo(table, cols, dir_pid)
var (columns, page_count, is_correct_type, desired_column_map, passed_column_data, failed_column_data, res_cols_pass, res_cols_fail, column_names, reject_reason_name) = column_selector.collectColumnSelectInfo(table, cols, dir_pid, pbar)

return (columns, page_count, is_correct_type, desired_column_map.toPyObj, passed_column_data, failed_column_data, res_cols_pass, res_cols_fail, column_names, reject_reason_name)
except Exception as e:
Expand Down
123 changes: 65 additions & 58 deletions tablite/nimlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,86 +164,93 @@ def collect_cs_info(i: int, columns: dict, res_cols_pass: list, res_cols_fail: l


def column_select(table, cols, tqdm=_tqdm, TaskManager=TaskManager):
T = type(table)
dir_pid = Config.workdir / Config.pid
with tqdm(total=100, desc="column select", bar_format='{desc}: {percentage:3.0f}%|{bar}{r_bar}') as pbar:
T = type(table)
dir_pid = Config.workdir / Config.pid

columns, page_count, is_correct_type, desired_column_map, passed_column_data, failed_column_data, res_cols_pass, res_cols_fail, column_names, reject_reason_name = nl.collect_column_select_info(table, cols, str(dir_pid))
columns, page_count, is_correct_type, desired_column_map, passed_column_data, failed_column_data, res_cols_pass, res_cols_fail, column_names, reject_reason_name = nl.collect_column_select_info(table, cols, str(dir_pid), pbar)

if all(is_correct_type.values()):
tbl_pass_columns = {
desired_name: table[desired_info[0]]
for desired_name, desired_info in desired_column_map.items()
}
if all(is_correct_type.values()):
tbl_pass_columns = {
desired_name: table[desired_info[0]]
for desired_name, desired_info in desired_column_map.items()
}

tbl_fail_columns = {
desired_name: []
for desired_name in failed_column_data
}
tbl_fail_columns = {
desired_name: []
for desired_name in failed_column_data
}

tbl_pass = T(columns=tbl_pass_columns)
tbl_fail = T(columns=tbl_fail_columns)
tbl_pass = T(columns=tbl_pass_columns)
tbl_fail = T(columns=tbl_fail_columns)

return (tbl_pass, tbl_fail)
return (tbl_pass, tbl_fail)

task_list_inp = (
collect_cs_info(i, columns, res_cols_pass, res_cols_fail)
for i in range(page_count)
)
task_list_inp = (
collect_cs_info(i, columns, res_cols_pass, res_cols_fail)
for i in range(page_count)
)

page_size = Config.PAGE_SIZE
page_size = Config.PAGE_SIZE

tasks = (
Task(
nl.do_slice_convert, str(dir_pid), page_size, columns, reject_reason_name, res_pass, res_fail, desired_column_map, column_names, is_correct_type
tasks = (
Task(
nl.do_slice_convert, str(dir_pid), page_size, columns, reject_reason_name, res_pass, res_fail, desired_column_map, column_names, is_correct_type
)
for columns, res_pass, res_fail in task_list_inp
)
for columns, res_pass, res_fail in task_list_inp
)

cpu_count = max(psutil.cpu_count(), 1)
cpu_count = max(psutil.cpu_count(), 1)

if Config.MULTIPROCESSING_MODE == Config.FORCE:
is_mp = True
elif Config.MULTIPROCESSING_MODE == Config.FALSE:
is_mp = False
elif Config.MULTIPROCESSING_MODE == Config.AUTO:
is_multithreaded = cpu_count > 1
is_multipage = page_count > 1
if Config.MULTIPROCESSING_MODE == Config.FORCE:
is_mp = True
elif Config.MULTIPROCESSING_MODE == Config.FALSE:
is_mp = False
elif Config.MULTIPROCESSING_MODE == Config.AUTO:
is_multithreaded = cpu_count > 1
is_multipage = page_count > 1

is_mp = is_multithreaded and is_multipage
is_mp = is_multithreaded and is_multipage

tbl_pass = T({k: [] for k in passed_column_data})
tbl_fail = T({k: [] for k in failed_column_data})
tbl_pass = T({k: [] for k in passed_column_data})
tbl_fail = T({k: [] for k in failed_column_data})

converted = []
pbar = tqdm(total=page_count, desc="column select")
converted = []
step_size = 45 / max(page_count - 1, 1)

if is_mp:
with TaskManager(cpu_count=cpu_count) as tm:
res = tm.execute(list(tasks), pbar=pbar)
class WrapUpdate:
def update(self, n):
pbar.update(n * step_size)

if any(isinstance(r, str) for r in res):
raise Exception("tasks failed")
if is_mp:
with TaskManager(cpu_count=cpu_count) as tm:
res = tm.execute(list(tasks), pbar=WrapUpdate())

converted.extend(res)
else:
for task in tasks:
res = task.execute()
if any(isinstance(r, str) for r in res):
raise Exception("tasks failed")

converted.extend(res)
else:
for task in tasks:
res = task.execute()

if isinstance(res, str):
raise Exception(res)

if isinstance(res, str):
raise Exception(res)
converted.append(res)
pbar.update(1)

converted.append(res)
pbar.update(1)
def extend_table(table, columns):
for (col_name, pg) in columns:
table[col_name].pages.append(pg)

def extend_table(table, columns):
for (col_name, pg) in columns:
table[col_name].pages.append(pg)
for pg_pass, pg_fail in converted:
extend_table(tbl_pass, pg_pass)
extend_table(tbl_fail, pg_fail)

for pg_pass, pg_fail in converted:
extend_table(tbl_pass, pg_pass)
extend_table(tbl_fail, pg_fail)
pbar.update(pbar.total - pbar.n)

return tbl_pass, tbl_fail
return tbl_pass, tbl_fail

def read_page(path):
return nl.read_page(path)
2 changes: 1 addition & 1 deletion tablite/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
major, minor, patch = 2023, 9, 0
major, minor, patch = 2023, 9, 1
__version_info__ = (major, minor, patch)
__version__ = ".".join(str(i) for i in __version_info__)

0 comments on commit ff339f8

Please sign in to comment.