# Source code for flowstrider.models.threat_management
# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause
import typing
from bisect import bisect_left
from dataclasses import dataclass, field
from enum import Enum, auto
from flowstrider import settings
from flowstrider.converters.dfd_to_dot_converter import wrap_text as wrap
from flowstrider.models import dataflowdiagram, threat
from ..helpers.warnings import WarningsCounter
[docs]
class ThreatManagementState(Enum):
    """Handling decision recorded for a threat.

    The first four members still imply future work, the last three are final.
    """

    # These imply future work
    Undecided = auto()
    Delegate = auto()
    Mitigate = auto()
    Avoid = auto()
    # These are final
    Accept = auto()
    Delegated = auto()
    Mitigated = auto()

    def __format__(self, format_spec):
        """Format the member as its bare name (e.g. ``"Accept"``).

        Args:
            format_spec: Standard string format specification. An empty spec
                yields just the member name; width/alignment specs are applied
                to the name instead of being silently ignored.

        Returns:
            The member name formatted according to ``format_spec``.
        """
        return format(self.name, format_spec)
[docs]
@dataclass(frozen=True)
class ThreatManagementItem:
    """Immutable management record for a single threat.

    Attributes:
        uid: Uid of the threat this record belongs to.
        management_state: Current handling decision; a fresh record starts as
            ``ThreatManagementState.Undecided``.
        explanation: Explanation text for the decision; empty by default.
    """

    # All fields have defaults, so ``ThreatManagementItem()`` is a valid,
    # ...completely unmodified record
    uid: str = ""
    management_state: ThreatManagementState = ThreatManagementState.Undecided
    explanation: str = ""
# Maps a threat's display id to the management item stored for that threat
ThreatManagementDict = typing.Dict[str, ThreatManagementItem]
[docs]
@dataclass
class ThreatManagementDatabase:
per_threat_information: ThreatManagementDict = field(default_factory=dict)
[docs]
def update(
self,
threats_in: typing.List[threat.Threat],
dfd: dataflowdiagram.DataflowDiagram,
):
# Sort threats by uid
threats = threats_in.copy()
threats.sort(key=lambda threat: threat.uid())
item_deletion_list = []
# Iterate over management file:
for key, management_item in list(self.per_threat_information.items()):
# Search for corresponding threat to the management item
index = bisect_left(
threats, management_item.uid, key=lambda threat: threat.uid()
)
if index < len(threats) and threats[index].uid() == management_item.uid:
# If threat exists for management item, remove threat from list
# ...(marks as done) and rename management item
self.per_threat_information[threats[index].display_id(dfd)] = (
self.per_threat_information[key]
)
if key != threats[index].display_id(dfd):
del self.per_threat_information[key]
del threats[index]
else:
# If no threat exists for management item (i.e. threat resolved or uid
# ...changed):
# Simple delete if the management item wasn't modified (undecided state)
if (
self.per_threat_information[key].management_state
== ThreatManagementState.Undecided
and len(self.per_threat_information[key].explanation) == 0
):
del self.per_threat_information[key]
# Add to list for deletion with warning otherwise
else:
item_deletion_list.append(key)
# Remove items for which no threat exists anymore and give warning
if len(item_deletion_list) > 0:
global _
_ = settings.lang_sys.gettext
print(
settings.C_WARNING
+ "\n"
+ wrap(
_("Warning: ")
+ settings.lang_sys.ngettext(
"the following non-empty threat management item has been"
+ " deleted because its corresponding threat doesn't exist"
+ " anymore:",
"the following non-empty threat management items have been"
+ " deleted because their corresponding threats don't exist"
+ " anymore:",
len(item_deletion_list),
)
)
)
for item in item_deletion_list:
WarningsCounter.add_warning()
print("\n" + wrap(item))
print(wrap("uid: " + self.per_threat_information[item].uid))
print(
wrap(
_("State: ")
+ "{state}".format(
state=self.per_threat_information[item].management_state
)
)
)
if self.per_threat_information[item].explanation:
print(
wrap(
_("Explanation: ")
+ self.per_threat_information[item].explanation
)
)
del self.per_threat_information[item]
print(settings.C_DEFAULT, end="")
# Add management items for all remaining threats (for which no existing
# ...management item was found)
for threat_ in threats:
new_item = ThreatManagementItem(uid=threat_.uid())
self.per_threat_information[threat_.display_id(dfd)] = new_item
[docs]
def get(self, threat_: threat.Threat, dfd: dataflowdiagram.DataflowDiagram):
return self.per_threat_information[threat_.display_id(dfd)]
[docs]
def should_fail(
self,
threats: typing.List[threat.Threat],
dfd: dataflowdiagram.DataflowDiagram,
level: str,
) -> typing.List[threat.Threat]:
return_value = list()
if level == "off":
return return_value
for threat_ in threats:
threat_management_item = self.get(threat_, dfd)
# Levels: "off", "undecided", "todo", "all"
match threat_management_item.management_state:
case ThreatManagementState.Undecided:
if level in ("undecided", "todo", "all"):
return_value.append(threat_)
case (
ThreatManagementState.Delegate
| ThreatManagementState.Mitigate
| ThreatManagementState.Avoid
):
if level in ("todo", "all"):
return_value.append(threat_)
case (
ThreatManagementState.Accept
| ThreatManagementState.Delegated
| ThreatManagementState.Mitigated
):
if level in ("all"):
return_value.append(threat_)
return return_value