diff --git a/setools/permmap.py b/setools/permmap.py
index f9fdc498..31ca6363 100644
--- a/setools/permmap.py
+++ b/setools/permmap.py
@@ -7,7 +7,7 @@
from collections import OrderedDict
from contextlib import suppress
from dataclasses import dataclass
-from typing import cast, Dict, Iterable, Optional, Union
+from typing import cast, Dict, Final, Iterable, Optional, Union
import pkg_resources
@@ -16,9 +16,9 @@
from .mixins import TupleCompat
from .policyrep import AVRule, SELinuxPolicy, TERuletype
-INFOFLOW_DIRECTIONS = ("r", "w", "b", "n", "u")
+INFOFLOW_DIRECTIONS: Final = ("r", "w", "b", "n", "u")
+MIN_WEIGHT: Final[int] = 1
+MAX_WEIGHT: Final[int] = 10
@@ -95,6 +95,9 @@ class PermissionMap:
"""Permission Map for information flow analysis."""
def __init__(self, permmapfile: Optional[str] = None) -> None:
diff --git a/setoolsgui/apol.py b/setoolsgui/apol.py
index 1bb286f3..c4e929a1 100644
--- a/setoolsgui/apol.py
+++ b/setoolsgui/apol.py
@@ -22,7 +22,7 @@
# Supported analyses. These are not directly used here, but
# will init the tab registry in widgets.tab for apol's analyses.
-from .widgets import (mlsrulequery, rbacrulequery, terulequery)
+from .widgets import (infoflow, mlsrulequery, rbacrulequery, terulequery)
from typing import Dict, Final, Optional
diff --git a/setoolsgui/apol/infoflow.py b/setoolsgui/apol/infoflow.py
deleted file mode 100644
index fb9b4099..00000000
--- a/setoolsgui/apol/infoflow.py
+++ /dev/null
@@ -1,543 +0,0 @@
-# Copyright 2015, Tresys Technology, LLC
-# SPDX-License-Identifier: LGPL-2.1-only
-import logging
-import copy
-from collections import defaultdict
-from contextlib import suppress
-from PyQt5.QtCore import pyqtSignal, Qt, QStringListModel, QThread
-from PyQt5.QtGui import QPalette, QTextCursor
-from PyQt5.QtWidgets import QCompleter, QHeaderView, QMessageBox, QProgressDialog, \
- QTreeWidgetItem
-from setools import InfoFlowAnalysis
-from setools.exception import UnmappedClass, UnmappedPermission
-from ..logtosignal import LogHandlerToSignal
-from .analysistab import AnalysisSection, AnalysisTab
-from .exception import TabFieldError
-from .excludetypes import ExcludeTypes
-from .permmapedit import PermissionMapEditor
-from .workspace import load_checkboxes, load_spinboxes, load_lineedits, load_textedits, \
- save_checkboxes, save_spinboxes, save_lineedits, save_textedits
-class InfoFlowAnalysisTab(AnalysisTab):
- """An information flow analysis tab."""
- section = AnalysisSection.Analysis
- tab_title = "Information Flow Analysis"
- mlsonly = False
- @property
- def perm_map(self):
- return self.query.perm_map
- @perm_map.setter
- def perm_map(self, pmap):
- # copy permission map to keep enabled/disabled
- # settings private to this map.
- perm_map = copy.deepcopy(pmap)
- # transfer enabled/disabled settings from
- # current permission map, to the new map
- for classname in self.query.perm_map.classes():
- for mapping in self.query.perm_map.perms(classname):
- with suppress(UnmappedClass, UnmappedPermission):
- perm_map.mapping(classname, mapping.perm).enabled = mapping.enabled
- # apply updated permission map
- self.query.perm_map = perm_map
- def __init__(self, parent, policy, perm_map):
- super(InfoFlowAnalysisTab, self).__init__(parent)
- self.log = logging.getLogger(__name__)
- self.policy = policy
- self.query = InfoFlowAnalysis(policy, perm_map)
- self.query.source = None
- self.query.target = None
- self.setupUi()
- def __del__(self):
- with suppress(RuntimeError):
- self.thread.quit()
- self.thread.wait(5000)
- logging.getLogger("setools.infoflow").removeHandler(self.handler)
- def setupUi(self):
- self.log.debug("Initializing UI.")
- self.load_ui("apol/infoflow.ui")
- # set up error message for missing perm map
- self.error_msg = QMessageBox(self)
- self.error_msg.setStandardButtons(QMessageBox.Ok)
- # set up perm map editor
- self.permmap_editor = PermissionMapEditor(self, False)
- # set up source/target autocompletion
- type_completion_list = [str(t) for t in self.policy.types()]
- type_completer_model = QStringListModel(self)
- type_completer_model.setStringList(sorted(type_completion_list))
- self.type_completion = QCompleter()
- self.type_completion.setModel(type_completer_model)
- self.source.setCompleter(self.type_completion)
- self.target.setCompleter(self.type_completion)
- # setup indications of errors on source/target/default
- self.errors = set()
- self.orig_palette = self.source.palette()
- self.error_palette = self.source.palette()
- self.error_palette.setColor(QPalette.Base, Qt.red)
- self.clear_source_error()
- self.clear_target_error()
- # set up processing thread
- self.thread = ResultsUpdater(self.query)
- self.thread.raw_line.connect(self.raw_results.appendPlainText)
- self.thread.finished.connect(self.update_complete)
- self.thread.flows.connect(self.reset_browser)
- # set up browser thread
- self.browser_thread = BrowserUpdater(self.query)
- self.browser_thread.flows.connect(self.add_browser_children)
- # create a "busy, please wait" dialog
- self.busy = QProgressDialog(self)
- self.busy.setModal(True)
- self.busy.setRange(0, 0)
- self.busy.setMinimumDuration(0)
- self.busy.setCancelButton(None)
- self.busy.reset()
- # update busy dialog from infoflow INFO logs
- self.handler = LogHandlerToSignal()
- self.handler.message.connect(self.busy.setLabelText)
- logging.getLogger("setools.infoflow").addHandler(self.handler)
- # Ensure settings are consistent with the initial .ui state
- self.max_path_length.setEnabled(self.all_paths.isChecked())
- self.source.setEnabled(not self.flows_in.isChecked())
- self.target.setEnabled(not self.flows_out.isChecked())
- self.criteria_frame.setHidden(not self.criteria_expander.isChecked())
- self.notes.setHidden(not self.notes_expander.isChecked())
- self.browser_tab.setEnabled(self.flows_in.isChecked() or self.flows_out.isChecked())
- # connect signals
- self.buttonBox.clicked.connect(self.run)
- self.source.textEdited.connect(self.clear_source_error)
- self.source.editingFinished.connect(self.set_source)
- self.target.textEdited.connect(self.clear_target_error)
- self.target.editingFinished.connect(self.set_target)
- self.all_paths.toggled.connect(self.all_paths_toggled)
- self.flows_in.toggled.connect(self.flows_in_toggled)
- self.flows_out.toggled.connect(self.flows_out_toggled)
- self.min_perm_weight.valueChanged.connect(self.set_min_weight)
- self.exclude_types.clicked.connect(self.choose_excluded_types)
- self.edit_permmap.clicked.connect(self.open_permmap_editor)
- self.browser.currentItemChanged.connect(self.browser_item_selected)
- #
- # Analysis mode
- #
- def all_paths_toggled(self, value):
- self.clear_source_error()
- self.clear_target_error()
- self.max_path_length.setEnabled(value)
- def flows_in_toggled(self, value):
- self.clear_source_error()
- self.clear_target_error()
- self.source.setEnabled(not value)
- self.limit_paths.setEnabled(not value)
- self.browser_tab.setEnabled(value)
- def flows_out_toggled(self, value):
- self.clear_source_error()
- self.clear_target_error()
- self.target.setEnabled(not value)
- self.limit_paths.setEnabled(not value)
- self.browser_tab.setEnabled(value)
- #
- # Source criteria
- #
- def clear_source_error(self):
- self.clear_criteria_error(self.source, "The source type of the analysis.")
- def set_source(self):
- try:
- # look up the type here, so invalid types can be caught immediately
- text = self.source.text()
- if text:
- self.query.source = self.policy.lookup_type(text)
- else:
- self.query.source = None
- except Exception as ex:
- self.log.error("Source type error: {0}".format(str(ex)))
- self.set_criteria_error(self.source, ex)
- #
- # Target criteria
- #
- def clear_target_error(self):
- self.clear_criteria_error(self.target, "The target type of the analysis.")
- def set_target(self):
- try:
- # look up the type here, so invalid types can be caught immediately
- text = self.target.text()
- if text:
- self.query.target = self.policy.lookup_type(text)
- else:
- self.query.target = None
- except Exception as ex:
- self.log.error("Target type error: {0}".format(str(ex)))
- self.set_criteria_error(self.target, ex)
- #
- # Options
- #
- def set_min_weight(self, value):
- self.query.min_weight = value
- def choose_excluded_types(self):
- chooser = ExcludeTypes(self, self.policy)
- chooser.show()
- def open_permmap_editor(self):
- self.permmap_editor.show(self.perm_map)
- def apply_permmap(self, pmap):
- # used only by permission map editor
- self.query.perm_map = pmap
- #
- # Save/Load tab
- #
- def save(self):
- """Return a dictionary of settings."""
- if self.errors:
- raise TabFieldError("Field(s) are in error: {0}".
- format(" ".join(o.objectName() for o in self.errors)))
- settings = {}
- save_checkboxes(self, settings, ["criteria_expander", "notes_expander", "all_paths",
- "all_shortest_paths", "flows_in", "flows_out"])
- save_lineedits(self, settings, ["source", "target"])
- save_spinboxes(self, settings, ["max_path_length", "limit_paths", "min_perm_weight"])
- save_textedits(self, settings, ["notes"])
- settings["exclude"] = [str(t) for t in self.query.exclude]
- settings["exclude_perms"] = defaultdict(list)
- for mapping in self.perm_map:
- if not mapping.enabled:
- settings["exclude_perms"][mapping.class_].append(mapping.perm)
- return settings
- def load(self, settings):
- load_checkboxes(self, settings, ["criteria_expander", "notes_expander", "all_paths",
- "all_shortest_paths", "flows_in", "flows_out"])
- load_lineedits(self, settings, ["source", "target"])
- load_spinboxes(self, settings, ["max_path_length", "limit_paths", "min_perm_weight"])
- load_textedits(self, settings, ["notes"])
- try:
- self.query.exclude = settings["exclude"]
- except KeyError:
- self.log.warning("Excluded types criteria missing from settings file.")
- if "exclude_perms" not in settings:
- self.log.warning("Excluded permissions missing from settings file.")
- else:
- for mapping in self.perm_map:
- # iterate over the map so that any permission
- # not in the setting file's exclude list is enabled.
- try:
- mapping.enabled = mapping.perm not in settings["exclude_perms"][mapping.class_]
- except KeyError:
- mapping.enabled = True
- #
- # Infoflow browser
- #
- def _new_browser_item(self, type_, parent, rules=None, children=None):
- # build main item
- item = QTreeWidgetItem(parent if parent else self.browser)
- item.setText(0, str(type_))
- item.type_ = type_
- item.children = children if children else []
- item.rules = rules if rules else []
- item.child_populated = children is not None
- # add child items
- for child_type, child_rules in item.children:
- child_item = self._new_browser_item(child_type, item, rules=child_rules)
- item.addChild(child_item)
- item.setExpanded(children is not None)
- self.log.debug("Built item for {0} with {1} children and {2} rules".format(
- type_, len(item.children), len(item.rules)))
- return item
- def reset_browser(self, root_type, out, children):
- self.log.debug("Resetting browser.")
- # clear results
- self.browser.clear()
- self.browser_details.clear()
- # save browser details independent
- # from main analysis UI settings
- self.browser_root_type = root_type
- self.browser_mode = out
- root = self._new_browser_item(self.browser_root_type, self.browser, children=children)
- self.browser.insertTopLevelItem(0, root)
- def browser_item_selected(self, current, previous):
- if not current:
- # browser is being reset
- return
- self.log.debug("{0} selected in browser.".format(current.type_))
- self.browser_details.clear()
- try:
- parent_type = current.parent().type_
- except AttributeError:
- # should only hit his on the root item
- pass
- else:
- self.browser_details.appendPlainText("Information flows {0} {1} {2}\n".format(
- current.parent().type_, "->" if self.browser_mode else "<-", current.type_))
- for rule in current.rules:
- self.browser_details.appendPlainText(rule)
- self.browser_details.moveCursor(QTextCursor.Start)
- if not current.child_populated:
- self.busy.setLabelText("Gathering additional browser details for {0}...".format(
- current.type_))
- self.busy.show()
- self.browser_thread.out = self.browser_mode
- self.browser_thread.type_ = current.type_
- self.browser_thread.start()
- def add_browser_children(self, children):
- item = self.browser.currentItem()
- item.children = children
- self.log.debug("Adding children for {0}".format(item.type_))
- for child_type, child_rules in item.children:
- child_item = self._new_browser_item(child_type, item, rules=child_rules)
- item.addChild(child_item)
- item.child_populated = True
- self.busy.reset()
- #
- # Results runner
- #
- def run(self, button):
- # right now there is only one button.
- fail = False
- if self.source.isEnabled() and not self.query.source:
- self.set_criteria_error(self.source, "A source type is required")
- fail = True
- if self.target.isEnabled() and not self.query.target:
- self.set_criteria_error(self.target, "A target type is required.")
- fail = True
- if not self.perm_map:
- self.log.critical("A permission map is required to begin the analysis.")
- self.error_msg.critical(self,
- "No permission map available.",
- "Please load a permission map to begin the analysis.")
- fail = True
- if fail:
- return
- for mode in [self.all_paths, self.all_shortest_paths, self.flows_in, self.flows_out]:
- if mode.isChecked():
- break
- self.query.mode = mode.objectName()
- self.query.max_path_len = self.max_path_length.value()
- self.query.limit = self.limit_paths.value()
- # start processing
- self.busy.setLabelText("Processing query...")
- self.busy.show()
- self.raw_results.clear()
- self.thread.start()
- def update_complete(self):
- if not self.busy.wasCanceled():
- self.busy.setLabelText("Moving the raw result to top; GUI may be unresponsive")
- self.busy.repaint()
- self.raw_results.moveCursor(QTextCursor.Start)
- if self.flows_in.isChecked() or self.flows_out.isChecked():
- # move to browser tab for flows in/out
- self.results_frame.setCurrentIndex(1)
- else:
- self.results_frame.setCurrentIndex(0)
- self.busy.reset()
-class ResultsUpdater(QThread):
- """
- Thread for processing queries and updating result widgets.
- Parameters:
- query The query object
- model The model for the results
- Qt signals:
- raw_line A string to be appended to the raw results.
- flows (str, bool, list) Initial information for populating
- the flows browser.
- """
- raw_line = pyqtSignal(str)
- flows = pyqtSignal(str, bool, list)
- def __init__(self, query):
- super(ResultsUpdater, self).__init__()
- self.query = query
- self.log = logging.getLogger(__name__)
- def __del__(self):
- self.wait()
- def run(self):
- """Run the query and update results."""
- assert self.query.limit, "Code doesn't currently handle unlimited (limit=0) paths."
- self.out = self.query.mode == "flows_out"
- if self.query.mode == "all_paths":
- self.transitive(self.query.all_paths(self.query.source, self.query.target,
- self.query.max_path_len))
- elif self.query.mode == "all_shortest_paths":
- self.transitive(self.query.all_shortest_paths(self.query.source, self.query.target))
- elif self.query.mode == "flows_out":
- self.direct(self.query.infoflows(self.query.source, out=self.out))
- else: # flows_in
- self.direct(self.query.infoflows(self.query.target, out=self.out))
- def transitive(self, paths):
- pathnum = 0
- for pathnum, path in enumerate(paths, start=1):
- self.raw_line.emit("Flow {0}:".format(pathnum))
- for stepnum, step in enumerate(path, start=1):
- self.raw_line.emit(" Step {0}: {1} -> {2}".format(stepnum,
- step.source,
- step.target))
- for rule in sorted(step.rules):
- self.raw_line.emit(" {0}".format(rule))
- self.raw_line.emit("")
- if QThread.currentThread().isInterruptionRequested() or (pathnum >= self.query.limit):
- break
- else:
- QThread.yieldCurrentThread()
- self.raw_line.emit("")
- self.raw_line.emit("{0} information flow path(s) found.\n".format(pathnum))
- self.log.info("{0} information flow path(s) found.".format(pathnum))
- def direct(self, flows):
- flownum = 0
- child_types = []
- for flownum, flow in enumerate(flows, start=1):
- self.raw_line.emit("Flow {0}: {1} -> {2}".format(flownum, flow.source, flow.target))
- for rule in sorted(flow.rules):
- self.raw_line.emit(" {0}".format(rule))
- self.raw_line.emit("")
- # Generate results for flow browser
- if self.out:
- child_types.append((flow.target, sorted(str(r) for r in flow.rules)))
- else:
- child_types.append((flow.source, sorted(str(r) for r in flow.rules)))
- if QThread.currentThread().isInterruptionRequested():
- break
- else:
- QThread.yieldCurrentThread()
- self.raw_line.emit("{0} information flow(s) found.\n".format(flownum))
- self.log.info("{0} information flow(s) found.".format(flownum))
- # Update browser:
- root_type = self.query.source if self.out else self.query.target
- self.flows.emit(str(root_type), self.out, sorted(child_types))
-class BrowserUpdater(QThread):
- """
- Thread for processing additional analysis for the browser.
- Parameters:
- query The query object
- model The model for the results
- Qt signals:
- flows A list of child types to render in the
- infoflows browser.
- """
- flows = pyqtSignal(list)
- def __init__(self, query):
- super(BrowserUpdater, self).__init__()
- self.query = query
- self.type_ = None
- self.out = None
- self.log = logging.getLogger(__name__)
- def __del__(self):
- self.wait()
- def run(self):
- flownum = 0
- child_types = []
- for flownum, flow in enumerate(self.query.infoflows(self.type_, out=self.out), start=1):
- # Generate results for flow browser
- if self.out:
- child_types.append((flow.target, sorted(str(r) for r in flow.rules)))
- else:
- child_types.append((flow.source, sorted(str(r) for r in flow.rules)))
- if QThread.currentThread().isInterruptionRequested():
- break
- else:
- QThread.yieldCurrentThread()
- self.log.debug("{0} additional information flow(s) found.".format(flownum))
- # Update browser:
- self.flows.emit(sorted(child_types))
diff --git a/setoolsgui/apol/infoflow.ui b/setoolsgui/apol/infoflow.ui
deleted file mode 100644
index 45bd87e1..00000000
--- a/setoolsgui/apol/infoflow.ui
+++ /dev/null
@@ -1,658 +0,0 @@
For shortest path and all paths analyses, this + this is the source type of the analysis.
+ +For flows out analysis, the analysis will return the + information flows out of this type.
+ +This is not used for flows in analysis. + """) + + # + # Configure mode frame + # + modeframe = InfoFlowMode(self.query, parent=self) + modeframe.selectionChanged.connect(self._apply_mode_change) + + # + # Configure target type + # + dst = criteria.TypeOrAttrNameWidget("Target Type", + self.query, + SETTINGS_TARGET, + mode=criteria.TypeOrAttrNameMode.type_only, + enable_regex=False, + enable_indirect=False, + required=True, + parent=self.criteria_frame) + dst.setToolTip("The target type of the analysis.") + dst.setWhatsThis( + """ +
For shortest path and all paths analyses, this + this is the target type of the analysis.
+ +This is not used for flows out analysis. + +
For flows in analysis, the analysis will return the + information flows into this type.
+ """) + + # + # Configure options frame + # + optframe = InfoFlowOptions(self.query, parent=self.criteria_frame) + optframe.result_limit_changed.connect(self._apply_result_limit) + self._apply_result_limit() + + # + # Set up tree view + # + + # Disable source/target criteria based on info flow in/out + modeframe.criteria[setools.InfoFlowAnalysis.Mode.FlowsOut].toggled.connect(dst.setDisabled) + modeframe.criteria[setools.InfoFlowAnalysis.Mode.FlowsIn].toggled.connect(src.setDisabled) + + # + # Final setup + # + + # ensure the mode is properly reflected + self._apply_mode_change(self.query.mode) + + # Add widgets to layout + self.criteria_frame_layout.addWidget(src, 1, 0, 2, 1) + self.criteria_frame_layout.addWidget(modeframe, 0, 1, 2, 1) + self.criteria_frame_layout.addWidget(dst, 1, 2, 2, 1) + self.criteria_frame_layout.addWidget(optframe, 2, 1, 2, 1) + self.criteria_frame_layout.addWidget(self.buttonBox, 4, 0, 1, 3) + + # Save widget references + self.criteria = (src, dst, modeframe, optframe) + + # Set result table's model + # self.tree_results_model = models.MLSRuleTable(self.table_results) + + def _apply_mode_change(self, mode: setools.InfoFlowAnalysis.Mode) -> None: + """Reconfigure after an analysis mode change.""" + # Only enable tree browser for flows in/out mode. Set the correct + # renderer based on the mode. + self.log.debug(f"Handling mode change to {mode}.") + if mode in (setools.InfoFlowAnalysis.Mode.FlowsIn, setools.InfoFlowAnalysis.Mode.FlowsOut): + self.results.setTabEnabled(tab.GraphResultTabs.Tree, True) + self.worker.render = InfoFlowAnalysisTab.render_direct_path + else: + self.results.setTabEnabled(tab.GraphResultTabs.Tree, False) + self.worker.render = InfoFlowAnalysisTab.render_transitive_path + + def _apply_result_limit(self, value: int = DEFAULT_RESULT_LIMIT) -> None: + """Apply result limit change.""" + assert isinstance(self.query, setools.InfoFlowAnalysis) # type narrowing + self.log.debug(f"Setting result limit to {value} flows.") + self.worker.result_limit = value + + def handle_permmap_change(self, permmap: setools.PermissionMap) -> None: + self.log.debug(f"Applying updated permission map {permmap}") + self.query.perm_map = permmap + + @staticmethod + def render_direct_path(count: int, step: setools.InfoFlowStep) -> str: + """Render text representation of flows in/out results.""" + return f"Flow {count}: {step:full}\n" + + @staticmethod + def render_transitive_path(count: int, path: setools.InfoFlowPath) -> str: + """Render text representation of all/shortest paths results.""" + lines = [f"Flow {count}:"] + for stepnum, step in enumerate(path, start=1): + lines.append(f" Step {stepnum}: {step:full}\n") + return "\n".join(lines) + + +class InfoFlowMode(criteria.RadioEnumCriteria[setools.InfoFlowAnalysis.Mode]): + + """Information flow analysis mode radio buttons.""" + + def __init__(self, query: setools.InfoFlowAnalysis, + parent: typing.Optional[QtWidgets.QWidget] = None) -> None: + + # colspan 2 so the below spinbox fits better with the radio button. + super().__init__("Analysis Mode", query, SETTINGS_MODE, setools.InfoFlowAnalysis.Mode, + colspan=2, parent=parent) + + # Add all paths steps to mode widget. + self.all_path_steps = QtWidgets.QSpinBox(self) + self.all_path_steps.valueChanged.connect(self._apply_all_path_steps) + self.all_path_steps.setSuffix(" steps") + self.all_path_steps.setMinimum(MIN_ALL_PATHS_STEPS) + self.all_path_steps.setValue(DEFAULT_ALL_PATHS_STEPS) + + # get layout location of all paths option + all_path_index = self.top_layout.indexOf( + self.criteria[setools.InfoFlowAnalysis.Mode.AllPaths]) + row, col, _, _ = self.top_layout.getItemPosition(all_path_index) + # add steps spin box in the next column of the radio button + self.top_layout.addWidget(self.all_path_steps, row, col + 1, 1, 1) + + # set path steps to enable only if the corresponding mode is selected. + # it starts disabled since shortest paths is the default option. + self.all_path_steps.setEnabled(False) + self.criteria[setools.InfoFlowAnalysis.Mode.AllPaths].toggled.connect( + self.all_path_steps.setEnabled) + + def _apply_all_path_steps(self, value: int = DEFAULT_ALL_PATHS_STEPS) -> None: + """Apply the value of the all paths spinbox to the query.""" + assert isinstance(self.query, setools.InfoFlowAnalysis) # type narrowing + self.log.debug(f"All paths max steps to {value} steps.") + self.query.all_paths_max_steps = value + + def save(self, settings: typing.Dict) -> None: + super().save(settings) + settings[SETTINGS_ALL_PATHS_STEPS] = self.all_path_steps.value() + + def load(self, settings: typing.Dict) -> None: + super().load(settings) + with suppress(KeyError): + self.all_path_steps.setValue(settings[SETTINGS_ALL_PATHS_STEPS]) + + +class InfoFlowOptions(criteria.CriteriaWidget): + + """ + Infoflow analysis options widget. + + Presents the options: + * Minimum permission weight + * Limit number of results + * Exclude types button + * Exclude permissions/classes button + """ + + has_errors: typing.Final[bool] = False + result_limit_changed = QtCore.pyqtSignal(int) + + def __init__(self, query: setools.InfoFlowAnalysis, + parent: typing.Optional[QtWidgets.QWidget] = None) -> None: + + super().__init__("Options", query, "", parent=parent) + + self.top_layout = QtWidgets.QFormLayout(self) + self.top_layout.setLabelAlignment(QtCore.Qt.AlignmentFlag.AlignRight) + + self.min_weight = QtWidgets.QSpinBox(self) + self.min_weight.valueChanged.connect(self._apply_min_weight) + self.min_weight.setValue(DEFAULT_MIN_PERM_WT) + self.min_weight.setMinimum(MIN_MIN_PERM_WT) + self.min_weight.setMaximum(MAX_MIN_PERM_WT) + self.top_layout.addRow(QtWidgets.QLabel("Minimum permission weight:", self), + self.min_weight) + + self.result_limit = QtWidgets.QSpinBox(self) + self.result_limit.valueChanged.connect(self.result_limit_changed) + self.result_limit.setValue(DEFAULT_RESULT_LIMIT) + self.result_limit.setMinimum(MIN_RESULT_LIMIT) + self.top_layout.addRow(QtWidgets.QLabel("Limit results:", self), + self.result_limit) + + self.edit_excluded_types = QtWidgets.QPushButton("Edit...", self) + self.edit_excluded_types.clicked.connect(self._start_type_exclude) + self.top_layout.addRow(QtWidgets.QLabel("Excluded types:", self), + self.edit_excluded_types) + + self.edit_excluded_perms = QtWidgets.QPushButton("Edit...", self) + self.edit_excluded_perms.clicked.connect(self._start_permmap_exclude) + self.top_layout.addRow(QtWidgets.QLabel("Excluded permissions:", self), + self.edit_excluded_perms) + + def _apply_min_weight(self, value: int) -> None: + """Apply minimum perm weight to the query.""" + assert isinstance(self.query, setools.InfoFlowAnalysis) # type narrowing + self.log.debug(f"Setting min permission weight to {value}") + self.query.min_weight = value + + def _apply_permmap(self, new_map: setools.PermissionMap) -> None: + assert isinstance(self.query, setools.InfoFlowAnalysis) # type narrowing + self.log.debug("Applying updated permission map.") + self.query.perm_map = new_map + + def _start_permmap_exclude(self) -> None: + w = PermissionMapEditor(self.query.perm_map, edit=False, parent=self) + w.apply_permmap.connect(self._apply_permmap) + w.open() + + def _start_type_exclude(self) -> None: + ExcludeTypes(self.query, parent=self).open() + + def save(self, settings: typing.Dict) -> None: + settings[SETTINGS_MIN_WEIGHT] = self.min_weight.value() + settings[SETTINGS_RESULT_LIMIT] = self.result_limit.value() + settings[SETTINGS_EXCLUDE_TYPES] = [str(t) for t in self.query.exclude] + # TODO: permmap with enable/disable states + + def load(self, settings: typing.Dict) -> None: + assert isinstance(self.query, setools.InfoFlowAnalysis) # type narrowing + with suppress(KeyError): + self.min_weight.setValue(settings[SETTINGS_MIN_WEIGHT]) + + with suppress(KeyError): + self.result_limit.setValue(settings[SETTINGS_RESULT_LIMIT]) + + with suppress(KeyError): + self.query.exclude = settings[SETTINGS_EXCLUDE_TYPES] + + # TODO: perm map + + +if __name__ == '__main__': + import sys + import logging + import pprint + import warnings + + logging.basicConfig(level=logging.DEBUG, + format='%(asctime)s|%(levelname)s|%(name)s|%(message)s') + warnings.simplefilter("default") + + app = QtWidgets.QApplication(sys.argv) + mw = QtWidgets.QMainWindow() + widget = InfoFlowAnalysisTab(setools.SELinuxPolicy(), setools.PermissionMap(), parent=mw) + mw.setCentralWidget(widget) + mw.resize(widget.size()) + whatsthis = QtWidgets.QWhatsThis.createAction(mw) + mw.menuBar().addAction(whatsthis) + mw.setStatusBar(QtWidgets.QStatusBar(mw)) + mw.resize(1024, 768) + mw.show() + rc = app.exec_() + pprint.pprint(widget.save()) + sys.exit(rc)