Commit

[CodeBuild] fixed bug, added base_udf tests, dep updates
MarleneKress79789 committed Oct 27, 2023
1 parent 5a88b9d commit 1bbc4bf
Showing 7 changed files with 940 additions and 601 deletions.
doc/changes/changes_0.6.0.md (3 changes: 2 additions & 1 deletion)
@@ -13,7 +13,8 @@ T.B.D

### Bug Fixes

- #134: Fixed slc download still assuming existence of slc-parts
- #124: Fixed input validation in extract_unique_model_dataframes_from_batch

### Refactorings

exasol_transformers_extension/udfs/models/base_model_udf.py (33 changes: 26 additions & 7 deletions)
@@ -51,7 +51,6 @@ def run(self, ctx):
            batch_df = ctx.get_dataframe(num_rows=self.batch_size, start_col=1)
            if batch_df is None:
                break

            predictions_df = self.get_predictions_from_batch(batch_df)
            ctx.emit(predictions_df)

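The run() loop above follows the UDF batching protocol visible in this hunk: ctx.get_dataframe returns up to batch_size rows per call and None once the input is exhausted, and each batch of predictions is emitted back through ctx.emit. A minimal sketch of that control flow with a hypothetical stand-in context (FakeContext and the example column names are assumptions, not part of this commit):

```python
import pandas as pd

class FakeContext:
    """Hypothetical stand-in for the UDF context used by run()."""
    def __init__(self, df: pd.DataFrame):
        self._df = df
        self._pos = 0
        self.emitted = []

    def get_dataframe(self, num_rows: int, start_col: int = 0):
        # Return the next chunk of rows, or None when no rows are left.
        if self._pos >= len(self._df):
            return None
        chunk = self._df.iloc[self._pos:self._pos + num_rows, start_col:]
        self._pos += num_rows
        return chunk.reset_index(drop=True)

    def emit(self, df: pd.DataFrame):
        self.emitted.append(df)

# Same loop shape as run(): pull batches until None, emit once per batch.
ctx = FakeContext(pd.DataFrame({"device_id": [0, 0, 0], "text": ["a", "b", "c"]}))
while True:
    batch_df = ctx.get_dataframe(num_rows=2, start_col=1)
    if batch_df is None:
        break
    ctx.emit(batch_df)  # a real UDF would emit get_predictions_from_batch(batch_df)
print(len(ctx.emitted))  # 2 batches for 3 rows with batch_size=2
```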
@@ -66,8 +65,12 @@ def get_predictions_from_batch(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        :return: Prediction results of the corresponding batched dataframe
        """
        result_df_list = []
        for model_df in \
                self.extract_unique_model_dataframes_from_batch(batch_df):

        unique_model_dataframes = self.extract_unique_model_dataframes_from_batch(batch_df)
        for model_df in unique_model_dataframes:
            if "error_message" in model_df:
                result_df_list.append(model_df)
                continue
            try:
                self.check_cache(model_df)
            except Exception as exc:
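The new short-circuit above relies on the fact that the `in` operator on a pandas DataFrame tests column labels, so any dataframe that already carries an error_message column (as produced by get_result_with_error) is appended to the results untouched instead of being sent to prediction. A small illustration of that behaviour (the column names other than error_message are assumptions):

```python
import pandas as pd

ok_df = pd.DataFrame({"model_name": ["bert-base"], "text_data": ["hello"]})
error_df = ok_df.assign(error_message="ValueError: model_name, bucketfs_conn and sub_dir need to be provided")

for model_df in (ok_df, error_df):
    if "error_message" in model_df:  # membership test checks column labels
        print("pass through:", model_df["error_message"].iloc[0])
    else:
        print("run prediction on", len(model_df), "row(s)")
```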
@@ -95,8 +98,7 @@ def get_prediction_from_unique_param_based_dataframes(self, model_df) \
        :return: List of prediction results
        """
        result_df_list = []
        for param_based_model_df in \
                self.extract_unique_param_based_dataframes(model_df):
        for param_based_model_df in self.extract_unique_param_based_dataframes(model_df):
            try:
                result_df = self.get_prediction(param_based_model_df)
                result_df_list.append(result_df)
@@ -105,11 +107,17 @@ def get_prediction_from_unique_param_based_dataframes(self, model_df) \
                result_with_error_df = self.get_result_with_error(
                    param_based_model_df, stack_trace)
                result_df_list.append(result_with_error_df)

        return result_df_list

    @staticmethod
    def extract_unique_model_dataframes_from_batch(
    def _check_values_not_null(model_name, bucketfs_conn, sub_dir):
        if not (model_name and bucketfs_conn and sub_dir):
            error_message = f"For each model, model_name, bucketfs_conn and sub_dir need to be provided. " \
                            f"Found model_name = {model_name}, bucketfs_conn = {bucketfs_conn}, sub_dir = {sub_dir}"
            raise ValueError(error_message)

    def extract_unique_model_dataframes_from_batch(self,
            batch_df: pd.DataFrame) -> Iterator[pd.DataFrame]:
"""
Extract unique model dataframes with the same model_name, bucketfs_conn,
@@ -123,12 +131,23 @@ def extract_unique_model_dataframes_from_batch(

        unique_values = dataframe_operations.get_unique_values(
            batch_df, constants.ORDERED_COLUMNS, sort=True)

        for model_name, bucketfs_conn, token_conn, sub_dir in unique_values:
            try:
                self._check_values_not_null(model_name, bucketfs_conn, sub_dir)
            except ValueError:
                stack_trace = traceback.format_exc()
                result_with_error_df = self.get_result_with_error(
                    batch_df, stack_trace)
                yield result_with_error_df
                return

            selections = (
                (batch_df['model_name'] == model_name) &
                (batch_df['bucketfs_conn'] == bucketfs_conn) &
                (batch_df['sub_dir'] == sub_dir)
            )

            if token_conn is None:
                selections = selections & pd.isnull(batch_df['token_conn'])
            else:
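The grouping logic in this hunk selects batch rows with boolean masks rather than a groupby, and a missing token_conn forms its own group via pd.isnull. A minimal sketch of that grouping on made-up rows; drop_duplicates stands in for dataframe_operations.get_unique_values, and the collapsed else-branch matching on token_conn is an assumption:

```python
import pandas as pd

batch_df = pd.DataFrame({
    "model_name":    ["bert-base", "bert-base", "gpt2"],
    "bucketfs_conn": ["bfs_conn", "bfs_conn", "bfs_conn"],
    "token_conn":    [None, None, "hf_token"],
    "sub_dir":       ["models", "models", "models"],
})

ordered_columns = ["model_name", "bucketfs_conn", "token_conn", "sub_dir"]
# Stand-in for dataframe_operations.get_unique_values(..., sort=True)
unique_values = (batch_df[ordered_columns]
                 .drop_duplicates()
                 .sort_values(ordered_columns)
                 .itertuples(index=False, name=None))

for model_name, bucketfs_conn, token_conn, sub_dir in unique_values:
    selections = (
        (batch_df["model_name"] == model_name) &
        (batch_df["bucketfs_conn"] == bucketfs_conn) &
        (batch_df["sub_dir"] == sub_dir)
    )
    if token_conn is None:
        selections = selections & pd.isnull(batch_df["token_conn"])
    else:
        # assumed completion of the collapsed else-branch
        selections = selections & (batch_df["token_conn"] == token_conn)
    print(model_name, token_conn, "->", len(batch_df[selections]), "row(s)")
```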

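The commit message mentions new base_udf tests, but the test files are not part of the diff excerpt shown here. As a hypothetical illustration only, assuming _check_values_not_null stays a staticmethod on a class named BaseModelUDF in the module shown above, a test for the new validation could look like this:

```python
import pytest

# Hypothetical test sketch; the class name and access pattern are assumptions
# based on the diff above, not the tests actually added in this commit.
from exasol_transformers_extension.udfs.models.base_model_udf import BaseModelUDF


@pytest.mark.parametrize("model_name, bucketfs_conn, sub_dir", [
    (None, "bfs_conn", "models"),
    ("bert-base", None, "models"),
    ("bert-base", "bfs_conn", None),
])
def test_check_values_not_null_raises(model_name, bucketfs_conn, sub_dir):
    with pytest.raises(ValueError, match="need to be provided"):
        BaseModelUDF._check_values_not_null(model_name, bucketfs_conn, sub_dir)


def test_check_values_not_null_passes():
    # No exception expected when all three values are present.
    BaseModelUDF._check_values_not_null("bert-base", "bfs_conn", "models")
```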