# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause
import typing
from dataclasses import dataclass
from math import isclose
from flowstrider import settings
from flowstrider.converters.dfd_to_dot_converter import wrap_text as wrap
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import dataflowdiagram, threat, threat_management
from flowstrider.rules import collections
[docs]
@dataclass
class ThreatGroup:
    """
    A named collection of threats.

    Attributes:
        name: display name of the group
        threats_by_source_and_severity:
            the threats of this group, keyed by the (source, severity)
            combination each of them belongs to
    """

    name: str
    threats_by_source_and_severity: typing.Dict[
        typing.Tuple[str, float], typing.List[threat.Threat]
    ]
[docs]
@dataclass
class ThreatsInfoContainer:
    """
    Bundle of the sorted and grouped threats plus some associated data.

    Attributes:
        info_strings:
            general information about the threats, as strings to be printed
        threat_groups: dictionary of all threat groups
        sources_occurences: occurrence information for each single source
    """

    info_strings: typing.Dict[str, str]
    threat_groups: typing.Dict[str, ThreatGroup]
    sources_occurences: typing.Dict[str, int]
[docs]
def threats_apply_filters(
    dfd: dataflowdiagram.DataflowDiagram,
    threats: typing.List[threat.Threat],
    threat_management_db: threat_management.ThreatManagementDatabase,
    filters: typing.List[str],
) -> typing.Tuple[typing.List[threat.Threat], str]:
    """
    Parse cmd filters and apply them to a list of threats.

    Every filter has the form ``<keyword><operator><value>[,<value>...]``.
    Spaces inside a filter are ignored and the keyword is case-insensitive.
    Accepted keywords are ``severity`` (all six operators, one float value)
    and ``rule_set``, ``location``, ``management_state`` (only ``=`` and
    ``!=``, one or more comma separated values).

    A filter that cannot be parsed or validated is reported with a warning
    and skipped. The passed ``threats`` list itself is not modified.

    Args:
        dfd: the dataflowdiagram in which the threats were generated
        threats: the generated threats
        threat_management_db: management database for generated threats
        filters: list of filters to be applied to the threats
    Returns:
        Tuple with list of the threats that are left after the filters were applied
        and a string containing the applied filters (each quoted, comma separated)
    """

    def print_warning(warning_key: str, filter_str: str, **string_substitutes: str):
        _ = settings.lang_sys.gettext
        warnings = {
            "wrong_operator": _(
                'no accepted operator in filter "{filter}". Accepted operators are:'
            )
            + " [=, !=, <, >, <=, >=].",
            "wrong_key_word": _(
                'no accepted keyword in filter "{filter}". Accepted keywords are:'
                + "{accepted_keywords}."
            ),
            "no_value": _('the value is missing in filter "{filter}".'),
            "no_float": _(
                '"{value}"'
                + " couldn't be parsed as a float in filter"
                + ' "{filter}".'
            ),
            "sev_under_0": _('the severity in filter "{filter}" is below 0.'),
            "no_rule_set_tag": _(
                'the tag "{tag}" in filter "{filter}" is not a valid rule set tag. '
                + "Valid tags are: {valid_tags}."
            ),
            "operator_not_permitted": _(
                'the operator "{operator}" is not permitted for key_word "{key_word}" '
                + 'in filter "{filter}".'
            ),
            "element_not_found": _(
                'there is no element with id "{id}" in the dfd as defined in filter '
                + '"{filter}".'
            ),
            "management_state_invalid": _(
                'the management state "{state}" as defined in filter "{filter}"'
                + " doesn't exist. Valid management states are: {valid_states}."
            ),
        }
        # The templates above are already translated, so they are formatted
        # directly instead of being passed through gettext a second time.
        message = warnings[warning_key].format(filter=filter_str, **string_substitutes)
        print(settings.C_WARNING + wrap(_("Warning: ") + message) + settings.C_DEFAULT)
        WarningsCounter.add_warning()

    def location_id(threat_) -> str:
        # A threat location is either already an id string or an element with an id
        location = threat_.location
        return location if isinstance(location, str) else location.id

    def management_state_name(threat_) -> str:
        return threat_management_db.get(threat_, dfd).management_state.name

    def rule_set_tag(threat_) -> str:
        return threat_.rule_set_tag

    # Detection order matters: two-character operators that contain "<", ">"
    # or "=" have to be tested before their one-character counterparts.
    known_operators = ("<=", ">=", "<", ">", "!=", "=")
    accepted_keywords = ["severity", "rule_set", "location", "management_state"]
    # Predicates deciding whether a threat with the given severity is KEPT
    severity_checks = {
        "=": isclose,
        "!=": lambda severity, limit: not isclose(severity, limit),
        "<": lambda severity, limit: severity < limit,
        ">": lambda severity, limit: severity > limit,
        "<=": lambda severity, limit: severity <= limit,
        ">=": lambda severity, limit: severity >= limit,
    }
    # Value a threat is compared by for each of the membership based keywords
    compared_values = {
        "rule_set": rule_set_tag,
        "location": location_id,
        "management_state": management_state_name,
    }

    applied_filters: typing.List[str] = []
    threats_filtered = threats.copy()

    for raw_filter in filters:
        # Check filters for errors:
        filter_normal = raw_filter.replace(" ", "")

        operator = next((op for op in known_operators if op in filter_normal), "")
        if not operator:
            print_warning("wrong_operator", filter_normal)
            continue

        key_word, _separator, check_value_str = filter_normal.partition(operator)
        key_word = key_word.lower()
        if key_word not in accepted_keywords:
            print_warning(
                "wrong_key_word",
                filter_normal,
                # No trailing period here: the message template already ends
                # with one directly after the placeholder.
                accepted_keywords=" [" + ", ".join(accepted_keywords) + "]",
            )
            continue

        if not check_value_str:
            print_warning("no_value", filter_normal)
            continue
        check_value_list: typing.List[str] = check_value_str.split(",")

        # Apply filters:
        # ########## Severity: ##################
        if key_word == "severity":
            try:
                check_value = float(check_value_str)
            except ValueError:
                print_warning("no_float", filter_normal, value=check_value_str)
                continue
            if check_value < 0:
                # Only a warning: the filter is still applied afterwards
                print_warning("sev_under_0", filter_normal)

            keep = severity_checks[operator]
            threats_filtered = [
                threat_
                for threat_ in threats_filtered
                if keep(threat_.severity, check_value)
            ]

        # ########## Rule set / location / management state: ##########
        else:
            values_valid = True
            if key_word == "rule_set":
                # Check if tag even exists in the rule sets
                correct_rule_set_tags = [
                    rule_set.tag for rule_set in collections.all_collections
                ]
                for value in check_value_list:
                    if value not in correct_rule_set_tags:
                        print_warning(
                            "no_rule_set_tag",
                            filter_normal,
                            tag=value,
                            valid_tags="[" + ", ".join(correct_rule_set_tags) + "]",
                        )
                        values_valid = False
            elif key_word == "location":
                # Check if locations even exist and give warning if not
                for location_str in check_value_list:
                    if dfd.get_element_by_id(location_str) is None:
                        print_warning(
                            "element_not_found", filter_normal, id=location_str
                        )
                        values_valid = False
            else:  # management_state
                # Check if management states exist and give warning if not
                valid_states = [
                    state.name for state in threat_management.ThreatManagementState
                ]
                for state in check_value_list:
                    if state not in valid_states:
                        print_warning(
                            "management_state_invalid",
                            filter_normal,
                            state=state,
                            valid_states=str(valid_states),
                        )
                        values_valid = False
            if not values_valid:
                continue

            if operator not in ("=", "!="):
                print_warning(
                    "operator_not_permitted",
                    filter_normal,
                    operator=operator,
                    key_word=key_word,
                )
                continue

            # "=" keeps the threats whose value is listed, "!=" keeps the others
            keep_listed = operator == "="
            value_of = compared_values[key_word]
            threats_filtered = [
                threat_
                for threat_ in threats_filtered
                if (value_of(threat_) in check_value_list) == keep_listed
            ]

        # Add filter to filter string only if it was applied
        applied_filters.append('"' + filter_normal + '"')

    applied_filters_str = ", ".join(applied_filters)
    return (threats_filtered, applied_filters_str)
[docs]
def threats_sort(
    dfd: dataflowdiagram.DataflowDiagram,
    threats: typing.List[threat.Threat],
    sort_criteria: str,
) -> typing.List[threat.Threat]:
    """
    Parse cmd sorting criteria and sort the threats.

    The criteria are applied in the given order: the first one is the most
    significant, every following one only breaks ties of the previous ones.
    Criteria prefixed with ``r-`` sort in descending order over the complete
    value (for the alphabetical criteria: the whole string, not only its first
    character). Unknown criteria are reported with a warning and ignored.

    The list is sorted in place; threats that compare equal under all criteria
    keep their relative order.

    Args:
        dfd: the dataflowdiagram in which the threats were generated
        threats: the generated threats
        sort_criteria: the sorting criteria (divided by comma if multiple)
    Returns:
        List of all the threats in sorted order (the same list object that was
        passed in)
    """

    def print_warning(warning_key: str, **string_substitutes: str):
        _ = settings.lang_sys.gettext
        warnings = {
            "invalid_criteria": _(
                'the sorting criteria "{criteria}" is not valid. Accepted criteria are:'
                + " {accepted_criteria}."
            )
        }
        message = warnings[warning_key].format(**string_substitutes)
        print(settings.C_WARNING + wrap(_("Warning: ") + message) + settings.C_DEFAULT)
        WarningsCounter.add_warning()

    # Parse sorting criteria
    sort_criteria = sort_criteria.replace(" ", "")
    criteria_list: typing.List[str] = sort_criteria.split(",")

    # Don't change order of existing items! (the list is shown in the warning)
    accepted_criteria = [
        "severity",
        "r-severity",
        "alphabetical_source",
        "r-alphabetical_source",
        "alphabetical_location",
        "r-alphabetical_location",
    ]
    # Sort key for each criteria, looked up without the optional "r-" prefix
    key_functions = {
        "severity": lambda threat_: threat_.severity,
        "alphabetical_source": lambda threat_: threat_.source,
        "alphabetical_location": lambda threat_: threat_.location_str(dfd),
    }

    sort_passes: typing.List[typing.Tuple[typing.Callable, bool]] = []
    for criteria in criteria_list:
        if criteria not in accepted_criteria:
            print_warning(
                "invalid_criteria",
                criteria=criteria,
                accepted_criteria=str(accepted_criteria),
            )
            continue
        descending = criteria.startswith("r-")
        base_criteria = criteria[2:] if descending else criteria
        sort_passes.append((key_functions[base_criteria], descending))

    # Sort threats: list.sort is stable (also with reverse=True), so sorting by
    # the least significant criteria first and the most significant one last
    # yields the combined ordering, and allows descending order for strings.
    for key_function, descending in reversed(sort_passes):
        threats.sort(key=key_function, reverse=descending)
    return threats
[docs]
def threats_group(
    dfd: dataflowdiagram.DataflowDiagram,
    threats: typing.List[threat.Threat],
    threat_management_db: threat_management.ThreatManagementDatabase,
    grouping_criteria: str,
) -> typing.Tuple[typing.Dict[str, ThreatGroup], typing.Dict[str, int]]:
    """
    Parse the cmd grouping criteria and put the threats into groups.

    Args:
        dfd: the dataflowdiagram in which the threats were generated
        threats: the generated threats
        threat_management_db: management database for generated threats
        grouping_criteria: the criteria by which the threats will be grouped;
            None, "none" or an invalid value put all threats into one unnamed
            group (an invalid value additionally triggers a warning)
    Returns:
        Tuple of
        - a dictionary with one ThreatGroup per group, each holding the group
          name and its threats keyed by (source, severity), and
        - a dictionary holding a 0 for each source that was counted exactly
          once and a 1 for each source that was counted more than once. A
          source is counted once per distinct (source, severity) combination
          in each group.
    """

    def print_warning(warning_key: str, **string_substitutes: str):
        translate = settings.lang_sys.gettext
        templates = {
            "invalid_criteria": translate(
                'the grouping criteria "{criteria}" is not valid. '
                + "Accepted criteria are:"
                + " {accepted_criteria}."
            )
        }
        message = translate(templates[warning_key]).format(**string_substitutes)
        print(
            settings.C_WARNING
            + wrap(translate("Warning: ") + message)
            + settings.C_DEFAULT
        )
        WarningsCounter.add_warning()

    # Parse grouping criteria
    grouping_criteria = (
        "none" if grouping_criteria is None else grouping_criteria.replace(" ", "")
    )

    # Don't change order of existing items! (the list is shown in the warning)
    accepted_criteria = [
        "source",
        "rule_set",
        "location",
        "management_state",
    ]
    if grouping_criteria != "none" and grouping_criteria not in accepted_criteria:
        print_warning(
            "invalid_criteria",
            criteria=grouping_criteria,
            accepted_criteria=str(accepted_criteria),
        )
        grouping_criteria = "none"

    # Group threats by criteria
    translate_out = settings.lang_out.gettext
    grouped_threats: typing.Dict[str, typing.List[threat.Threat]] = {}
    group_names: typing.Dict[str, str] = {}

    def add_to_group(group_key, member, label, quoted_name):
        # The display name is only built when the group is seen for the first time
        if group_key not in grouped_threats:
            group_names[group_key] = translate_out(label) + ' "' + quoted_name() + '"'
            grouped_threats[group_key] = []
        grouped_threats[group_key].append(member)

    if grouping_criteria == "none":
        # No grouping: a single unnamed group holding every threat
        group_names[""] = ""
        grouped_threats[""] = threats.copy()
    else:
        for threat_ in threats:
            if grouping_criteria == "source":
                add_to_group(
                    threat_.source_internal,
                    threat_,
                    "source",
                    lambda: threat_.source,
                )
            elif grouping_criteria == "rule_set":
                add_to_group(
                    threat_.rule_set_tag,
                    threat_,
                    "rule set",
                    lambda: threat_.rule_set_name,
                )
            elif grouping_criteria == "location":
                location_key = threat_.location
                if not isinstance(location_key, str):
                    location_key = location_key.id
                add_to_group(
                    location_key,
                    threat_,
                    "location",
                    lambda: threat_.location_str(dfd),
                )
            else:  # management_state
                state_name = threat_management_db.get(
                    threat_, dfd
                ).management_state.name
                add_to_group(
                    state_name,
                    threat_,
                    "management state",
                    lambda: state_name,
                )

    # Inside every primary group, bucket the threats by their (source, severity)
    # combination and store the result in ThreatGroup containers
    threat_groups: typing.Dict[str, ThreatGroup] = {}
    combinations_per_source: typing.Dict[str, int] = {}
    for group_key, members in grouped_threats.items():
        group = ThreatGroup(group_names[group_key], {})
        threat_groups[group_key] = group
        buckets = group.threats_by_source_and_severity
        for threat_ in members:
            source = threat_.source
            bucket_key = (source, threat_.severity)
            combinations_per_source.setdefault(source, 0)
            if bucket_key not in buckets:
                buckets[bucket_key] = []
                combinations_per_source[source] += 1
            buckets[bucket_key].append(threat_)

    # Disregard the counter for sources who appear only once
    threat_sources_occurences = {
        source: 0 if count == 1 else 1
        for source, count in combinations_per_source.items()
    }
    return (threat_groups, threat_sources_occurences)