Source code for flowstrider.tool

# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse

# For localization:
# import ctypes
import locale
import os
import pathlib
import sys
import typing
from json.decoder import JSONDecodeError

from colorama import just_fix_windows_console
from openpyxl.utils.exceptions import InvalidFileException

from flowstrider import __version__, rules, settings, storage
from flowstrider.converters import (
    dfd_to_dot_converter,
    metadata_xsxl_converter,
    threats_to_file_converter,
)
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import dataflowdiagram, threat, threat_management
from flowstrider.rules.collections import all_collections


[docs] def main(): # Initialize Localization try: if os.name != "posix": locale_info = locale.getlocale() if locale_info: locale_info = locale_info[0] else: locale.setlocale(locale.LC_ALL, "") locale_info = locale.getlocale(locale.LC_MESSAGES) if locale_info: locale_info = locale_info[0] if locale_info[:2] == "de": lang_sys_string = "de" else: lang_sys_string = "en" except Exception: lang_sys_string = "en" # Set system language settings.init_localization(lang_sys_string, "sys") global _ _ = settings.lang_sys.gettext exit_code = 0 just_fix_windows_console() # Fixes ANSI escape chars for windows # Parse command line parser = argparse.ArgumentParser("flowstrider") subparsers = parser.add_subparsers( required=True, dest="subcommand", help="Subcommand" ) # Threat elicitation parser_elicit = subparsers.add_parser("elicit", help=_("Elicit threats.")) parser_elicit.add_argument( "dfd_path", type=pathlib.Path, help=_("Path to dataflow diagram (in .json) you want to elicit."), ) parser_elicit.add_argument( "--management-path", type=pathlib.Path, help=_( "Path to threat management information" + " file (.json). Will be created if it does not exist." ), ) parser_elicit.add_argument( "--output-path", type=pathlib.Path, help=_( "Path to save the results as PDF to. Will not be saved as PDF if this " + "argument is missing." ), ) parser_elicit.add_argument( "--fail-on-threat", choices=["off", "undecided", "todo", "all"], default="off", help=_( "Tool fails when it identifies a threat with the given management state." ), ) parser_elicit.add_argument( "--out-lang", choices=["en", "de"], default=lang_sys_string, help=_("Changes output language for elicitation and PDF."), ) # check for metadata to add parser_metadata = subparsers.add_parser( "metadata", help=_("Give list of metadata you may want to add.") ) parser_metadata.add_argument( "dfd_path", type=pathlib.Path, help=_("Path to dataflow diagram in (.json) you want the metadata from."), ) parser_metadata.add_argument( "output_path", type=pathlib.Path, help=_("Path to save the metadata XLSX file to."), ) parser_metadata.add_argument( "--out-lang", choices=["en", "de"], default=lang_sys_string, help=_("Changes output language for XLSX file."), ) # update metadata from xlsx file parser_update = subparsers.add_parser( "update", help=_("Update metadata for a dfd.json from metadata.xlsx file.") ) parser_update.add_argument( "dfd_path", type=pathlib.Path, help=_("Path to dataflow diagram (in .json) you want to update."), ) parser_update.add_argument( "metadata_path", type=pathlib.Path, help=_("Path to metadata file (in .xlsx) you want to add."), ) # Info subparsers.add_parser("info", help=_("Print version information.")) args = parser.parse_args() match args.subcommand: case "elicit": exit_code = elicit_cmd( args.dfd_path, args.management_path, args.output_path, args.fail_on_threat, args.out_lang, ) case "metadata": exit_code = metadata_cmd(args.dfd_path, args.output_path, args.out_lang) case "update": exit_code = update_cmd(args.dfd_path, args.metadata_path) case "info": print( _("FlowStrider - FlowStrider automates data flow-based threat modeling") ) print(_("Version {vers}").format(vers=__version__)) print(_("Copyright (C) 2025 German Aerospace Center DLR")) # Inform of warnings if WarningsCounter.count > 0: print() print( settings.C_WARNING + wrap( settings.lang_sys.ngettext( "There was 1 warning!", "There were {i} warnings!", WarningsCounter.count, ).format(i=WarningsCounter.count) ) + settings.C_DEFAULT ) sys.exit(exit_code)
CMD_LEFT_CHAR_WIDTH = settings.CMD_LEFT_CHAR_WIDTH CMD_MAX_CHAR_WIDTH = settings.CMD_MAX_CHAR_WIDTH wrap = dfd_to_dot_converter.wrap_text
[docs] def elicit_cmd( dfd_path: pathlib.Path, management_path: typing.Optional[pathlib.Path], output_path: typing.Optional[pathlib.Path], fail_on_threat: str, out_lang: str, ): # Set output language settings.init_localization(out_lang, "out") # Attempt to open dfd file try: with open(dfd_path) as dfd_file: serialized_dfd = dfd_file.read() except FileNotFoundError: print( _( "Error: Specified file '{path}' not found. Check that the file" + " exists and that the location is correct." ).format(path=dfd_path) ) return 1 except IsADirectoryError: print( _( "Error: '{path}' is a directory, please specify the path to a" + " .json file." ).format(path=dfd_path) ) return 1 except PermissionError: print( _( "Error: No permission to access '{path}'." + "Please specify a valid path to a" + " .json file." ).format(path=dfd_path) ) return 1 # Deserialize dfd file in memory try: dfd: dataflowdiagram.DataflowDiagram = storage.deserialize_dfd(serialized_dfd) except JSONDecodeError as error: print( _("Error: There is a json error in file '{path}':\n{err}.").format( path=dfd_path, err=error ) ) return 1 if management_path is not None and management_path.exists(): # Attempt to open management file if it exists try: with open(management_path) as management_file: serialized_threat_management_database = management_file.read() except IsADirectoryError: print( _( "Error: '{path}' is a directory, please specify the path to a" + " .json file." ).format(path=management_path) ) return 1 except PermissionError: print( _( "Error: No permission to access '{path}'." + " Please specify a valid path to a" + " .json file." ).format(path=management_path) ) return 1 # Deserialize management file in memory try: threat_management_database: threat_management.ThreatManagementDatabase = ( storage.deserialize_threat_management_database( serialized_threat_management_database ) ) except JSONDecodeError as error: print( _("Error: There is a json error in file '{path}':\n{err}.").format( path=dfd_path, err=error ) ) return 1 else: # No threat management file given or file does not exist, # initialize a new database. threat_management_database: threat_management.ThreatManagementDatabase = ( threat_management.ThreatManagementDatabase() ) # Elicit threats using rules results: typing.List[threat.Threat] = rules.elicit(dfd) # Update threat management DB threat_management_database.update(results, dfd) if management_path is not None: # Write changes to management file try: with open(management_path, "w") as management_file: serialized_threat_management_database = ( storage.serialize_threat_management_database( threat_management_database ) ) management_file.write(serialized_threat_management_database) except PermissionError: print( _( "Error: No permission to access '{path}'." + "Please specify a valid path to a" + " .json file." ).format(path=management_path) ) return 1 if fail_on_threat == "off": # Only print the threats when not in CI/CD mode, otherwise it will be confusing print_threats(dfd, results, threat_management_database) if output_path is not None and not output_path.is_dir(): # Create dfd images and generate pdf with image and threats dfd_to_dot_converter.render_dfd(dfd) threats_to_file_converter.create_threats_pdf( results, dfd, threat_management_database, output_path ) print() print( wrap( _( "Results saved as PDF to " + "'{path}'. " + "Diagram" + " saved as PNG, SVG and Graphviz to the output/visualization" + " folder." ).format(path=output_path) ) ) else: print() print(wrap(_("No or wrong output path specified. Threats were not saved."))) if management_path is not None: print() print( wrap( _("Threat management states saved as JSON to ") + str(management_path) + "." ) ) fail_result = threat_management_database.should_fail(results, dfd, fail_on_threat) if len(fail_result) > 0: print( wrap( _( "The following threats caused a failure. Selected level: {level}" ).format(level=fail_on_threat) ) ) print_threats(dfd, fail_result, threat_management_database) return 1 else: return 0
[docs] def metadata_cmd(dfd_path: pathlib.Path, output_path: pathlib.Path, out_lang: str): # Set output language settings.init_localization(out_lang, "out") # Attempt to open file try: with open(dfd_path) as dfd_file: serialized_dfd = dfd_file.read() except FileNotFoundError: print( _( "Error: Specified file '{path}' not found. Check that the file" + " exists and that the location is correct." ).format(path=dfd_path) ) return 1 except IsADirectoryError: print( _( "Error: '{path}' is a directory. Please specify the path to a" + " .json file." ).format(path=dfd_path) ) return 1 except PermissionError: print( _( "Error: No permission to access '{path}'." + "Please specify a valid path to a" + " .json file." ).format(path=dfd_path) ) return 1 # Deserialize file in memory try: dfd: dataflowdiagram.DataflowDiagram = storage.deserialize_dfd(serialized_dfd) except JSONDecodeError as error: print( _("Error: There is a json error in file '{path}':\n{err}.").format( path=dfd_path, err=error ) ) return 1 metadata_xsxl_converter.metadata_check(dfd, output_path) print( _( "Metadata file of diagram '{id}' saved to" + " '{out_path}'.\nAfter modifying the metadata" + " you can update the data-flow diagram with the command 'update'." ).format(id=dfd.id, out_path=output_path) ) return 0
[docs] def update_cmd(dfd_path: pathlib.Path, metadata_path: pathlib.Path): # Set output language (important because attributes have to be initialized, # ...localization not actually used for updating (no real output)) settings.init_localization("en", "out") try: with open(dfd_path) as dfd_file: serialized_dfd = dfd_file.read() except FileNotFoundError: print( _( "Error: Specified dfd file '{path}' not found. Check that the file" + " exists and that the location is correct." ).format(path=dfd_path) ) return 1 except IsADirectoryError: print( _( "Error: '{path}' is a directory. Please specify the path to a" + " .json file." ).format(path=dfd_path) ) return 1 except PermissionError: print( _( "Error: No permission to access '{path}'." + "Please specify a valid path to a" + " .json file." ).format(path=dfd_path) ) return 1 dfd: dataflowdiagram.DataflowDiagram = storage.deserialize_dfd(serialized_dfd) try: dfd = metadata_xsxl_converter.update_dfd_json_from_xlsx(dfd, metadata_path) except FileNotFoundError: print( _( "Error: Specified file '{path}' not found. Check that the file" + " exists and that the location is correct." ).format(path=metadata_path) ) return 1 except InvalidFileException: print( _( "Error: '{path}' is of wrong file format. Please specify the" + " path to a .xlsx file." ).format(path=metadata_path) ) return 1 except PermissionError: print( _( "Error: No permission to access '{path}'." + "Please specify a valid path to a" + " .xlsx file." ).format(path=metadata_path) ) return 1 serialized_dfd = storage.serialize_dfd(dfd) with open(dfd_path, "w") as dfd_file: dfd_file.write(serialized_dfd) print( _("Successfully updated diagram '{id}' in file '{path}'.").format( id=dfd.id, path=dfd_path ) ) return 0
if __name__ == "__main__": main()