# Source code for flowstrider.converters.threats_formatter

# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause

import typing
from dataclasses import dataclass
from math import isclose

from flowstrider import settings
from flowstrider.converters.dfd_to_dot_converter import wrap_text as wrap
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import dataflowdiagram, threat, threat_management
from flowstrider.rules import collections


@dataclass
class ThreatGroup:
    """
    A single display group together with the threats assigned to it

    Attributes:
        name: label of the group that is used when the group is displayed
        threats_by_source_and_severity: every threat of this group, stored
            under the (source, severity) combination it belongs to
    """

    name: str
    threats_by_source_and_severity: typing.Dict[
        typing.Tuple[str, float],
        typing.List[threat.Threat],
    ]
@dataclass
class ThreatsInfoContainer:
    """
    Bundle of the filtered, sorted and grouped threats plus the texts that
    describe them.

    Attributes:
        info_strings: printable strings with general information about the
            threats, addressed by a key
        threat_groups: all threat groups, addressed by the id of the group
        sources_occurences: one 0/1 flag per threat source telling whether the
            source occurs more than once (as computed by threats_group)
    """

    info_strings: typing.Dict[str, str]
    threat_groups: typing.Dict[str, ThreatGroup]
    sources_occurences: typing.Dict[str, int]
def format_threats(
    dfd: dataflowdiagram.DataflowDiagram,
    threats: typing.List[threat.Threat],
    threat_management_db: threat_management.ThreatManagementDatabase,
    filters: typing.List[str],
    sort: str,
    group: str,
) -> ThreatsInfoContainer:
    """
    Filter, sort and group the raw threats and prepare the summary texts that
    belong to them. The result feeds the cmd output and the PDF generation.

    Args:
        dfd: the dataflowdiagram from which the threats were generated
        threats: generated threats (unordered)
        threat_management_db: management data for the generated threats
        filters: list of filters that are used to filter the threats
        sort: by which parameter the threats are going to be sorted
        group: by which parameter the threats are going to be grouped

    Returns:
        Container holding the filtered, sorted and grouped threats as well as
        strings with information about the threats that are going to be used
        for the console and the PDF report
    """
    _ = settings.lang_out.gettext
    ngettext = settings.lang_out.ngettext

    def collect_statistics(threat_list):
        # Involved sources, involved locations and number of threats per
        # rule set name for the given threats
        sources: typing.Set[str] = set()
        locations: typing.Set[str] = set()
        per_rule_set: typing.Dict[str, int] = {}
        for entry in threat_list:
            sources.add(entry.source)
            locations.add(threat.location_str(entry.location, dfd))
            per_rule_set[entry.rule_set_name] = (
                per_rule_set.get(entry.rule_set_name, 0) + 1
            )
        return sources, locations, per_rule_set

    # Statistics over everything that was elicited (before filtering)
    all_sources, all_locations, all_counts = collect_statistics(threats)

    # Filter -> sort -> group
    shown_threats, applied_filters = threats_apply_filters(
        dfd, threats, threat_management_db, filters
    )
    shown_threats = threats_sort(dfd, shown_threats, sort)
    threat_groups, threat_sources_occurences = threats_group(
        dfd, shown_threats, threat_management_db, group
    )

    # Same statistics again, this time only for what is left after filtering
    shown_sources, shown_locations, shown_counts = collect_statistics(shown_threats)

    info_strings: typing.Dict[str, str] = {}

    if not threats:
        info_strings["no_threats"] = _("There were no threats found.")
        return ThreatsInfoContainer(
            info_strings, threat_groups, threat_sources_occurences
        )

    filters_active = bool(applied_filters)

    # Applied filters if applicable
    if filters_active:
        info_strings["filters"] = _("Applied output filters:") + " " + applied_filters
    else:
        info_strings["filters"] = ""

    # General threat numbers
    elicited_text = ngettext(
        "One threat has been elicited.",
        "{count} threats have been elicited.",
        len(threats),
    ).format(count=len(threats))
    sources_text = ngettext(
        "One threat source",
        "{count} different threat sources",
        len(all_sources),
    ).format(count=len(all_sources))
    locations_text = ngettext(
        "and one involved location",
        "and a total of {count} involved locations",
        len(all_locations),
    ).format(count=len(all_locations))
    info_strings["threat_numbers"] = "".join(
        [elicited_text, " (", sources_text, " ", locations_text, ".)"]
    )

    # General threat numbers after filters
    if filters_active:
        displayed_text = ngettext(
            "One threat is being displayed.",
            "{count} threats are being displayed.",
            len(shown_threats),
        ).format(count=len(shown_threats))
        shown_sources_text = ngettext(
            "One displayed threat source",
            "{count} different displayed threat sources",
            len(shown_sources),
        ).format(count=len(shown_sources))
        shown_locations_text = ngettext(
            "and one involved location",
            "and {count} involved locations",
            len(shown_locations),
        ).format(count=len(shown_locations))
        info_strings["threat_numbers_after_filters"] = "".join(
            [
                displayed_text,
                " (",
                shown_sources_text,
                " ",
                shown_locations_text,
                ".)",
            ]
        )
    else:
        info_strings["threat_numbers_after_filters"] = ""

    info_strings["collections_header"] = _("Used rule collections:")

    # One line per rule collection whose tag is set on the dfd
    collection_count = 0
    for collection in collections.all_collections:
        if collection.tag not in dfd.tags:
            continue

        elicited = all_counts.get(collection.name, 0)
        parts = [
            collection.name,
            " ",
            _("rule collection"),
            " (",
            str(elicited),
            " ",
            ngettext("elicited threat", "elicited threats", elicited),
        ]
        if filters_active:
            displayed = shown_counts.get(collection.name, 0)
            parts += [
                ", ",
                str(displayed),
                " ",
                ngettext("displayed threat", "displayed threats", displayed),
            ]
        parts.append(").")

        info_strings["collection" + str(collection_count)] = "".join(parts)
        collection_count += 1

    return ThreatsInfoContainer(info_strings, threat_groups, threat_sources_occurences)
def threats_apply_filters(
    dfd: dataflowdiagram.DataflowDiagram,
    threats: typing.List[threat.Threat],
    threat_management_db: threat_management.ThreatManagementDatabase,
    filters: typing.List[str],
) -> typing.Tuple[typing.List[threat.Threat], str]:
    """
    Parse cmd filters and apply to list of threats

    A filter has the form ``<keyword><operator><value>[,<value>...]``. Spaces
    are ignored and the keyword is case-insensitive. A filter that cannot be
    parsed or validated prints a warning and is skipped; the remaining filters
    are still applied. The passed list of threats is not modified.

    Args:
        dfd: the dataflowdiagram in which the threats were generated
        threats: the generated threats
        threat_management_db: management database for generated threats
        filters: list of filters to be applied to the threats

    Returns:
        Tuple with list of the threats that are left after the filters were
        applied and a string containing the applied filters (each filter in
        double quotes, separated by ", ")
    """

    def print_warning(warning_key: str, filter_text: str, **string_substitutes: str):
        _ = settings.lang_sys.gettext
        warnings = {
            "wrong_operator": _(
                'no accepted operator in filter "{filter}". Accepted operators are:'
            )
            + " [=, !=, <, >, <=, >=].",
            "wrong_key_word": _(
                'no accepted keyword in filter "{filter}". Accepted keywords are:'
                + "{accepted_keywords}."
            ),
            "no_value": _('the value is missing in filter "{filter}".'),
            "no_float": _(
                '"{value}"' + " couldn't be parsed as a float in filter" + ' "{filter}".'
            ),
            "sev_under_0": _('the severity in filter "{filter}" is below 0.'),
            "no_rule_set_tag": _(
                'the tag "{tag}" in filter "{filter}" is not a valid rule set tag. '
                + "Valid tags are: {valid_tags}."
            ),
            "operator_not_permitted": _(
                'the operator "{operator}" is not permitted for key_word "{key_word}" '
                + 'in filter "{filter}".'
            ),
            "element_not_found": _(
                'there is no element with id "{id}" in the dfd as defined in filter '
                + '"{filter}".'
            ),
            "management_state_invalid": _(
                'the management state "{state}" as defined in filter "{filter}"'
                + " doesn't exist. Valid management states are: {valid_states}."
            ),
        }
        print(
            settings.C_WARNING
            + wrap(
                _("Warning: ")
                + _(warnings[warning_key]).format(
                    filter=filter_text, **string_substitutes
                )
            )
            + settings.C_DEFAULT
        )
        WarningsCounter.add_warning()

    def location_id(candidate) -> str:
        # A threat location is either already an id string or an element
        # carrying an id
        location = candidate.location
        return location if isinstance(location, str) else location.id

    def management_state_name(candidate) -> str:
        return threat_management_db.get(candidate, dfd).management_state.name

    # Tested in this order so that the two-character operators win over the
    # one-character operators they contain
    known_operators = ("<=", ">=", "<", ">", "!=", "=")
    accepted_keywords = ["severity", "rule_set", "location", "management_state"]

    # For every operator: does a threat with this severity stay in the result?
    severity_checks = {
        "=": lambda severity, bound: isclose(severity, bound),
        "!=": lambda severity, bound: not isclose(severity, bound),
        "<": lambda severity, bound: severity < bound,
        ">": lambda severity, bound: severity > bound,
        "<=": lambda severity, bound: severity <= bound,
        ">=": lambda severity, bound: severity >= bound,
    }

    applied_filters: typing.List[str] = []
    threats_filtered = threats.copy()

    for raw_filter in filters:
        # Check filters for errors:
        filter_normal = raw_filter.replace(" ", "")

        operator: str = next(
            (candidate for candidate in known_operators if candidate in filter_normal),
            "",
        )
        if not operator:
            print_warning("wrong_operator", filter_normal)
            continue

        key_part, _separator, check_value_str = filter_normal.partition(operator)
        key_word = key_part.lower()
        if key_word not in accepted_keywords:
            # The message template already ends with a period, so the list is
            # passed without one (previously the warning ended in "]..")
            print_warning(
                "wrong_key_word",
                filter_normal,
                accepted_keywords=" [" + ", ".join(accepted_keywords) + "]",
            )
            continue

        if not check_value_str:
            print_warning("no_value", filter_normal)
            continue
        check_value_list: typing.List[str] = check_value_str.split(",")

        # Apply filters:
        # ########## Severity: ##################
        if key_word == "severity":
            try:
                check_value = float(check_value_str)
            except ValueError:
                print_warning("no_float", filter_normal, value=check_value_str)
                continue
            if check_value < 0:
                # Only a warning: the filter is still applied afterwards
                print_warning("sev_under_0", filter_normal)

            keep = severity_checks[operator]
            threats_filtered = [
                candidate
                for candidate in threats_filtered
                if keep(candidate.severity, check_value)
            ]

        # ########## Rule set / Location / Management state: ##########
        else:
            if key_word == "rule_set":
                # Check if tag even exists in the rule sets
                correct_rule_set_tags = [
                    rule_set.tag for rule_set in collections.all_collections
                ]
                unknown_values = [
                    value
                    for value in check_value_list
                    if value not in correct_rule_set_tags
                ]
                for value in unknown_values:
                    print_warning(
                        "no_rule_set_tag",
                        filter_normal,
                        tag=value,
                        valid_tags=("[" + ", ".join(correct_rule_set_tags) + "]"),
                    )

                def value_of(candidate):
                    return candidate.rule_set_tag

            elif key_word == "location":
                # Check if locations even exist and give warning if not
                unknown_values = [
                    location_str
                    for location_str in check_value_list
                    if dfd.get_element_by_id(location_str) is None
                ]
                for location_str in unknown_values:
                    print_warning("element_not_found", filter_normal, id=location_str)
                value_of = location_id

            else:  # management_state
                # Check if management states exist and give warning if not
                valid_states = [
                    state.name for state in threat_management.ThreatManagementState
                ]
                unknown_values = [
                    state for state in check_value_list if state not in valid_states
                ]
                for state in unknown_values:
                    print_warning(
                        "management_state_invalid",
                        filter_normal,
                        state=state,
                        valid_states=str(valid_states),
                    )
                value_of = management_state_name

            if unknown_values:
                continue

            # These keywords only support (in)equality against the value list
            if operator not in ("=", "!="):
                print_warning(
                    "operator_not_permitted",
                    filter_normal,
                    operator=operator,
                    key_word=key_word,
                )
                continue

            keep_members = operator == "="
            threats_filtered = [
                candidate
                for candidate in threats_filtered
                if (value_of(candidate) in check_value_list) == keep_members
            ]

        # Add filter to filter string only if it was applied
        applied_filters.append('"' + filter_normal + '"')

    applied_filters_str = ", ".join(applied_filters)
    return (threats_filtered, applied_filters_str)
def threats_sort(
    dfd: dataflowdiagram.DataflowDiagram,
    threats: typing.List[threat.Threat],
    sort_criteria: str,
) -> typing.List[threat.Threat]:
    """
    Parse cmd sorting criteria and sort the threats

    The first criterion has the highest priority, later criteria only break
    ties of the earlier ones. Threats that are equal under all criteria keep
    their relative order. An unknown criterion prints a warning and is
    ignored. The list is sorted in place and also returned.

    Args:
        dfd: the dataflowdiagram in which the threats were generated
        threats: the generated threats
        sort_criteria: the sorting criteria (divided by comma if multiple);
            None leaves the threats in their current order

    Returns:
        List of all the threats in sorted order
    """

    def print_warning(warning_key: str, **string_substitutes: str):
        _ = settings.lang_sys.gettext
        warnings = {
            "invalid_criteria": _(
                'the sorting criteria "{criteria}" is not valid. Accepted criteria are:'
                + " {accepted_criteria}."
            )
        }
        print(
            settings.C_WARNING
            + wrap(
                _("Warning: ")
                + _(warnings[warning_key]).format(**string_substitutes)
            )
            + settings.C_DEFAULT
        )
        WarningsCounter.add_warning()

    if sort_criteria is None:
        return threats

    def by_severity(item):
        return item.severity

    def by_source(item):
        return item.source

    def by_location(item):
        return item.location_str(dfd)

    # criterion -> (key function, descending?)
    # Don't change order of existing items!
    sort_passes = {
        "severity": (by_severity, False),
        "r-severity": (by_severity, True),
        "alphabetical_source": (by_source, False),
        "r-alphabetical_source": (by_source, True),
        "alphabetical_location": (by_location, False),
        "r-alphabetical_location": (by_location, True),
    }
    accepted_criteria = list(sort_passes)

    # Parse sorting criteria
    requested_passes = []
    for criteria in sort_criteria.replace(" ", "").split(","):
        if criteria in sort_passes:
            requested_passes.append(sort_passes[criteria])
        else:
            print_warning(
                "invalid_criteria",
                criteria=criteria,
                accepted_criteria=str(accepted_criteria),
            )

    # Sort threats: list.sort is stable (also with reverse=True), so sorting
    # by the least significant criterion first and the most significant one
    # last yields the combined order. Unlike negating the code point of the
    # first character, this compares the whole string for the reversed
    # alphabetical criteria and also works for empty strings.
    for key_function, descending in reversed(requested_passes):
        threats.sort(key=key_function, reverse=descending)

    return threats
def threats_group(
    dfd: dataflowdiagram.DataflowDiagram,
    threats: typing.List[threat.Threat],
    threat_management_db: threat_management.ThreatManagementDatabase,
    grouping_criteria: str,
) -> typing.Tuple[typing.Dict[str, ThreatGroup], typing.Dict[str, int]]:
    """
    Parse cmd grouping_criteria and group the threats

    Without a criterion (None, "none" or an invalid one, which additionally
    prints a warning) all threats end up in one group with an empty id and an
    empty name.

    Args:
        dfd: the dataflowdiagram in which the threats were generated
        threats: the generated threats
        threat_management_db: management database for generated threats
        grouping_criteria: the criteria by which the threats will be grouped

    Returns:
        Dictionary containing each threat group with the name and the threats
        of that group (stored per (source, severity) combination) and a
        dictionary containing a 0 for each source that appears only once and a
        1 for sources appearing more than once
    """

    def print_warning(warning_key: str, **string_substitutes: str):
        _ = settings.lang_sys.gettext
        warnings = {
            "invalid_criteria": _(
                'the grouping criteria "{criteria}" is not valid. '
                + "Accepted criteria are:"
                + " {accepted_criteria}."
            )
        }
        print(
            settings.C_WARNING
            + wrap(
                _("Warning: ")
                + _(warnings[warning_key]).format(**string_substitutes)
            )
            + settings.C_DEFAULT
        )
        WarningsCounter.add_warning()

    # Don't change order of existing items!
    accepted_criteria = [
        "source",
        "rule_set",
        "location",
        "management_state",
    ]

    # Parse grouping criteria
    grouping_criteria = (
        "none" if grouping_criteria is None else grouping_criteria.replace(" ", "")
    )
    if grouping_criteria != "none" and grouping_criteria not in accepted_criteria:
        print_warning(
            "invalid_criteria",
            criteria=grouping_criteria,
            accepted_criteria=str(accepted_criteria),
        )
        grouping_criteria = "none"

    # Group threats by criteria
    _ = settings.lang_out.gettext
    grouped_threats: typing.Dict[str, typing.List[threat.Threat]] = {}
    group_names: typing.Dict[str, str] = {}

    if grouping_criteria == "none":
        group_names[""] = ""
        grouped_threats[""] = threats.copy()
    else:
        for entry in threats:
            # The name of a group is built from the first threat that opens it
            if grouping_criteria == "source":
                key = entry.source_internal
                if key not in grouped_threats:
                    group_names[key] = _("source") + ' "' + entry.source + '"'
            elif grouping_criteria == "rule_set":
                key = entry.rule_set_tag
                if key not in grouped_threats:
                    group_names[key] = _("rule set") + ' "' + entry.rule_set_name + '"'
            elif grouping_criteria == "location":
                if isinstance(entry.location, str):
                    key = entry.location
                else:
                    key = entry.location.id
                if key not in grouped_threats:
                    group_names[key] = (
                        _("location") + ' "' + entry.location_str(dfd) + '"'
                    )
            else:  # management_state
                key = threat_management_db.get(entry, dfd).management_state.name
                if key not in grouped_threats:
                    group_names[key] = _("management state") + ' "' + key + '"'

            grouped_threats.setdefault(key, []).append(entry)

    # Inside every group, collect the threats per (source, severity)
    # combination and count for each source how many such combinations exist
    # over all groups
    threat_groups: typing.Dict[str, ThreatGroup] = {}
    combination_counts: typing.Dict[str, int] = {}
    for group_id, members in grouped_threats.items():
        by_combination: typing.Dict[
            typing.Tuple[str, float], typing.List[threat.Threat]
        ] = {}
        threat_groups[group_id] = ThreatGroup(group_names[group_id], by_combination)
        for entry in members:
            combination = (entry.source, entry.severity)
            combination_counts.setdefault(entry.source, 0)
            if combination not in by_combination:
                by_combination[combination] = []
                combination_counts[entry.source] += 1
            by_combination[combination].append(entry)

    # Disregard the counter for sources who appear only once
    threat_sources_occurences: typing.Dict[str, int] = {
        source: 0 if occurences == 1 else 1
        for source, occurences in combination_counts.items()
    }

    return (threat_groups, threat_sources_occurences)