rename temporary column to something less likely to be used #1846
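
Categorify's encoding path stores the integer codes for each distinct category in a temporary column named "labels" and merges that frame back onto the user's data. A dataset that already contains a real column named "labels" can therefore collide with the temporary one during the merge. This change renames the temporary column to "__labels_tmp", which is much less likely to appear in user data. A minimal pandas sketch of the kind of collision being avoided (toy data and names, not the NVTabular code path):

```python
import pandas as pd

# Toy user data that already has a real column named "labels".
user_df = pd.DataFrame({"item": ["a", "b", "a"], "labels": [10, 20, 30]})

# Distinct-value mapping whose temporary code column is also named "labels".
mapping = pd.DataFrame({"item": ["a", "b"], "labels": [0, 1]})

merged = user_df.merge(mapping, on="item", how="left")
print(merged.columns.tolist())  # ['item', 'labels_x', 'labels_y'] -- no plain 'labels' left

# With the renamed temporary column there is nothing to collide with.
mapping2 = mapping.rename(columns={"labels": "__labels_tmp"})
merged2 = user_df.merge(mapping2, on="item", how="left")
print(merged2["__labels_tmp"].tolist())  # [0, 1, 0]
```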

Status: Open · wants to merge 6 commits into base: main
35 changes: 20 additions & 15 deletions nvtabular/ops/categorify.py
@@ -1615,6 +1615,8 @@ def _encode(
     selection_r = ColumnSelector(name if isinstance(name, list) else [storage_name])
     list_col = is_list_col(selection_l, df)
 
+    tmp_label_column = "__labels_tmp"
+
     # Find number of oov buckets
     if buckets and storage_name in buckets:
         num_oov_buckets = buckets[storage_name]
@@ -1642,17 +1644,20 @@ def _encode(
                         cats_only=True,
                         reader=read_pq_func,
                     )
-                    if len(value) and value["labels"].iloc[0] < OOV_OFFSET + num_oov_buckets:
+                    if (
+                        len(value)
+                        and value[tmp_label_column].iloc[0] < OOV_OFFSET + num_oov_buckets
+                    ):
                         # See: https://github.com/rapidsai/cudf/issues/12837
-                        value["labels"] += OOV_OFFSET + num_oov_buckets
+                        value[tmp_label_column] += OOV_OFFSET + num_oov_buckets
         else:
             value = read_pq_func(  # pylint: disable=unexpected-keyword-arg
                 path,
                 columns=selection_r.names,
                 **({"split_row_groups": False} if split_out > 1 else {}),
             )
-            value.index = value.index.rename("labels")
+            value.index = value.index.rename(tmp_label_column)
             if split_out > 1:
                 value = value.reset_index(drop=False)
                 if type(df).__module__.split(".")[0] == "cudf":
@@ -1665,7 +1670,7 @@ def _encode(
                         part_size = file_frag.metadata.num_rows
                         ranges.append((size, size + part_size))
                         size += part_size
-                    value["labels"] = dd.from_map(lambda r: pd.RangeIndex(*r), ranges)
+                    value[tmp_label_column] = dd.from_map(lambda r: pd.RangeIndex(*r), ranges)
             else:
                 value.reset_index(drop=False, inplace=True)
 
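In the `split_out > 1` pandas path above, each Parquet fragment contributes a contiguous slice of the global label range, and `dd.from_map` materializes one partition per slice so the ids stay globally unique. A minimal sketch of that pattern with made-up fragment sizes (returning a `pd.Series` per range, which `dd.from_map` accepts):

```python
import dask.dataframe as dd
import pandas as pd

# Hypothetical per-fragment (start, stop) row ranges: fragment 0 has 3 rows,
# fragment 1 has 2 rows.
ranges = [(0, 3), (3, 5)]

# One partition per range; each partition carries its slice of the global ids.
labels = dd.from_map(lambda r: pd.Series(pd.RangeIndex(*r)), ranges)

print(labels.compute().tolist())  # [0, 1, 2, 3, 4]
```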
@@ -1674,7 +1679,7 @@ def _encode(
         for c in selection_r.names:
             typ = df[selection_l.names[0]].dtype if len(selection_l.names) == 1 else df[c].dtype
             value[c] = nullable_series([None], df, typ)
-        value.index = value.index.rename("labels")
+        value.index = value.index.rename(tmp_label_column)
         value.reset_index(drop=False, inplace=True)
 
     use_collection = isinstance(value, DaskDataFrame)
@@ -1684,7 +1689,7 @@ def _encode(
         use_collection = False
 
     # Determine encoding offsets
-    null_encoding_offset = value["labels"].head(1).iloc[0] if single_table else NULL_OFFSET
+    null_encoding_offset = value[tmp_label_column].head(1).iloc[0] if single_table else NULL_OFFSET
     bucket_encoding_offset = null_encoding_offset + 1  # 2 (if not single_table)
     distinct_encoding_offset = bucket_encoding_offset + num_oov_buckets
 
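The id layout implied by these offsets: the null slot comes first, the OOV bucket(s) next, then the distinct categories. A worked example for the non-single_table case, assuming `NULL_OFFSET` is 1 (an assumption inferred from the `# 2 (if not single_table)` comment in the hunk, not quoted from the library source):

```python
# Assumed constant for illustration only.
NULL_OFFSET = 1
num_oov_buckets = 1

null_encoding_offset = NULL_OFFSET                                   # 1: null slot
bucket_encoding_offset = null_encoding_offset + 1                    # 2: first OOV bucket
distinct_encoding_offset = bucket_encoding_offset + num_oov_buckets  # 3: first distinct id
```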
@@ -1727,7 +1732,7 @@ def _encode(
                             left_on=selection_l.names,
                             right_on=selection_r.names,
                             how="left",
-                        ).dropna(subset=["labels"])
+                        ).dropna(subset=[tmp_label_column])
                         for part in value.partitions
                     ],
                     ignore_index=False,
@@ -1741,11 +1746,11 @@ def _encode(
                 if len(merged_df) < len(codes):
                     # Missing nulls
                     labels = df._constructor_sliced(indistinct)
-                    labels.iloc[merged_df["order"]] = merged_df["labels"]
+                    labels.iloc[merged_df["order"]] = merged_df[tmp_label_column]
                     labels = labels.values
                 else:
-                    merged_df["labels"].fillna(df._constructor_sliced(indistinct), inplace=True)
-                    labels = merged_df["labels"].values
+                    merged_df[tmp_label_column].fillna(df._constructor_sliced(indistinct), inplace=True)
+                    labels = merged_df[tmp_label_column].values
         else:
             # no hashing
             if use_collection:
@@ -1757,7 +1762,7 @@ def _encode(
                             left_on=selection_l.names,
                             right_on=selection_r.names,
                             how="left",
-                        ).dropna(subset=["labels"])
+                        ).dropna(subset=[tmp_label_column])
                         for part in value.partitions
                     ],
                     ignore_index=True,
@@ -1768,16 +1773,16 @@ def _encode(
                         np.full(
                             len(codes),
                             indistinct,
-                            like=merged_df["labels"].values,
+                            like=merged_df[tmp_label_column].values,
                         ),
                     )
-                    labels.iloc[merged_df["order"]] = merged_df["labels"]
+                    labels.iloc[merged_df["order"]] = merged_df[tmp_label_column]
                 else:
-                    labels = merged_df.sort_values("order")["labels"].reset_index(drop=True)
+                    labels = merged_df.sort_values("order")[tmp_label_column].reset_index(drop=True)
             else:
                 labels = codes.merge(
                     value, left_on=selection_l.names, right_on=selection_r.names, how="left"
-                ).sort_values("order")["labels"]
+                ).sort_values("order")[tmp_label_column]
                 labels.fillna(indistinct, inplace=True)
                 labels = labels.values
     else:
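
The single-partition path at the end of the diff uses the "order" column to restore input row order after a left merge before extracting the temporary label column. A minimal sketch of that merge-then-reorder pattern with toy data and assumed code values:

```python
import pandas as pd

# "order" records each row's original position in the input.
codes = pd.DataFrame({"item": ["b", "a", "b"], "order": [0, 1, 2]})

# Distinct-value mapping with the renamed temporary label column.
value = pd.DataFrame({"item": ["a", "b"], "__labels_tmp": [3, 4]})

labels = (
    codes.merge(value, on="item", how="left")
    .sort_values("order")["__labels_tmp"]
)
print(labels.tolist())  # [4, 3, 4]
```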