Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#124: Better error messages in case of missing parameters #148

Merged
merged 7 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion doc/changes/changes_0.6.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ T.B.D

### Bug Fixes

- # 134: Fixed slc download still assuming existence of slc-parts
- # 134: Fixed slc download still assuming existence of slc-parts
- # 124: Fixed input validation in extract_unique_model_dataframes_from_batch

### Refactorings

Expand Down
33 changes: 26 additions & 7 deletions exasol_transformers_extension/udfs/models/base_model_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ def run(self, ctx):
batch_df = ctx.get_dataframe(num_rows=self.batch_size, start_col=1)
if batch_df is None:
break

predictions_df = self.get_predictions_from_batch(batch_df)
ctx.emit(predictions_df)

Expand All @@ -66,8 +65,12 @@ def get_predictions_from_batch(self, batch_df: pd.DataFrame) -> pd.DataFrame:
:return: Prediction results of the corresponding batched dataframe
"""
result_df_list = []
for model_df in \
self.extract_unique_model_dataframes_from_batch(batch_df):

unique_model_dataframes = self.extract_unique_model_dataframes_from_batch(self, batch_df)
for model_df in unique_model_dataframes:
if "error_message" in model_df:
result_df_list.append(model_df)
continue
try:
self.check_cache(model_df)
except Exception as exc:
Expand Down Expand Up @@ -95,8 +98,7 @@ def get_prediction_from_unique_param_based_dataframes(self, model_df) \
:return: List of prediction results
"""
result_df_list = []
for param_based_model_df in \
self.extract_unique_param_based_dataframes(model_df):
for param_based_model_df in self.extract_unique_param_based_dataframes(model_df):
try:
result_df = self.get_prediction(param_based_model_df)
result_df_list.append(result_df)
Expand All @@ -105,11 +107,17 @@ def get_prediction_from_unique_param_based_dataframes(self, model_df) \
result_with_error_df = self.get_result_with_error(
param_based_model_df, stack_trace)
result_df_list.append(result_with_error_df)

return result_df_list

@staticmethod
def extract_unique_model_dataframes_from_batch(
def _check_values_not_null(model_name, bucketfs_conn, sub_dir):
if not (model_name and bucketfs_conn and sub_dir):
error_message = f"For each model model_name, bucketfs_conn and sub_dir need to be provided. " \
f"Found model_name = {model_name}, bucketfs_conn = {bucketfs_conn}, sub_dir = {sub_dir}."
raise ValueError(error_message)

@staticmethod
def extract_unique_model_dataframes_from_batch(self,
batch_df: pd.DataFrame) -> Iterator[pd.DataFrame]:
"""
Extract unique model dataframes with the same model_name, bucketfs_conn,
Expand All @@ -123,12 +131,23 @@ def extract_unique_model_dataframes_from_batch(

unique_values = dataframe_operations.get_unique_values(
batch_df, constants.ORDERED_COLUMNS, sort=True)

for model_name, bucketfs_conn, token_conn, sub_dir in unique_values:
try:
self._check_values_not_null(model_name, bucketfs_conn, sub_dir)
except ValueError:
stack_trace = traceback.format_exc()
result_with_error_df = self.get_result_with_error(
batch_df, stack_trace)
yield result_with_error_df
return

selections = (
(batch_df['model_name'] == model_name) &
(batch_df['bucketfs_conn'] == bucketfs_conn) &
(batch_df['sub_dir'] == sub_dir)
)

if token_conn is None:
selections = selections & pd.isnull(batch_df['token_conn'])
else:
Expand Down
Loading
Loading