# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
# For localization:
# import ctypes
import locale
import os
import pathlib
import sys
import typing
from json.decoder import JSONDecodeError
from colorama import just_fix_windows_console
from openpyxl.utils.exceptions import InvalidFileException
from flowstrider import __version__, rules, settings, storage
from flowstrider.converters import (
dfd_to_dot_converter,
metadata_xlsx_converter,
threats_formatter,
threats_to_file_converter,
)
from flowstrider.helpers import rules_checker
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import dataflowdiagram, threat, threat_management
from flowstrider.rules.collections import all_collections
[docs]
def main():
# Initialize Localization
try:
if os.name != "posix":
locale_info = locale.getlocale()
if locale_info:
locale_info = locale_info[0]
else:
locale.setlocale(locale.LC_ALL, "")
locale_info = locale.getlocale(locale.LC_MESSAGES)
if locale_info:
locale_info = locale_info[0]
if locale_info[:2] == "de":
lang_sys_string = "de"
else:
lang_sys_string = "en"
except Exception:
lang_sys_string = "en"
# Set system language
settings.init_localization(lang_sys_string, "sys")
_ = settings.lang_sys.gettext
exit_code = 0
just_fix_windows_console() # Fixes ANSI escape chars for windows
rules_checker.check_rule_name_duplicates()
# Parse command line
parser = argparse.ArgumentParser("flowstrider")
subparsers = parser.add_subparsers(
required=True,
dest="subcommand",
help="Subcommand",
)
# Threat elicitation
parser_elicit = subparsers.add_parser("elicit", help=_("Elicit threats."))
parser_elicit.add_argument(
"dfd_path",
type=pathlib.Path,
help=_("Path to dataflow diagram (in .json) you want to elicit."),
)
parser_elicit.add_argument(
"--management-path",
type=pathlib.Path,
help=_(
"Path to threat management information"
+ " file (.json). Will be created if it does not exist."
),
)
parser_elicit.add_argument(
"--output-path",
type=pathlib.Path,
help=_(
"Path to save the results as PDF to. Will not be saved as PDF if this "
+ "argument is missing."
),
)
parser_elicit.add_argument(
"--fail-on-threat",
choices=["off", "undecided", "todo", "all"],
default="off",
help=_(
"Tool fails when it identifies a threat with the given management state. "
+ "Default: off."
),
)
parser_elicit.add_argument(
"--out-lang",
choices=["en", "de"],
default=lang_sys_string,
help=_("Changes output language for elicitation and PDF."),
)
parser_elicit.add_argument(
"--filter",
action="append",
default=[],
help=_(
"Filter the elicited threats that are being displayed in the output "
+ "(console and PDF). Applicable filters (with example values) are: "
)
+ '"severity>1.0"; "rule_set=stride"; "location!=node_id"; '
+ '"management_state=mitigate,accept".'
+ _(
"This command can be used more than once, in which case the filters "
+ "add conjunctively. This means, only threats for which all filters apply "
+ "will be displayed."
),
)
parser_elicit.add_argument(
"--sort",
default="r-severity,alphabetical_source",
help=_(
"Sort the elicited threats that are being displayed in the output "
+ "(console and PDF). Applicable sorting criteria are: "
)
+ '"severity", "alphabetical_source", "alphabetical_location".'
+ _(
'Each criteria can be prepended with "r-" to allow for inverted '
+ "sorting. Multiple criteria can be combined with comma for secondary "
+ "sorting like: "
)
+ '"--sort "r-severity,alphabetical_source".',
)
parser_elicit.add_argument(
"--group",
default=None,
help=_(
"Group the elicited threats that are being displayed in the output "
+ "(console and PDF). It is possible to group by: "
)
+ '"source", "rule_set", "location", "management_state".',
)
parser_elicit.add_argument(
"-q",
"--quiet",
action="store_true",
default=False,
help=_(
"Limit the output on the console and the PDF to save space and give a more "
+ "compact overview. All threats are still being shown, but things like "
+ "descriptions and management state are hidden."
),
)
# check for metadata to add
parser_metadata = subparsers.add_parser(
"metadata", help=_("Give list of metadata you may want to add.")
)
parser_metadata.add_argument(
"dfd_path",
type=pathlib.Path,
help=_("Path to dataflow diagram in (.json) you want the metadata from."),
)
parser_metadata.add_argument(
"output_path",
type=pathlib.Path,
help=_("Path to save the metadata XLSX file to."),
)
parser_metadata.add_argument(
"--out-lang",
choices=["en", "de"],
default=lang_sys_string,
help=_("Changes output language for XLSX file."),
)
# update metadata from xlsx file
parser_update = subparsers.add_parser(
"update", help=_("Update metadata for a dfd.json from metadata.xlsx file.")
)
parser_update.add_argument(
"dfd_path",
type=pathlib.Path,
help=_("Path to dataflow diagram (in .json) you want to update."),
)
parser_update.add_argument(
"metadata_path",
type=pathlib.Path,
help=_("Path to metadata file (in .xlsx) you want to add."),
)
# Info
subparsers.add_parser("info", help=_("Print version information."))
args = parser.parse_args()
match args.subcommand:
case "elicit":
exit_code = elicit_cmd(
args.dfd_path,
args.management_path,
args.output_path,
args.fail_on_threat,
args.out_lang,
args.filter,
args.sort,
args.group,
args.quiet,
)
case "metadata":
exit_code = metadata_cmd(args.dfd_path, args.output_path, args.out_lang)
case "update":
exit_code = update_cmd(args.dfd_path, args.metadata_path)
case "info":
print(
_("FlowStrider - FlowStrider automates data flow-based threat modeling")
)
print(_("Version {vers}").format(vers=__version__))
print(_("DFD-version {vers}").format(vers=storage.DFD_VERSION))
print(
_("Threat management file version {vers}").format(
vers=storage.THREAT_MANAGEMENT_DB_VERSION
)
)
print(_("Copyright (C) 2025 German Aerospace Center DLR"))
# Inform of warnings
if WarningsCounter.count > 0:
print()
print(
settings.C_WARNING
+ wrap(
settings.lang_sys.ngettext(
"There was 1 warning!",
"There were {i} warnings!",
WarningsCounter.count,
).format(i=WarningsCounter.count)
)
+ settings.C_DEFAULT
)
sys.exit(exit_code)
CMD_LEFT_CHAR_WIDTH = settings.CMD_LEFT_CHAR_WIDTH
CMD_MAX_CHAR_WIDTH = settings.CMD_MAX_CHAR_WIDTH
wrap = dfd_to_dot_converter.wrap_text
[docs]
def elicit_cmd(
dfd_path: pathlib.Path,
management_path: typing.Optional[pathlib.Path],
output_path: typing.Optional[pathlib.Path],
fail_on_threat: str,
out_lang: str,
filters: typing.List[str],
sort: str,
group: str,
quiet: bool,
):
# Set output language
settings.init_localization(out_lang, "out")
_ = settings.lang_sys.gettext
# Attempt to open dfd file
try:
with open(dfd_path) as dfd_file:
serialized_dfd = dfd_file.read()
except FileNotFoundError:
print(
_(
"Error: Specified file '{path}' not found. Check that the file"
+ " exists and that the location is correct."
).format(path=dfd_path)
)
return 1
except IsADirectoryError:
print(
_(
"Error: '{path}' is a directory, please specify the path to a"
+ " .json file."
).format(path=dfd_path)
)
return 1
except PermissionError:
print(
_(
"Error: No permission to access '{path}'."
+ "Please specify a valid path to a"
+ " .json file."
).format(path=dfd_path)
)
return 1
# Deserialize dfd file in memory
try:
dfd: dataflowdiagram.DataflowDiagram = storage.deserialize_dfd(serialized_dfd)
except JSONDecodeError as error:
print(
_("Error: There is a json error in file '{path}':\n{err}.").format(
path=dfd_path, err=error
)
)
return 1
if management_path is not None and management_path.exists():
# Attempt to open management file if it exists
try:
with open(management_path) as management_file:
serialized_threat_management_database = management_file.read()
except IsADirectoryError:
print(
_(
"Error: '{path}' is a directory, please specify the path to a"
+ " .json file."
).format(path=management_path)
)
return 1
except PermissionError:
print(
_(
"Error: No permission to access '{path}'."
+ " Please specify a valid path to a"
+ " .json file."
).format(path=management_path)
)
return 1
# Deserialize management file in memory
try:
threat_management_database: threat_management.ThreatManagementDatabase = (
storage.deserialize_threat_management_database(
serialized_threat_management_database
)
)
except JSONDecodeError as error:
print(
_("Error: There is a json error in file '{path}':\n{err}.").format(
path=dfd_path, err=error
)
)
return 1
else:
# No threat management file given or file does not exist,
# initialize a new database.
threat_management_database = threat_management.ThreatManagementDatabase()
# Elicit threats using rules
results: typing.List[threat.Threat] = rules.elicit(dfd)
# Update threat management DB
threat_management_database.update(results, dfd)
if management_path is not None:
# Write changes to management file
try:
with open(management_path, "w") as management_file:
serialized_threat_management_database = (
storage.serialize_threat_management_database(
threat_management_database
)
)
management_file.write(serialized_threat_management_database)
except PermissionError:
print(
_(
"Error: No permission to access '{path}'."
+ "Please specify a valid path to a"
+ " .json file."
).format(path=management_path)
)
return 1
threats_info_holder = None
if fail_on_threat == "off":
# Only print the threats when not in CI/CD mode, otherwise it will be confusing
if threats_info_holder is None:
threats_info_holder = threats_formatter.format_threats(
dfd, results, threat_management_database, filters, sort, group
)
print_threats(dfd, threat_management_database, threats_info_holder, quiet)
if output_path is not None and not output_path.is_dir():
# Create dfd images and generate pdf with image and threats
dot_fail_result = dfd_to_dot_converter.render_dfd(dfd)
if threats_info_holder is None:
threats_info_holder = threats_formatter.format_threats(
dfd, results, threat_management_database, filters, sort, group
)
threats_to_file_converter.create_threats_pdf(
dfd,
threat_management_database,
threats_info_holder,
dfd_path,
management_path,
output_path,
quiet,
)
print()
if dot_fail_result == 0:
print(
wrap(
_("Results saved as PDF to '{path}'.").format(path=output_path)
+ " "
+ _(
"Diagram saved as PNG, SVG and dot to the output/visualization "
+ "folder."
)
)
)
else:
print(
wrap(
_("Results saved as PDF to '{path}'.").format(path=output_path)
+ " "
+ _("Diagram could not be generated.")
)
)
else:
print()
print(wrap(_("No or wrong output path specified. Threats were not saved.")))
if management_path is not None:
print()
print(
wrap(
_("Threat management states saved as JSON to ")
+ str(management_path)
+ "."
)
)
fail_result = threat_management_database.should_fail(results, dfd, fail_on_threat)
if len(fail_result) > 0:
print(
wrap(
_(
"The following threats caused a failure. Selected level: {level}"
).format(level=fail_on_threat)
)
)
threats_info_holder = threats_formatter.format_threats(
dfd, fail_result, threat_management_database, filters, sort, group
)
print_threats(dfd, threat_management_database, threats_info_holder, quiet)
return 1
else:
return 0
[docs]
def print_threats(
dfd: dataflowdiagram.DataflowDiagram,
threat_management_database: threat_management.ThreatManagementDatabase,
threats_info_holder: threats_formatter.ThreatsInfoContainer,
quiet: bool,
):
# Switch to output language
_ = settings.lang_out.gettext
print()
if "no_threats" in threats_info_holder.info_strings:
print(wrap(threats_info_holder.info_strings["no_threats"]) + "\n")
return 0
# Number of threats:
print(
settings.C_WARNING
+ wrap(threats_info_holder.info_strings["threat_numbers"])
+ "\n"
+ settings.C_DEFAULT
)
# Filters:
if len(threats_info_holder.info_strings["filters"]) > 0:
print(wrap(threats_info_holder.info_strings["filters"]) + "\n")
# Number of displayed threats:
if len(threats_info_holder.info_strings["threat_numbers_after_filters"]) > 0:
print(
wrap(threats_info_holder.info_strings["threat_numbers_after_filters"])
+ "\n"
)
# Used rule collections:
print(wrap(threats_info_holder.info_strings["collections_header"]))
i = 0
while "collection" + str(i) in threats_info_holder.info_strings:
print(wrap(threats_info_holder.info_strings["collection" + str(i)]))
i += 1
print()
sources_occurences: typing.Dict[str, int] = (
threats_info_holder.sources_occurences.copy()
)
# Iterate over all threat groups
source_index = 0
for group_index, (__, group) in enumerate(
threats_info_holder.threat_groups.items(), 1
):
# Print the group name (group.name == "" only occurs if there is only one group
# ...meaning no grouping was applied and groups are ignored for output)
if group.name != "":
print(
settings.C_HEADER
+ wrap(_("Group") + f" {str(group_index)} - {group.name}:")
+ settings.C_DEFAULT
+ "\n"
)
# Print each individual combination of threat source and severity within the
# ...current group
for index, ((source, severity), threats_) in enumerate(
group.threats_by_source_and_severity.items()
):
source_index += 1
if sources_occurences[source] > 0:
source_occurence = sources_occurences[source]
sources_occurences[source] += 1
print(
settings.C_HEADER
+ wrap(
_("#{number} Threat source: {src}").format(
number=source_index, src=source
)
+ " ("
+ str(source_occurence)
+ ")"
)
+ settings.C_DEFAULT
)
else:
print(
settings.C_HEADER
+ wrap(
_("#{number} Threat source: {src}").format(
number=source_index, src=source
)
)
+ settings.C_DEFAULT
)
# Print which rule set the rule came from if multiple rule sets are active
# ...("collection1" would be the second collection in the info strings if it
# ...exists meaning there are at least two)
if not quiet and "collection1" in threats_info_holder.info_strings:
left_offset = len(str(source_index)) + 2
temp_string = wrap(
"(" + threats_[0].rule_set_name + " " + _("rule") + ")",
CMD_MAX_CHAR_WIDTH - left_offset,
)
temp_strings = temp_string.split("\n")
for i in range(len(temp_strings)):
print(left_offset * " " + temp_strings[i])
if not quiet:
print()
# Print description
temp_string = wrap(
threats_[0].short_description,
CMD_MAX_CHAR_WIDTH - CMD_LEFT_CHAR_WIDTH - 1,
)
temp_strings = temp_string.split("\n")
print(
(CMD_LEFT_CHAR_WIDTH - len(_("Description:"))) * " "
+ _("Description:"),
end="",
)
for i in range(len(temp_strings)):
if i == 0:
print(" " + temp_strings[i])
else:
print((CMD_LEFT_CHAR_WIDTH + 1) * " " + temp_strings[i])
# Print severity
print(
(CMD_LEFT_CHAR_WIDTH - len(_("Severity:"))) * " "
+ _("Severity:")
+ " "
+ settings.C_WARNING
+ str(round(severity, 2))
+ settings.C_DEFAULT
)
if not quiet:
# Print long description
temp_string = wrap(
threats_[0].long_description,
CMD_MAX_CHAR_WIDTH - CMD_LEFT_CHAR_WIDTH - 1,
)
temp_strings = temp_string.split("\n")
print(
(CMD_LEFT_CHAR_WIDTH - len(_("Long Description:"))) * " "
+ _("Long Description:"),
end="",
)
for i in range(len(temp_strings)):
if i == 0:
print(" " + temp_strings[i])
else:
print((CMD_LEFT_CHAR_WIDTH + 1) * " " + temp_strings[i])
# Print mitigation options
for mitigation_option in threats_[0].mitigation_options:
temp_string = wrap(
mitigation_option, CMD_MAX_CHAR_WIDTH - CMD_LEFT_CHAR_WIDTH - 1
)
temp_strings = temp_string.split("\n")
print(
(CMD_LEFT_CHAR_WIDTH - len(_("Mitigation Option:"))) * " "
+ _("Mitigation Option:"),
end="",
)
for i in range(len(temp_strings)):
if i == 0:
print(" " + temp_strings[i])
else:
print((CMD_LEFT_CHAR_WIDTH + 1) * " " + temp_strings[i])
# Print requirements
if len(threats_[0].requirement) > 0:
temp_string = wrap(
threats_[0].requirement,
CMD_MAX_CHAR_WIDTH - CMD_LEFT_CHAR_WIDTH - 1,
)
temp_strings = temp_string.split("\n")
print(
(CMD_LEFT_CHAR_WIDTH - len(_("Requirement:"))) * " "
+ _("Requirement:"),
end="",
)
for i in range(len(temp_strings)):
if i == 0:
print(" " + temp_strings[i])
else:
print((CMD_LEFT_CHAR_WIDTH + 1) * " " + temp_strings[i])
# Print each location where the threat occurs
temp_string = _("Locations:") + " (" + str(len(threats_)) + ")"
print((CMD_LEFT_CHAR_WIDTH - len(temp_string)) * " " + temp_string)
for threat_ in threats_:
temp_string = wrap(
f"{threat_.location_str(dfd)}:",
CMD_MAX_CHAR_WIDTH - CMD_LEFT_CHAR_WIDTH - 1,
)
temp_strings = temp_string.split("\n")
for i in range(len(temp_strings)):
print((CMD_LEFT_CHAR_WIDTH + 1) * " " + temp_strings[i])
# Print status
req_status_list = threat_.req_status.split("\n")
for status in req_status_list:
if status != "":
temp_string = wrap(
status, CMD_MAX_CHAR_WIDTH - CMD_LEFT_CHAR_WIDTH - 3
)
temp_strings = temp_string.split("\n")
for i in range(len(temp_strings)):
print(
settings.C_WARNING
+ (CMD_LEFT_CHAR_WIDTH + 3) * " "
+ temp_strings[i]
+ settings.C_DEFAULT
)
if not quiet:
# Print management state and explanation
threat_management_item = threat_management_database.get(
threat_, dfd
)
temp_string = wrap(
_("Management State:")
+ " {content}".format(
content=threat_management_item.management_state
),
CMD_MAX_CHAR_WIDTH - CMD_LEFT_CHAR_WIDTH - 1,
)
temp_strings = temp_string.split("\n")
for i in range(len(temp_strings)):
print((CMD_LEFT_CHAR_WIDTH + 1) * " " + temp_strings[i])
if len(threat_management_item.explanation) > 0:
temp_string = wrap(
_("Management Explanation:")
+ " {content}".format(
content=threat_management_item.explanation
),
CMD_MAX_CHAR_WIDTH - CMD_LEFT_CHAR_WIDTH - 1,
)
temp_strings = temp_string.split("\n")
for i in range(len(temp_strings)):
print((CMD_LEFT_CHAR_WIDTH + 1) * " " + temp_strings[i])
print()
print()
# Add references for used rule sets:
if not quiet and len(threats_info_holder.sources_occurences) > 0:
print(settings.C_HEADER + wrap(_("References:")) + settings.C_DEFAULT)
for collection in all_collections:
if collection.tag in dfd.tags:
print()
print(wrap(collection.name + " " + _("rule collection") + ":"))
for ref in collection.references:
print(wrap(text_to_wrap=ref, include_hyphen=False))
return 0
[docs]
def update_cmd(dfd_path: pathlib.Path, metadata_path: pathlib.Path):
# Set output language (important because attributes have to be initialized,
# ...localization not actually used for updating (no real output))
settings.init_localization("en", "out")
_ = settings.lang_out.gettext
try:
with open(dfd_path) as dfd_file:
serialized_dfd = dfd_file.read()
except FileNotFoundError:
print(
_(
"Error: Specified dfd file '{path}' not found. Check that the file"
+ " exists and that the location is correct."
).format(path=dfd_path)
)
return 1
except IsADirectoryError:
print(
_(
"Error: '{path}' is a directory. Please specify the path to a"
+ " .json file."
).format(path=dfd_path)
)
return 1
except PermissionError:
print(
_(
"Error: No permission to access '{path}'."
+ "Please specify a valid path to a"
+ " .json file."
).format(path=dfd_path)
)
return 1
dfd: dataflowdiagram.DataflowDiagram = storage.deserialize_dfd(serialized_dfd)
try:
dfd = metadata_xlsx_converter.update_dfd_json_from_xlsx(dfd, metadata_path)
except FileNotFoundError:
print(
_(
"Error: Specified file '{path}' not found. Check that the file"
+ " exists and that the location is correct."
).format(path=metadata_path)
)
return 1
except InvalidFileException:
print(
_(
"Error: '{path}' is of wrong file format. Please specify the"
+ " path to a .xlsx file."
).format(path=metadata_path)
)
return 1
except PermissionError:
print(
_(
"Error: No permission to access '{path}'."
+ "Please specify a valid path to a"
+ " .xlsx file."
).format(path=metadata_path)
)
return 1
serialized_dfd = storage.serialize_dfd(dfd)
with open(dfd_path, "w") as dfd_file:
dfd_file.write(serialized_dfd)
print(
_("Successfully updated diagram '{id}' in file '{path}'.").format(
id=dfd.id, path=dfd_path
)
)
return 0
if __name__ == "__main__":
main()