Skip to content

Commit

Permalink
Merge pull request #70 from AutomatedProcessImprovement/update_rules
Browse files Browse the repository at this point in the history
Update rules
  • Loading branch information
SimpleSquirrelz authored Aug 10, 2024
2 parents 27091c4 + 7a5c5cd commit 3bfc460
Show file tree
Hide file tree
Showing 13 changed files with 460 additions and 180 deletions.
1 change: 1 addition & 0 deletions prosimos/all_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self, global_attributes, case_attributes, event_attributes):
global_case_attribute_names = global_attribute_names.intersection(all_case_attribute_names)
global_event_attribute_names = global_attribute_names.intersection(all_event_attribute_names)

self.global_attribute_initial_values = global_attributes.get_values_calculated()
self.global_attributes = self._extract_global_attributes(global_attributes, global_case_attribute_names, global_event_attribute_names)

self.global_case_attributes, self.case_attributes = \
Expand Down
6 changes: 1 addition & 5 deletions prosimos/branch_condition_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
BranchConditionWithRule
)


class BranchConditionParser:
def __init__(self, json_data_with_branching_conditions):
self.data = json_data_with_branching_conditions
Expand All @@ -17,25 +18,20 @@ def parse(self):
or_rules_json = curr_branch_condition_rule["rules"]
branch_id = curr_branch_condition_rule["id"]

# print(f"OR RULES JSON: {str(or_rules_json)}")
or_rules: List[AndBranchConditionRule] = []
for or_rule in or_rules_json:
and_rules: List[BranchConditionRule] = []
# print(f"OR RULE: {str(or_rule)}")
for and_rule_json in or_rule:
# print(f"AND RULE JSON: {str(and_rule_json)}")
rule: BranchConditionRule = BranchConditionRule(
and_rule_json["attribute"],
and_rule_json["comparison"],
and_rule_json["value"],
)
and_rules.append(rule)
# print(f"AND RULES: {str(or_rule)}")
all_and_rules = AndBranchConditionRule(and_rules)
or_rules.append(all_and_rules)

all_or_rules = OrBranchConditionRule(or_rules)
# print(f"ALL OR RULES: {all_or_rules}\n\n")

branching_conditions.append(BranchConditionWithRule(all_or_rules, branch_id))

Expand Down
31 changes: 26 additions & 5 deletions prosimos/branch_condition_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ def _parse_value(self, value):
if self.condition == "in":
min_boundary = float(value[0])
max_boundary = sys.maxsize if value[1] == "inf" else float(value[1])

return min_boundary, max_boundary
else:
return value
Expand All @@ -35,12 +34,34 @@ def is_rule_true(self, all_case_values):
if self.attribute not in all_case_values:
return False

case_value = all_case_values[self.attribute]
raw_case_value = all_case_values[self.attribute]
if self.condition in [">", ">=", "<", "<=", "!="]:
try:
case_value = float(raw_case_value)
comparison_value = float(self.value)
except ValueError:
case_value = raw_case_value
comparison_value = self.value
else:
case_value = raw_case_value
comparison_value = self.value

if self.condition == "in":
evaluator = InOperatorEvaluator(self.value, case_value)
return evaluator.eval()
min_value, max_value = comparison_value
return min_value <= case_value <= max_value
elif self.condition == ">":
return case_value > comparison_value
elif self.condition == ">=":
return case_value >= comparison_value
elif self.condition == "<":
return case_value < comparison_value
elif self.condition == "<=":
return case_value <= comparison_value
elif self.condition == "!=":
return case_value != comparison_value
else:
return self.value == case_value
return case_value == comparison_value



class AndBranchConditionRule:
Expand Down
9 changes: 6 additions & 3 deletions prosimos/case_attributes.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@ def get_next_value(self):
return self.value.generate_sample(1)[0]

def validate(self):
epsilon = 1e-6

if self.case_atrr_type == CASE_ATTR_TYPE.DISCRETE:
actual_sum_probabilities = reduce(lambda acc, item: acc + item, self.value["probabilities"], 0)

if actual_sum_probabilities != 1:
raise InvalidCaseAttributeException(f"Case attribute ${self.name}: probabilities' sum should be equal to 1")

if not (1 - epsilon <= actual_sum_probabilities <= 1 + epsilon):
raise InvalidCaseAttributeException(
f"Case attribute {self.name}: probabilities' sum should be equal to 1")

return True


Expand Down
167 changes: 144 additions & 23 deletions prosimos/event_attributes.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
import pprint
from enum import Enum
from functools import reduce
from random import choices
from typing import Dict
import ast
import operator as op
import numpy as np

from pix_framework.statistics.distribution import DurationDistribution

from prosimos.exceptions import InvalidEventAttributeException
import math


class EVENT_ATTR_TYPE(Enum):
DISCRETE = "discrete"
CONTINUOUS = "continuous"
EXPRESSION = "expression"
DTREE = "dtree"


operators = {
Expand All @@ -24,18 +27,67 @@ class EVENT_ATTR_TYPE(Enum):
ast.FloorDiv: op.floordiv
}

def fix(mean):
return DurationDistribution(name="fix", mean=mean, var=0.0, std=0.0, minimum=mean, maximum=mean).generate_sample(1)[0]

def parse_discrete_value(value_info_arr):
prob_arr = []
options_arr = []
for item in value_info_arr:
options_arr.append(item["key"])
prob_arr.append(float(item["value"]))

return {
"options": options_arr,
"probabilities": prob_arr
}
def uniform(minimum, maximum):
return DurationDistribution(name="uniform", minimum=minimum, maximum=maximum).generate_sample(1)[0]


def norm(mean, std, minimum=None, maximum=None):
return DurationDistribution(name="norm", mean=mean, std=std, minimum=minimum, maximum=maximum).generate_sample(1)[0]


def triang(c, minimum, maximum):
return DurationDistribution(name="triang", mean=c, minimum=minimum, maximum=maximum).generate_sample(1)[0]


def expon(mean, minimum=None, maximum=None):
return DurationDistribution(name="expon", mean=mean, minimum=minimum, maximum=maximum).generate_sample(1)[0]


def lognorm(mean, var, minimum=None, maximum=None):
return DurationDistribution(name="lognorm", mean=mean, var=var, minimum=minimum, maximum=maximum).generate_sample(1)[0]


def gamma(mean, var, minimum=None, maximum=None):
return DurationDistribution(name="gamma", mean=mean, var=var, minimum=minimum, maximum=maximum).generate_sample(1)[0]


distributions = {
'fix': fix,
'uniform': uniform,
'norm': norm,
'triang': triang,
'expon': expon,
'lognorm': lognorm,
'gamma': gamma
}

math_functions = {name: getattr(math, name) for name in dir(math) if callable(getattr(math, name))}


def parse_discrete_value(value_info):
if isinstance(value_info, list):
prob_arr = []
options_arr = []
for item in value_info:
options_arr.append(item["key"])
prob_arr.append(float(item["value"]))

return {
"type": "discrete",
"options": options_arr,
"probabilities": prob_arr
}
elif isinstance(value_info, dict):
return {
"type": "markov",
"transitions": value_info
}
else:
raise ValueError("Unsupported value_info format for discrete value")


def parse_continuous_value(value_info) -> "DurationDistribution":
Expand All @@ -57,7 +109,8 @@ def _eval(node):
elif isinstance(node, ast.UnaryOp):
return operators[type(node.op)](_eval(node.operand))
elif isinstance(node, ast.Name):
return vars_dict[node.id]
if node.id in vars_dict:
return vars_dict[node.id]
elif isinstance(node, ast.Str):
return node.s
elif isinstance(node, ast.Compare):
Expand All @@ -67,14 +120,33 @@ def _eval(node):
return all(_eval(value) for value in node.values)
elif type(node.op) is ast.Or:
return any(_eval(value) for value in node.values)
elif isinstance(node, ast.Call):
func_name = node.func.id
args = [_eval(arg) for arg in node.args]
if func_name in distributions:
return distributions[func_name](*args)
elif func_name in math_functions:
args = [_eval(arg) for arg in node.args]
try:
return math_functions[func_name ](*args)
except OverflowError:
return np.finfo(np.float32).max
except ValueError as e:
return 0
else:
return None
return 0
except (SyntaxError, ZeroDivisionError, TypeError, KeyError):
return None
return 0

return _eval(tree.body)


def evaluate_dtree(dtree, vars_dict):
for conditions, formula in dtree:
if conditions is True or all(eval_expr(cond, vars_dict) for cond in conditions):
return eval_expr(formula, vars_dict)
return None

class EventAttribute:
def __init__(self, event_id, name, event_attr_type, value):
self.event_id: str = event_id
Expand All @@ -87,27 +159,76 @@ def __init__(self, event_id, name, event_attr_type, value):
self.value = parse_continuous_value(value)
elif self.event_attr_type == EVENT_ATTR_TYPE.EXPRESSION:
self.value = value
elif self.event_attr_type == EVENT_ATTR_TYPE.DTREE:
self.value = value
else:
raise Exception(f"Not supported event attribute {type}")

self.validate()

def get_next_value(self, all_attributes={}):
def get_next_value(self, all_attributes):
if self.event_attr_type == EVENT_ATTR_TYPE.DISCRETE:
one_choice_arr = choices(self.value["options"], self.value["probabilities"])
return one_choice_arr[0]
if self.value["type"] == "markov":
current_value = all_attributes.get(self.name, None)
next_state = self.get_next_markov_state(current_value)
if next_state is not None:
all_attributes[self.name] = next_state
return next_state
return current_value
else:
one_choice_arr = choices(self.value["options"], self.value["probabilities"])
return one_choice_arr[0]

elif self.event_attr_type == EVENT_ATTR_TYPE.EXPRESSION:
return eval_expr(self.value, all_attributes)
result = eval_expr(self.value, all_attributes)
if isinstance(result, (int, float, np.number)) and not isinstance(result, bool):
if result == 0: # Specifically handle zero without adjusting to tiny (in case of any errors in eval)
return 0
elif result == float('inf'):
return np.finfo(np.float32).max
elif result == -float('inf'):
return np.finfo(np.float32).min
elif abs(result) < np.finfo(np.float32).tiny:
return np.finfo(np.float32).tiny
elif abs(result) > np.finfo(np.float32).max:
return np.finfo(np.float32).max if result > 0 else np.finfo(np.float32).min
else:
return result
else:
return result
elif self.event_attr_type == EVENT_ATTR_TYPE.DTREE:
result = evaluate_dtree(self.value, all_attributes)
if result is not None:
return result
return 0
else:
return self.value.generate_sample(1)[0]

def get_next_markov_state(self, current_value):
transitions = self.value["transitions"]
if current_value in transitions:
current_transitions = transitions[current_value]
options, probabilities = zip(*current_transitions.items())
return choices(options, probabilities)[0]
return current_value

def validate(self):
if self.event_attr_type == EVENT_ATTR_TYPE.DISCRETE:
actual_sum_probabilities = reduce(lambda acc, item: acc + item, self.value["probabilities"], 0)
epsilon = 1e-6

if actual_sum_probabilities != 1:
raise InvalidEventAttributeException(
f"Event attribute ${self.name}: probabilities' sum should be equal to 1")
if self.event_attr_type == EVENT_ATTR_TYPE.DISCRETE:
if self.value["type"] == "discrete":
actual_sum_probabilities = sum(self.value["probabilities"])

if not (1 - epsilon <= actual_sum_probabilities <= 1 + epsilon):
raise InvalidEventAttributeException(
f"Event attribute {self.name}: probabilities' sum should be equal to 1")
elif self.value["type"] == "markov":
for state, transitions in self.value["transitions"].items():
actual_sum_probabilities = sum(transitions.values())
if not (1 - epsilon <= actual_sum_probabilities <= 1 + epsilon):
raise InvalidEventAttributeException(
f"Event attribute {self.name}, state {state}: "
f"probabilities' sum should be equal to 1")

return True

Expand Down
13 changes: 9 additions & 4 deletions prosimos/gateway_condition_choice.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
class GatewayConditionChoice:
def __init__(self, candidates_list, rules_list):
def __init__(self, candidates_list, rules_list, default_path=None):
self.candidates_list = candidates_list
self.rules_list = rules_list
self.default_path = default_path

def get_outgoing_flow(self, case_attributes):
return [candidate for candidate, rule in zip(self.candidates_list, self.rules_list)
if rule.is_true(case_attributes)]
def set_default(self, default_path):
self.default_path = default_path

def get_outgoing_flow(self, attributes):
return [candidate for candidate, rule in zip(self.candidates_list, self.rules_list)
if rule.is_true(attributes)]

def get_default_path(self):
return self.default_path
Loading

0 comments on commit 3bfc460

Please sign in to comment.