Skip to content

Commit

Permalink
feat: Add timeout to the smt operations
Browse files Browse the repository at this point in the history
  • Loading branch information
GermanMT committed Nov 28, 2024
1 parent 84c0a79 commit c2c5b8f
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 45 deletions.
16 changes: 12 additions & 4 deletions flamapy/metamodels/smt_metamodel/operations/complete_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from z3 import Int, sat, Optimize
from z3 import Int, sat, Optimize, unknown

from flamapy.core.operations import Operation
from flamapy.metamodels.smt_metamodel.models import PySMTModel
Expand All @@ -8,13 +8,14 @@
class CompleteConfig(Operation):
def __init__(self, config: dict[str, int]) -> None:
self.config: dict[str, int] = config
self.result: list[dict[str, float | int]] = []
self.result: list[dict[str, float | int]] | str = []

def get_result(self) -> list[dict[str, float | int]]:
def get_result(self) -> list[dict[str, float | int]] | str:
return self.result

def execute(self, model: PySMTModel) -> None:
solver = Optimize()
solver.set("timeout", 3000)
if model.func_obj_var is not None:
cvss_f = model.func_obj_var
solver.minimize(cvss_f)
Expand All @@ -24,5 +25,12 @@ def execute(self, model: PySMTModel) -> None:
while solver.check() == sat:
config = solver.model()
sanitized_config = config_sanitizer(config)
self.result.append(sanitized_config)
if isinstance(self.result, list):
self.result.append(sanitized_config)
break
if solver.check() == unknown:
self.result = (
"Execution timed out after 3 seconds. "
"The complexity of the model is too high, "
"try lowering the maximum level of the graph."
)
16 changes: 12 additions & 4 deletions flamapy/metamodels/smt_metamodel/operations/config_by_impact.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from z3 import sat, Optimize, Abs
from z3 import sat, Optimize, Abs, unknown

from flamapy.core.operations import Operation
from flamapy.metamodels.smt_metamodel.models import PySMTModel
Expand All @@ -8,13 +8,14 @@
class ConfigByImpact(Operation):
def __init__(self, impact: float) -> None:
self.impact: float = impact
self.result: list[dict[str, float | int]] = []
self.result: list[dict[str, float | int]] | str = []

def get_result(self) -> list[dict[str, float | int]]:
def get_result(self) -> list[dict[str, float | int]] | str:
return self.result

def execute(self, model: PySMTModel) -> None:
solver = Optimize()
solver.set("timeout", 3000)
if model.func_obj_var is not None:
cvss_f = model.func_obj_var
obj = Abs(cvss_f - self.impact)
Expand All @@ -23,5 +24,12 @@ def execute(self, model: PySMTModel) -> None:
while solver.check() == sat:
config = solver.model()
sanitized_config = config_sanitizer(config)
self.result.append(sanitized_config)
if isinstance(self.result, list):
self.result.append(sanitized_config)
break
if solver.check() == unknown:
self.result = (
"Execution timed out after 3 seconds. "
"The complexity of the model is too high, "
"try lowering the maximum level of the graph."
)
16 changes: 12 additions & 4 deletions flamapy/metamodels/smt_metamodel/operations/filter_configs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from z3 import And, Or, Solver, sat
from z3 import And, Or, Solver, sat, unknown

from flamapy.core.operations import Operation
from flamapy.metamodels.smt_metamodel.models import PySMTModel
Expand All @@ -10,9 +10,9 @@ def __init__(self, max_threshold: float, min_threshold: float, limit: int) -> No
self.max_threshold: float = max_threshold
self.min_threshold: float = min_threshold
self.limit: int = limit
self.result: list[dict[str, float | int]] = []
self.result: list[dict[str, float | int]] | str = []

def get_result(self) -> list[dict[str, float | int]]:
def get_result(self) -> list[dict[str, float | int]] | str:
return self.result

def execute(self, model: PySMTModel) -> None:
Expand All @@ -21,15 +21,23 @@ def execute(self, model: PySMTModel) -> None:
max_ctc = cvss_f <= self.max_threshold
min_ctc = cvss_f >= self.min_threshold
solver = Solver()
solver.set("timeout", 3000)
solver.add(And([model.domain, max_ctc, min_ctc]))
while len(self.result) < self.limit and solver.check() == sat:
config = solver.model()
sanitized_config = config_sanitizer(config)
self.result.append(sanitized_config)
if isinstance(self.result, list):
self.result.append(sanitized_config)
block = []
for var in config:
if str(var) != "/0":
variable = var()
if "CVSS" not in str(variable):
block.append(config[var] != variable)
solver.add(Or(block))
if solver.check() == unknown:
self.result = (
"Execution timed out after 3 seconds. "
"The complexity of the model is too high, "
"try lowering the maximum level of the graph."
)
16 changes: 12 additions & 4 deletions flamapy/metamodels/smt_metamodel/operations/maximize_impact.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from z3 import Or, Optimize, sat
from z3 import Or, Optimize, sat, unknown

from flamapy.core.operations import Operation
from flamapy.metamodels.smt_metamodel.models import PySMTModel
Expand All @@ -8,25 +8,33 @@
class MaximizeImpact(Operation):
def __init__(self, limit: int) -> None:
self.limit: int = limit
self.result: list[dict[str, float | int]] = []
self.result: list[dict[str, float | int]] | str = []

def get_result(self) -> list[dict[str, float | int]]:
def get_result(self) -> list[dict[str, float | int]] | str:
return self.result

def execute(self, model: PySMTModel) -> None:
solver = Optimize()
solver.set("timeout", 3000)
if model.func_obj_var is not None:
cvss_f = model.func_obj_var
solver.maximize(cvss_f)
solver.add(model.domain)
while len(self.result) < self.limit and solver.check() == sat:
config = solver.model()
sanitized_config = config_sanitizer(config)
self.result.append(sanitized_config)
if isinstance(self.result, list):
self.result.append(sanitized_config)
block = []
for var in config:
if str(var) != "/0":
variable = var()
if "CVSS" not in str(variable):
block.append(config[var] != variable)
solver.add(Or(block))
if solver.check() == unknown:
self.result = (
"Execution timed out after 3 seconds. "
"The complexity of the model is too high, "
"try lowering the maximum level of the graph."
)
16 changes: 12 additions & 4 deletions flamapy/metamodels/smt_metamodel/operations/minimize_impact.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from z3 import Or, Optimize, sat
from z3 import Or, Optimize, sat, unknown

from flamapy.core.operations import Operation
from flamapy.metamodels.smt_metamodel.models import PySMTModel
Expand All @@ -8,25 +8,33 @@
class MinimizeImpact(Operation):
def __init__(self, limit: int) -> None:
self.limit: int = limit
self.result: list[dict[str, float | int]] = []
self.result: list[dict[str, float | int]] | str = []

def get_result(self) -> list[dict[str, float | int]]:
def get_result(self) -> list[dict[str, float | int]] | str:
return self.result

def execute(self, model: PySMTModel) -> None:
solver = Optimize()
solver.set("timeout", 3000)
if model.func_obj_var is not None:
cvss_f = model.func_obj_var
solver.minimize(cvss_f)
solver.add(model.domain)
while len(self.result) < self.limit and solver.check() == sat:
config = solver.model()
sanitized_config = config_sanitizer(config)
self.result.append(sanitized_config)
if isinstance(self.result, list):
self.result.append(sanitized_config)
block = []
for var in config:
if str(var) != "/0":
variable = var()
if "CVSS" not in str(variable):
block.append(config[var] != variable)
solver.add(Or(block))
if solver.check() == unknown:
self.result = (
"Execution timed out after 3 seconds. "
"The complexity of the model is too high, "
"try lowering the maximum level of the graph."
)
16 changes: 12 additions & 4 deletions flamapy/metamodels/smt_metamodel/operations/number_of_products.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from z3 import Or, Solver, sat
from z3 import Or, Solver, sat, unknown

from flamapy.core.operations import Operation
from flamapy.metamodels.smt_metamodel.models import PySMTModel


class NumberOfProducts(Operation):
def __init__(self) -> None:
self.result: int = 0
self.result: int | str = 0

def get_result(self) -> int:
def get_result(self) -> int | str:
return self.result

def execute(self, model: PySMTModel) -> None:
solver = Solver()
solver.set("timeout", 3000)
solver.add(model.domain)
while solver.check() == sat:
config = solver.model()
Expand All @@ -23,4 +24,11 @@ def execute(self, model: PySMTModel) -> None:
if "CVSS" not in str(variable):
block.append(config[var] != variable)
solver.add(Or(block))
self.result += 1
if isinstance(self.result, int):
self.result += 1
if solver.check() == unknown:
self.result = (
"Execution timed out after 3 seconds. "
"The complexity of the model is too high, "
"try lowering the maximum level of the graph."
)
13 changes: 10 additions & 3 deletions flamapy/metamodels/smt_metamodel/operations/valid_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from z3 import Int, Solver, sat
from z3 import Int, Solver, sat, unknown

from flamapy.core.operations import Operation
from flamapy.metamodels.smt_metamodel.models import PySMTModel
Expand All @@ -7,14 +7,21 @@
class ValidConfig(Operation):
def __init__(self, config: dict[str, int]) -> None:
self.config: dict[str, int] = config
self.result: bool = True
self.result: bool | str = True

def get_result(self) -> bool:
def get_result(self) -> bool | str:
return self.result

def execute(self, model: PySMTModel) -> None:
solver = Solver()
solver.set("timeout", 3000)
solver.add(model.domain)
for package, count in self.config.items():
solver.add(Int(package) == count)
self.result = solver.check() == sat
if solver.check() == unknown:
self.result = (
"Execution timed out after 3 seconds. "
"The complexity of the model is too high, "
"try lowering the maximum level of the graph."
)
13 changes: 10 additions & 3 deletions flamapy/metamodels/smt_metamodel/operations/valid_model.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
from z3 import Solver, sat
from z3 import Solver, sat, unknown

from flamapy.core.operations import Operation
from flamapy.metamodels.smt_metamodel.models import PySMTModel


class ValidModel(Operation):
def __init__(self) -> None:
self.result: bool = True
self.result: bool | str = True

def get_result(self) -> bool:
def get_result(self) -> bool | str:
return self.result

def execute(self, model: PySMTModel) -> None:
solver = Solver()
solver.set("timeout", 3000)
solver.add(model.domain)
self.result = solver.check() == sat
if solver.check() == unknown:
self.result = (
"Execution timed out after 3 seconds. "
"The complexity of the model is too high, "
"try lowering the maximum level of the graph."
)
32 changes: 17 additions & 15 deletions flamapy/metamodels/smt_metamodel/transformations/graph_to_smt.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,23 @@ def convert(self, model_text: str) -> None:
)

def transform(self) -> str:
for direct_requires in self.source_data["requires"]["direct"]:
direct_version_counts = self.transform_direct_package(direct_requires)
if direct_version_counts:
rels_to_delete = []
for indirect_requires in self.source_data["requires"]["indirect"]:
if (
indirect_requires["parent_version_name"] == direct_requires["dependency"] and
indirect_requires["parent_count"] not in direct_version_counts
):
rels_to_delete.append(indirect_requires)
for _ in rels_to_delete:
self.source_data["requires"]["indirect"].remove(_)

for indirect_requires in self.source_data["requires"]["indirect"]:
self.transform_indirect_package(indirect_requires)
if "direct" in self.source_data["requires"]:
for direct_requires in self.source_data["requires"]["direct"]:
direct_version_counts = self.transform_direct_package(direct_requires)
if direct_version_counts:
rels_to_delete = []
if "indirect" in self.source_data["requires"]:
for indirect_requires in self.source_data["requires"]["indirect"]:
if (
indirect_requires["parent_version_name"] == direct_requires["dependency"] and
indirect_requires["parent_count"] not in direct_version_counts
):
rels_to_delete.append(indirect_requires)
for _ in rels_to_delete:
self.source_data["requires"]["indirect"].remove(_)
if "indirect" in self.source_data["requires"]:
for indirect_requires in self.source_data["requires"]["indirect"]:
self.transform_indirect_package(indirect_requires)
func_obj_name = f"func_obj_{self.source_data["name"]}"
file_risk_name = f"file_risk_{self.source_data["name"]}"
self.var_domain.add(
Expand Down

0 comments on commit c2c5b8f

Please sign in to comment.