Source code for flowstrider.tool

# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse

# For localization:
# import ctypes
import locale
import os
import pathlib
import sys
import typing
from json.decoder import JSONDecodeError

from colorama import just_fix_windows_console
from openpyxl.utils.exceptions import InvalidFileException

from flowstrider import __version__, rules, settings, storage
from flowstrider.converters import (
    dfd_to_dot_converter,
    metadata_xlsx_converter,
    threats_formatter,
    threats_to_file_converter,
)
from flowstrider.helpers import rules_checker
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import dataflowdiagram, threat, threat_management
from flowstrider.rules.collections import all_collections


[docs] def main(): # Initialize Localization try: if os.name != "posix": locale_info = locale.getlocale() if locale_info: locale_info = locale_info[0] else: locale.setlocale(locale.LC_ALL, "") locale_info = locale.getlocale(locale.LC_MESSAGES) if locale_info: locale_info = locale_info[0] if locale_info[:2] == "de": lang_sys_string = "de" else: lang_sys_string = "en" except Exception: lang_sys_string = "en" # Set system language settings.init_localization(lang_sys_string, "sys") _ = settings.lang_sys.gettext exit_code = 0 just_fix_windows_console() # Fixes ANSI escape chars for windows rules_checker.check_rule_name_duplicates() # Parse command line parser = argparse.ArgumentParser("flowstrider") subparsers = parser.add_subparsers( required=True, dest="subcommand", help="Subcommand", ) # Threat elicitation parser_elicit = subparsers.add_parser("elicit", help=_("Elicit threats.")) parser_elicit.add_argument( "dfd_path", type=pathlib.Path, help=_("Path to dataflow diagram (in .json) you want to elicit."), ) parser_elicit.add_argument( "--management-path", type=pathlib.Path, help=_( "Path to threat management information" + " file (.json). Will be created if it does not exist." ), ) parser_elicit.add_argument( "--output-path", type=pathlib.Path, help=_( "Path to save the results as PDF to. Will not be saved as PDF if this " + "argument is missing." ), ) parser_elicit.add_argument( "--fail-on-threat", choices=["off", "undecided", "todo", "all"], default="off", help=_( "Tool fails when it identifies a threat with the given management state. " + "Default: off." ), ) parser_elicit.add_argument( "--out-lang", choices=["en", "de"], default=lang_sys_string, help=_("Changes output language for elicitation and PDF."), ) parser_elicit.add_argument( "--filter", action="append", default=[], help=_( "Filter the elicited threats that are being displayed in the output " + "(console and PDF). 
Applicable filters (with example values) are: " ) + '"severity>1.0"; "rule_set=stride"; "location!=node_id"; ' + '"management_state=mitigate,accept".' + _( "This command can be used more than once, in which case the filters " + "add conjunctively. This means, only threats for which all filters apply " + "will be displayed." ), ) parser_elicit.add_argument( "--sort", default="r-severity,alphabetical_source", help=_( "Sort the elicited threats that are being displayed in the output " + "(console and PDF). Applicable sorting criteria are: " ) + '"severity", "alphabetical_source", "alphabetical_location".' + _( 'Each criteria can be prepended with "r-" to allow for inverted ' + "sorting. Multiple criteria can be combined with comma for secondary " + "sorting like: " ) + '"--sort "r-severity,alphabetical_source".', ) parser_elicit.add_argument( "--group", default=None, help=_( "Group the elicited threats that are being displayed in the output " + "(console and PDF). It is possible to group by: " ) + '"source", "rule_set", "location", "management_state".', ) parser_elicit.add_argument( "-q", "--quiet", action="store_true", default=False, help=_( "Limit the output on the console and the PDF to save space and give a more " + "compact overview. All threats are still being shown, but things like " + "descriptions and management state are hidden." 
), ) # check for metadata to add parser_metadata = subparsers.add_parser( "metadata", help=_("Give list of metadata you may want to add.") ) parser_metadata.add_argument( "dfd_path", type=pathlib.Path, help=_("Path to dataflow diagram in (.json) you want the metadata from."), ) parser_metadata.add_argument( "output_path", type=pathlib.Path, help=_("Path to save the metadata XLSX file to."), ) parser_metadata.add_argument( "--out-lang", choices=["en", "de"], default=lang_sys_string, help=_("Changes output language for XLSX file."), ) # update metadata from xlsx file parser_update = subparsers.add_parser( "update", help=_("Update metadata for a dfd.json from metadata.xlsx file.") ) parser_update.add_argument( "dfd_path", type=pathlib.Path, help=_("Path to dataflow diagram (in .json) you want to update."), ) parser_update.add_argument( "metadata_path", type=pathlib.Path, help=_("Path to metadata file (in .xlsx) you want to add."), ) # Info subparsers.add_parser("info", help=_("Print version information.")) args = parser.parse_args() match args.subcommand: case "elicit": exit_code = elicit_cmd( args.dfd_path, args.management_path, args.output_path, args.fail_on_threat, args.out_lang, args.filter, args.sort, args.group, args.quiet, ) case "metadata": exit_code = metadata_cmd(args.dfd_path, args.output_path, args.out_lang) case "update": exit_code = update_cmd(args.dfd_path, args.metadata_path) case "info": print( _("FlowStrider - FlowStrider automates data flow-based threat modeling") ) print(_("Version {vers}").format(vers=__version__)) print(_("DFD-version {vers}").format(vers=storage.DFD_VERSION)) print( _("Threat management file version {vers}").format( vers=storage.THREAT_MANAGEMENT_DB_VERSION ) ) print(_("Copyright (C) 2025 German Aerospace Center DLR")) # Inform of warnings if WarningsCounter.count > 0: print() print( settings.C_WARNING + wrap( settings.lang_sys.ngettext( "There was 1 warning!", "There were {i} warnings!", WarningsCounter.count, 
).format(i=WarningsCounter.count) ) + settings.C_DEFAULT ) sys.exit(exit_code)
# Module-level aliases of the console width constants defined in settings.
CMD_LEFT_CHAR_WIDTH = settings.CMD_LEFT_CHAR_WIDTH
CMD_MAX_CHAR_WIDTH = settings.CMD_MAX_CHAR_WIDTH
# Short alias for the text wrapping helper; used for console messages below.
wrap = dfd_to_dot_converter.wrap_text


def elicit_cmd(
    dfd_path: pathlib.Path,
    management_path: typing.Optional[pathlib.Path],
    output_path: typing.Optional[pathlib.Path],
    fail_on_threat: str,
    out_lang: str,
    filters: typing.List[str],
    sort: str,
    group: typing.Optional[str],
    quiet: bool,
):
    """Elicit the threats of a dataflow diagram and report them.

    Reads the diagram, optionally loads and updates the threat management
    file, prints the threats to the console, optionally writes a PDF report
    and finally decides whether the run counts as a failure.

    Args:
        dfd_path: Path to the dataflow diagram (.json).
        management_path: Optional path to the threat management file (.json).
            It is read when it exists and (re)written after elicitation.
        output_path: Optional path for the PDF report. No PDF is created when
            it is missing or points to a directory.
        fail_on_threat: Failure level passed to ``should_fail``; the threats
            are only printed up front when it is "off".
        out_lang: Output language passed to ``settings.init_localization``.
        filters: Filter expressions forwarded to the threats formatter.
        sort: Sorting criteria forwarded to the threats formatter.
        group: Optional grouping criterion forwarded to the threats formatter.
        quiet: Forwarded to the console and PDF output to request the compact
            form.

    Returns:
        0 on success, 1 when an input/output error occurred or when
        ``should_fail`` reported at least one threat.
    """
    # Set output language
    settings.init_localization(out_lang, "out")
    _ = settings.lang_sys.gettext

    # All permission messages share the variant with a space between the two
    # sentences, which the original already used for reading the management file.
    no_permission_message = _(
        "Error: No permission to access '{path}'."
        " Please specify a valid path to a"
        " .json file."
    )

    # Attempt to open dfd file
    try:
        with open(dfd_path) as dfd_file:
            serialized_dfd = dfd_file.read()
    except FileNotFoundError:
        print(
            _(
                "Error: Specified file '{path}' not found. Check that the file"
                " exists and that the location is correct."
            ).format(path=dfd_path)
        )
        return 1
    except IsADirectoryError:
        print(
            _(
                "Error: '{path}' is a directory, please specify the path to a"
                " .json file."
            ).format(path=dfd_path)
        )
        return 1
    except PermissionError:
        print(no_permission_message.format(path=dfd_path))
        return 1

    # Deserialize dfd file in memory
    try:
        dfd: dataflowdiagram.DataflowDiagram = storage.deserialize_dfd(serialized_dfd)
    except JSONDecodeError as error:
        print(
            _("Error: There is a json error in file '{path}':\n{err}.").format(
                path=dfd_path, err=error
            )
        )
        return 1

    if management_path is not None and management_path.exists():
        # Attempt to open management file if it exists
        try:
            with open(management_path) as management_file:
                serialized_threat_management_database = management_file.read()
        except IsADirectoryError:
            print(
                _(
                    "Error: '{path}' is a directory, please specify the path to a"
                    " .json file."
                ).format(path=management_path)
            )
            return 1
        except PermissionError:
            print(no_permission_message.format(path=management_path))
            return 1

        # Deserialize management file in memory
        try:
            threat_management_database: threat_management.ThreatManagementDatabase = (
                storage.deserialize_threat_management_database(
                    serialized_threat_management_database
                )
            )
        except JSONDecodeError as error:
            # Report the management file, not the diagram, as the broken file.
            print(
                _("Error: There is a json error in file '{path}':\n{err}.").format(
                    path=management_path, err=error
                )
            )
            return 1
    else:
        # No threat management file given or file does not exist,
        # initialize a new database.
        threat_management_database = threat_management.ThreatManagementDatabase()

    # Elicit threats using rules
    results: typing.List[threat.Threat] = rules.elicit(dfd)

    # Update threat management DB
    threat_management_database.update(results, dfd)

    if management_path is not None:
        # Serialize before opening the file: opening with "w" truncates it, so
        # a failing serialization must not destroy the existing content.
        serialized_threat_management_database = (
            storage.serialize_threat_management_database(threat_management_database)
        )
        # Write changes to management file
        try:
            with open(management_path, "w") as management_file:
                management_file.write(serialized_threat_management_database)
        except PermissionError:
            print(no_permission_message.format(path=management_path))
            return 1

    threats_info_holder = None
    if fail_on_threat == "off":
        # Only print the threats when not in CI/CD mode, otherwise it will be confusing
        threats_info_holder = threats_formatter.format_threats(
            dfd, results, threat_management_database, filters, sort, group
        )
        # NOTE(review): print_threats is neither defined nor imported in the
        # visible part of this module - confirm where it comes from.
        print_threats(dfd, threat_management_database, threats_info_holder, quiet)

    if output_path is not None and not output_path.is_dir():
        # Create dfd images and generate pdf with image and threats
        dot_fail_result = dfd_to_dot_converter.render_dfd(dfd)
        if threats_info_holder is None:
            threats_info_holder = threats_formatter.format_threats(
                dfd, results, threat_management_database, filters, sort, group
            )
        threats_to_file_converter.create_threats_pdf(
            dfd,
            threat_management_database,
            threats_info_holder,
            dfd_path,
            management_path,
            output_path,
            quiet,
        )
        print()
        if dot_fail_result == 0:
            print(
                wrap(
                    _("Results saved as PDF to '{path}'.").format(path=output_path)
                    + " "
                    + _(
                        "Diagram saved as PNG, SVG and dot to the output/visualization "
                        "folder."
                    )
                )
            )
        else:
            print(
                wrap(
                    _("Results saved as PDF to '{path}'.").format(path=output_path)
                    + " "
                    + _("Diagram could not be generated.")
                )
            )
    else:
        print()
        print(wrap(_("No or wrong output path specified. Threats were not saved.")))

    if management_path is not None:
        print()
        print(
            wrap(
                _("Threat management states saved as JSON to ")
                + str(management_path)
                + "."
            )
        )

    fail_result = threat_management_database.should_fail(results, dfd, fail_on_threat)
    if len(fail_result) > 0:
        print(
            wrap(
                _(
                    "The following threats caused a failure. Selected level: {level}"
                ).format(level=fail_on_threat)
            )
        )
        threats_info_holder = threats_formatter.format_threats(
            dfd, fail_result, threat_management_database, filters, sort, group
        )
        print_threats(dfd, threat_management_database, threats_info_holder, quiet)
        return 1

    return 0


def metadata_cmd(dfd_path: pathlib.Path, output_path: pathlib.Path, out_lang: str):
    """Write the metadata overview (XLSX) for a dataflow diagram.

    Args:
        dfd_path: Path to the dataflow diagram (.json) to read.
        output_path: Path passed to the XLSX converter as the output location.
        out_lang: Output language passed to ``settings.init_localization``.

    Returns:
        0 on success, 1 when the diagram could not be read or parsed.
    """
    # Set output language
    settings.init_localization(out_lang, "out")
    _ = settings.lang_out.gettext

    # Attempt to read the file; pick the matching message on failure
    try:
        with open(dfd_path) as handle:
            raw_dfd = handle.read()
    except FileNotFoundError:
        read_error = _(
            "Error: Specified file '{path}' not found. Check that the file"
            " exists and that the location is correct."
        )
    except IsADirectoryError:
        read_error = _(
            "Error: '{path}' is a directory. Please specify the path to a"
            " .json file."
        )
    except PermissionError:
        read_error = _(
            "Error: No permission to access '{path}'."
            "Please specify a valid path to a"
            " .json file."
        )
    else:
        read_error = None

    if read_error is not None:
        print(read_error.format(path=dfd_path))
        return 1

    # Deserialize file in memory
    try:
        dfd: dataflowdiagram.DataflowDiagram = storage.deserialize_dfd(raw_dfd)
    except JSONDecodeError as exc:
        json_error = _("Error: There is a json error in file '{path}':\n{err}.")
        print(json_error.format(path=dfd_path, err=exc))
        return 1

    metadata_xlsx_converter.metadata_check(dfd, output_path)

    success_message = _(
        "Metadata file of diagram '{id}' saved to"
        " '{out_path}'.\nAfter modifying the metadata"
        " you can update the data-flow diagram with the command 'update'."
    )
    print(success_message.format(id=dfd.id, out_path=output_path))
    return 0


def update_cmd(dfd_path: pathlib.Path, metadata_path: pathlib.Path):
    """Update a dataflow diagram file with the metadata from an XLSX file.

    Args:
        dfd_path: Path to the dataflow diagram (.json); it is read and then
            overwritten with the updated diagram.
        metadata_path: Path to the metadata file (.xlsx) to apply.

    Returns:
        0 on success, 1 when one of the files could not be read, parsed or
        written.
    """
    # Set output language (important because attributes have to be initialized,
    # ...localization not actually used for updating (no real output))
    settings.init_localization("en", "out")
    _ = settings.lang_out.gettext

    try:
        with open(dfd_path) as dfd_file:
            serialized_dfd = dfd_file.read()
    except FileNotFoundError:
        print(
            _(
                "Error: Specified dfd file '{path}' not found. Check that the file"
                " exists and that the location is correct."
            ).format(path=dfd_path)
        )
        return 1
    except IsADirectoryError:
        print(
            _(
                "Error: '{path}' is a directory. Please specify the path to a"
                " .json file."
            ).format(path=dfd_path)
        )
        return 1
    except PermissionError:
        print(
            _(
                "Error: No permission to access '{path}'."
                "Please specify a valid path to a"
                " .json file."
            ).format(path=dfd_path)
        )
        return 1

    # Deserialize file in memory; report invalid JSON like the other
    # subcommands do instead of failing with a traceback.
    try:
        dfd: dataflowdiagram.DataflowDiagram = storage.deserialize_dfd(serialized_dfd)
    except JSONDecodeError as error:
        print(
            _("Error: There is a json error in file '{path}':\n{err}.").format(
                path=dfd_path, err=error
            )
        )
        return 1

    try:
        dfd = metadata_xlsx_converter.update_dfd_json_from_xlsx(dfd, metadata_path)
    except FileNotFoundError:
        print(
            _(
                "Error: Specified file '{path}' not found. Check that the file"
                " exists and that the location is correct."
            ).format(path=metadata_path)
        )
        return 1
    except InvalidFileException:
        print(
            _(
                "Error: '{path}' is of wrong file format. Please specify the"
                " path to a .xlsx file."
            ).format(path=metadata_path)
        )
        return 1
    except PermissionError:
        print(
            _(
                "Error: No permission to access '{path}'."
                "Please specify a valid path to a"
                " .xlsx file."
            ).format(path=metadata_path)
        )
        return 1

    # Serialize before opening the file: opening with "w" truncates it, so a
    # failing serialization must not destroy the existing diagram.
    serialized_dfd = storage.serialize_dfd(dfd)
    try:
        with open(dfd_path, "w") as dfd_file:
            dfd_file.write(serialized_dfd)
    except PermissionError:
        # The file may be readable but not writable.
        print(
            _(
                "Error: No permission to access '{path}'."
                "Please specify a valid path to a"
                " .json file."
            ).format(path=dfd_path)
        )
        return 1

    print(
        _("Successfully updated diagram '{id}' in file '{path}'.").format(
            id=dfd.id, path=dfd_path
        )
    )
    return 0


# Run the command line interface when this module is executed as a script.
if __name__ == "__main__": main()