# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause
import typing
from dataclasses import dataclass
import marshmallow_dataclass
from flowstrider import settings
from flowstrider.converters.dfd_to_dot_converter import wrap_text as wrap
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import dataflowdiagram, threat, threat_management
from flowstrider.rules import collections
@dataclass
class Container:
dfd: dataflowdiagram.DataflowDiagram
ThreatList = typing.List[threat.Threat]
@dataclass
class ThreatContainer:
threats: ThreatList
@dataclass
class ThreatManagementContainer:
threat_management_data: threat_management.ThreatManagementDatabase
container_schema = marshmallow_dataclass.class_schema(Container)()
threat_container_schema = marshmallow_dataclass.class_schema(ThreatContainer)()
threat_management_container_schema = marshmallow_dataclass.class_schema(
ThreatManagementContainer
)()
DFD_CORRECT_ELEMENT_TAGS = [
"STRIDE:Interactor",
"STRIDE:DataStore",
"STRIDE:Process",
"STRIDE:Dataflow",
"STRIDE:TrustBoundary",
]
DFD_CORRECT_RULE_TAGS = []
for collection in collections.all_collections:
for tag in collection.tags:
DFD_CORRECT_RULE_TAGS.append(tag)
[docs]
def deserialize_dfd(serialized_dfd: str) -> dataflowdiagram.DataflowDiagram:
"""Convert a dfd given as string to a DataflowDiagram class"""
check_dfd(serialized_dfd)
dfd: dataflowdiagram.DataflowDiagram = container_schema.loads(serialized_dfd).dfd
if not isinstance(dfd, dataflowdiagram.DataflowDiagram):
raise ValueError("Could not deserialize DataflowDiagram: wrong type!")
return dfd
def check_dfd(serialized_dfd: str) -> None:
"""Check a dfd given as string for errors and print warnings"""
from flowstrider.rules import attributes_dict
def print_warning(warning_key: str, line: int, **str_substitutes: str):
"""Function used to print warnings to cmd"""
global _
_ = settings.lang_sys.gettext
warnings = {
"id_reuse": _('id "{id}" was used for more than one object in the dfd.'),
"name_id_mismatch": _(
'element name "{name}" does not correspond with its id "{id}".'
),
"invalid_tag": _('tag "{tag}" is not a valid tag.'),
"missing_reference": _('id "{id}" was referenced but not found.'),
"wrong_attribute": _('attribute "{att}" is not a valid attribute.'),
"wrong_tag_for_attribute": _(
'attribute "{att}" is not applicable for an element of type {tag}.'
),
"negative_severity": _("the severity multiplier can't be negative."),
}
print(
settings.C_WARNING
+ wrap(
_("Warning: ")
+ _(warnings[warning_key]).format(**str_substitutes)
+ " (JSON "
+ _("line")
+ " "
+ str(line)
+ ")"
)
+ settings.C_DEFAULT
)
WarningsCounter.add_warning()
# Checking for errors in the JSON:
# Check for multiple uses of the same id
unique_ids = []
lines = serialized_dfd.split("\n")
for i in range(len(lines)):
line = lines[i]
# For each occurence of an id in the JSON
if '"id":' in line:
id_check = line[line.find(":") + 1 :]
id_check = id_check[id_check.find('"') + 1 :]
id_check = id_check[: id_check.find('"')]
# Add to unique ids or throw error if already inside
if id_check in unique_ids:
print_warning("id_reuse", i + 1, id=id_check)
else:
unique_ids.append(id_check)
# Check, that id corresponds with its node name in JSON
name_check = lines[i - 1]
name_check = name_check[name_check.find('"') + 1 :]
name_check = name_check[: name_check.find('"')]
if id_check != name_check and name_check != "dfd":
print_warning("name_id_mismatch", i, name=name_check, id=id_check)
# Check for severity_multiplicator being negative (should throw a warning)
elif '"severity_multiplier":' in line:
multp_check = line[line.find(":") + 1 :]
multp_check = multp_check.strip()
sev_multp = float(multp_check)
if sev_multp < 0:
print_warning("negative_severity", i + 1)
current_tags = []
for i in range(len(lines)):
line = lines[i]
# Check, that tags are correct
if '"tags":' in line:
current_tags = []
# Tags in one line
if "]" in line:
tag_check_list = line[line.find("[") + 1 : line.find("]")].split(",")
for tag_check in tag_check_list:
tag_check = tag_check.replace('"', "").strip()
if len(tag_check) == 0:
continue
current_tags.append(tag_check)
if (
tag_check not in DFD_CORRECT_ELEMENT_TAGS
and tag_check not in DFD_CORRECT_RULE_TAGS
):
print_warning("invalid_tag", i + 1, tag=tag_check)
else:
# Tags in mutiple lines
j = i + 1
while "]" not in lines[j]:
tag_check = lines[j][lines[j].find('"') + 1 :]
tag_check = tag_check[: tag_check.find('"')]
if len(tag_check) == 0:
j += 1
continue
current_tags.append(tag_check)
if (
tag_check not in DFD_CORRECT_ELEMENT_TAGS
and tag_check not in DFD_CORRECT_RULE_TAGS
):
print_warning("invalid_tag", j + 1, tag=tag_check)
j += 1
# Check for references to non existing ids
# Edges referencing source and sink
if '"source_id":' in line or '"sink_id":' in line:
id_check = line[line.find(":") + 1 :]
id_check = id_check[id_check.find('"') + 1 :]
id_check = id_check[: id_check.find('"')]
if id_check not in unique_ids:
print_warning("missing_reference", i + 1, id=id_check)
# Clusters referencing nodes inside
elif '"node_ids":' in line:
# Nodes in one line
if "]" in line:
id_check_list = line[line.find("[") + 1 : line.find("]")].split(",")
for id_check in id_check_list:
id_check = id_check.replace('"', "").strip()
if id_check not in unique_ids:
print_warning("missing_reference", i + 1, id=id_check)
# Nodes in multiple lines
else:
j = i + 1
while "]" not in lines[j]:
id_check = lines[j][lines[j].find('"') + 1 :]
id_check = id_check[: id_check.find('"')]
if id_check not in unique_ids:
print_warning("missing_reference", j + 1, id=id_check)
j += 1
# Check for false attribute usage
if '"attributes":' in line:
# Attributes in one line
if "}" in line:
att_check_list = line[line.find("{") + 1 : line.find("}")].split(",")
for att_check in att_check_list:
att_key = att_check.split(":")[0].replace('"', "").strip()
if not att_key:
continue
if att_key not in attributes_dict.attributes:
print_warning("wrong_attribute", i + 1, att=att_key)
continue
tags_match = False
for tag in current_tags:
# Exception for dfds
if tag in DFD_CORRECT_RULE_TAGS:
tag = "DataflowDiagram"
current_tags = ["DataflowDiagram"]
tag = tag[tag.find(":") + 1 :]
for correct_tag in attributes_dict.attributes[att_key][2]:
if tag in correct_tag:
tags_match = True
if not tags_match:
print_warning(
"wrong_tag_for_attribute",
i + 1,
att=att_key,
tag=", ".join(current_tags),
)
# Attributes in multiple lines
else:
j = i
while "}" not in lines[j]:
att_check = lines[j]
if att_check.find("{") != -1:
att_check = att_check[att_check.find("{") :]
att_check = att_check.replace(",", "").replace("{", "")
# Check if attribute is in line (there could also be just a value)
if ":" not in lines[j]:
j += 1
continue
att_key = att_check.split(":")[0].replace('"', "").strip()
if len(att_key) == 0:
j += 1
continue
if att_key not in attributes_dict.attributes:
print_warning("wrong_attribute", j + 1, att=att_key)
j += 1
continue
tags_match = False
for tag in current_tags:
# Exception for dfds
if tag in DFD_CORRECT_RULE_TAGS:
tag = "DataflowDiagram"
current_tags = ["DataflowDiagram"]
tag = tag[tag.find(":") + 1 :]
for correct_tag in attributes_dict.attributes[att_key][2]:
if tag in correct_tag:
tags_match = True
if not tags_match:
print_warning(
"wrong_tag_for_attribute",
j + 1,
att=att_key,
tag=", ".join(current_tags),
)
j += 1
[docs]
def undictify_dfd(dictified_dfd: dict) -> dataflowdiagram.DataflowDiagram:
dfd: dataflowdiagram.DataflowDiagram = container_schema.load(dictified_dfd).dfd
if not isinstance(dfd, dataflowdiagram.DataflowDiagram):
raise ValueError("Could not deserialize DataflowDiagram: wrong type!")
return dfd
[docs]
def serialize_dfd(dfd: dataflowdiagram.DataflowDiagram) -> str:
serialized_dfd: str = container_schema.dumps(Container(dfd), indent=2)
return serialized_dfd
[docs]
def dictify_dfd(dfd: dataflowdiagram.DataflowDiagram) -> str:
dictified_dfd: dict = container_schema.dump(Container(dfd))
return dictified_dfd
[docs]
def deserialize_threats(serialized_threats: str) -> ThreatList:
threats: ThreatList = threat_container_schema.loads(serialized_threats).dfd
if not isinstance(threats, ThreatList):
raise ValueError("Could not deserialize Threats: wrong type!")
return threats
[docs]
def undictify_threats(dictified_threats: dict) -> ThreatList:
threats: ThreatList = threat_container_schema.load(dictified_threats).dfd
if not isinstance(threats, ThreatList):
raise ValueError("Could not deserialize Threats: wrong type!")
return threats
[docs]
def serialize_threats(threats: ThreatList) -> str:
serialized_threats: str = threat_container_schema.dumps(
ThreatContainer(threats), indent=2
)
return serialized_threats
[docs]
def dictify_threats(threats: ThreatList) -> str:
dictified_threats: dict = threat_container_schema.dump(ThreatContainer(threats))
return dictified_threats
[docs]
def deserialize_threat_management_database(
serialized_threat_management_database: str,
) -> threat_management.ThreatManagementDatabase:
threat_management_database: threat_management.ThreatManagementDatabase = (
threat_management_container_schema.loads(
serialized_threat_management_database
).threat_management_data
)
if not isinstance(
threat_management_database, threat_management.ThreatManagementDatabase
):
raise ValueError("Could not deserialize ThreatManagementDatabase: wrong type!")
return threat_management_database
[docs]
def undictify_threat_management_database(
dictified_threat_management_database: dict,
) -> threat_management.ThreatManagementDatabase:
threat_management_database: threat_management.ThreatManagementDatabase = (
threat_management_container_schema.load(
dictified_threat_management_database
).threat_management_data
)
if not isinstance(
threat_management_database, threat_management.ThreatManagementDatabase
):
raise ValueError("Could not deserialize ThreatManagementDatabase: wrong type!")
return threat_management_database
[docs]
def serialize_threat_management_database(
threat_management_database: threat_management.ThreatManagementDatabase,
) -> str:
serialized_threat_management_database: str = (
threat_management_container_schema.dumps(
ThreatManagementContainer(threat_management_database), indent=2
)
)
return serialized_threat_management_database
[docs]
def dictify_threat_management_database(
threat_management_database: threat_management.ThreatManagementDatabase,
) -> str:
dictified_threat_management_database: dict = (
threat_management_container_schema.dump(
ThreatManagementContainer(threat_management_database)
)
)
return dictified_threat_management_database
__all__ = [
"deserialize_dfd",
"serialize_dfd",
"undictify_dfd",
"dictify_dfd",
"deserialize_threats",
"serialize_threats",
"undictify_threats",
"dictify_threats",
"deserialize_threat_management_database",
"serialize_threat_management_database",
"undictify_threat_management_database",
"dictify_threat_management_database",
]