# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause
import re
import typing
from flowstrider import settings
from flowstrider.models import dataflowdiagram
from flowstrider.models.common_models import Cluster, Edge, Node
from flowstrider.rules import attributes_dict
from flowstrider.rules.common_rules import (
DataflowDiagramRuleCollection,
EdgeRule,
NodeRule,
meet_any_requirement,
)
# Rules derived from the basis of the BSI checklists
# ...'Checklisten zum IT-Grundschutz-Kompendium (Edition 2023)'
# ...https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Grundschutz/IT-GS-Kompendium/checklisten_2023.html
# Current severity is based on the description of the rules and gives 2.0 for MUST(MUSS)
# ...and 1.0 for SHOULD(SOLL) as stated in the rule (2.0 if both)
# Helper method
def get_smallest_cluster(
edge: Edge, dfd: dataflowdiagram.DataflowDiagram, just_sink: bool
) -> typing.Union[Cluster, None]:
clusters = dfd.get_clusters_for_node_id(edge.sink_id)
if not just_sink:
clusters += dfd.get_clusters_for_node_id(edge.source_id)
if not clusters:
return None
return min(clusters, key=lambda cluster: len(cluster.node_ids))
# Helper method
def does_edge_cross_cluster_boundary(edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
smallest_involved_cluster = get_smallest_cluster(edge, dfd, False)
if not smallest_involved_cluster:
return True
if (
edge.sink_id in smallest_involved_cluster.node_ids
and edge.source_id in smallest_involved_cluster.node_ids
):
return False
return True
# Helper method
def does_edge_have_external_source(edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
smallest_sink_cluster = get_smallest_cluster(edge, dfd, True)
if not smallest_sink_cluster:
return True
if edge.source_id in smallest_sink_cluster.node_ids:
return False
return True
class UntrustworthyDataflowEdgeRule(EdgeRule):
BASE_SEVERITY = 2.0 # Supposed to be constant
severity = BASE_SEVERITY # Change severity if needed in the _test method
# ...(so that it can vary per threat)
bsi_ids = ["APP.3.2.A11", "NET.1.1.A7"]
allowed_protocols: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Nicht vertrauenswürdiger Datenfluss")
cls.short_description = _(
"Transportprotokoll für Verbindungen außerhalb der Vertrauensgrenze"
)
cls.long_description = (
_(
"Datenflüsse, die Vertrauensgrenzen überschreiten, MÜSSEN "
"ein sicheres Transportprotokoll einsetzen, um die Vertraulichkeit der "
"Daten zu bewahren. Gemäß der Technischen Richtlinie BSI TR-02102 sind "
"das: IPsec, MLS, SRTP, SSH-2, TLS 1.2 und TLS 1.3."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["transport_protocol"]
cls.allowed_protocols = attributes_dict.attributes[
cls.attribute_names[0]
].accepted_values
cls.mitigation_options = [_("TLS einsetzen")]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[0]].display_name
+ _(": eines von {")
+ ", ".join(cls.allowed_protocols)
+ "}"
)
@classmethod
def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram) -> bool:
crosses_boundary = does_edge_cross_cluster_boundary(edge, dfd)
uses_TLS = cls.attribute_names[0] in edge.attributes and meet_any_requirement(
edge.attributes[cls.attribute_names[0]], cls.allowed_protocols
)
return crosses_boundary and not uses_TLS
class ConfidentialDataflowEdgeRule(EdgeRule):
BASE_SEVERITY = 1.0
severity = BASE_SEVERITY
bsi_ids = ["APP.2.1.A13"]
allowed_protocols: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Vertraulicher Datenfluss")
cls.short_description = _("Transportprotokoll für vertrauliche Daten")
cls.long_description = (
_(
"Datenflüsse, die innerhalb der Vertrauensgrenzen "
"vertrauliche Daten übertragen SOLLTEN ein sicheres, etabliertes "
"Transportprotokoll einsetzen. Gemäß der Technischen Richtlinie BSI "
"TR-02102 sind das: IPsec, MLS, SRTP, SSH-2, TLS 1.2 und TLS 1.3."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["handles_confidential_data", "transport_protocol"]
cls.allowed_protocols = attributes_dict.attributes[
cls.attribute_names[1]
].accepted_values
cls.mitigation_options = [_("TLS einsetzen")]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[1]].display_name
+ _(": eines von {")
+ ", ".join(cls.allowed_protocols)
+ "}"
)
@classmethod
def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
is_inside_boundary = not does_edge_cross_cluster_boundary(
edge, dfd
) and not does_edge_have_external_source(edge, dfd)
handles_confidential_data = (
cls.attribute_names[0] in edge.attributes
and not meet_any_requirement(
edge.attributes[cls.attribute_names[0]], [False]
)
) or cls.attribute_names[0] not in edge.attributes
uses_TLS = cls.attribute_names[1] in edge.attributes and meet_any_requirement(
edge.attributes[cls.attribute_names[1]], cls.allowed_protocols
)
return is_inside_boundary and handles_confidential_data and not uses_TLS
class SecureHTTPConfigEdgeRule(EdgeRule):
BASE_SEVERITY = 1.0
severity = BASE_SEVERITY
bsi_ids = ["CON.10.A14", "APP.3.1.A21"]
checked_protocols: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Sichere HTTP Konfiguration")
cls.short_description = _("Sichere HTTP-Konfiguration bei Webanwendungen")
cls.long_description = (
_(
"Es SOLLTEN zum Schutz vor Clickjacking, "
"Cross-Site-Scripting und anderen Angriffen geeignete "
"HTTP-Response-Header verwendet werden. Mindestens "
"Content-Security-Policy, Strict-Transport-Security, Content-Type, "
"X-Content-Type-Options und Cache-Control. Die HTTP-Header SOLLTEN auf "
"die Webanwendung abgestimmt werden und SOLLTEN so restriktiv wie "
"möglich sein. Für Cookies SOLLTEN die Attribute Secure, SameSite und "
"HttpOnly gesetzt sein."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = [
"transport_protocol",
"http_cache_control",
"http_content_security_policy",
"http_content_type",
"http_cookie_httponly",
"http_cookie_samesite",
"http_cookie_secure",
"http_strict_transport_security",
"http_x_content_type_options",
]
cls.checked_protocols = []
if (
"HTTPS"
in attributes_dict.attributes[cls.attribute_names[0]].accepted_values
):
cls.checked_protocols.append("HTTPS")
if (
"TLS 1.2"
in attributes_dict.attributes[cls.attribute_names[0]].accepted_values
):
cls.checked_protocols.append("TLS 1.2")
if (
"TLS 1.3"
in attributes_dict.attributes[cls.attribute_names[0]].accepted_values
):
cls.checked_protocols.append("TLS 1.3")
cls.mitigation_options = [
_("Prüfen, dass alle erforderlichen HTTP-Response-Header gesetzt sind")
]
cls.requirement = ""
for i in range(1, 9):
cls.requirement += (
attributes_dict.attributes[cls.attribute_names[i]].display_name
+ " = True"
)
if i < 8:
cls.requirement += ", "
@classmethod
def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
is_https_request = (
cls.attribute_names[0] in edge.attributes
and meet_any_requirement(
edge.attributes[cls.attribute_names[0]], cls.checked_protocols
)
or cls.attribute_names[0] not in edge.attributes
)
sets_required_headers = True
for i in range(1, 9):
if cls.attribute_names[
i
] not in edge.attributes or not meet_any_requirement(
edge.attributes[cls.attribute_names[i]], [True]
):
sets_required_headers = False
return is_https_request and not sets_required_headers
class IntegrityOfExternalEntitiesEdgeRule(EdgeRule):
BASE_SEVERITY = 2.0
severity = BASE_SEVERITY
bsi_ids = ["CON.8.A20"]
allowed_checks: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Integrität von Externen Entitäten")
cls.short_description = _("Integritätsprüfung externer Elemente")
cls.long_description = (
_(
"Externe Komponeneten und Daten von externen Elementen MÜSSEN auf ihre "
"Integrität und Schwachstellen geprüft werden. Die Integrität MUSS "
"mittels Prüfsummen oder kryptografischen Zertifikaten überprüft "
"werden. Es SOLLTEN keine veralteten Versionen von externen "
"Komponenten verwendet werden."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["integrity_check"]
cls.allowed_checks = attributes_dict.attributes[
cls.attribute_names[0]
].accepted_values
cls.mitigation_options = [
_("Prüfsummen oder digitale Zertifikate zur Integritätsprüfung einsetzen")
]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[0]].display_name
+ _(": eines von {")
+ ", ".join(cls.allowed_checks)
+ "}"
)
@classmethod
def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
has_external_source = does_edge_have_external_source(edge, dfd)
uses_allowed_check = cls.attribute_names[
0
] in edge.attributes and meet_any_requirement(
edge.attributes[cls.attribute_names[0]], cls.allowed_checks
)
return has_external_source and not uses_allowed_check
class UseOfProxiesEdgeRule(EdgeRule):
BASE_SEVERITY = 1.0
severity = BASE_SEVERITY
bsi_ids = ["DER.1.A10"]
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Einsatz von Proxies")
cls.short_description = _("Einsatz von TLS/SSL-Proxies")
cls.long_description = (
_(
"An den Übergängen zu externen Netzen SOLLTEN TLS-/SSL-Proxies "
"eingesetzt werden, um übertragene Daten auf Malware zu prüfen. Diese "
"Proxies SOLLTEN vor unbefugten Zugriffen geschützt werden. "
"Sicherheitsrelevante Ereignisse SOLLTEN automatisch entdeckt werden."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["proxy"]
cls.mitigation_options = [_("Proxies einsetzen")]
cls.requirement = attributes_dict.attributes[
cls.attribute_names[0]
].display_name + (" = True")
@classmethod
def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
crosses_boundary = does_edge_cross_cluster_boundary(edge, dfd)
passes_through_proxy = cls.attribute_names[
0
] in edge.attributes and meet_any_requirement(
edge.attributes[cls.attribute_names[0]], [True]
)
return crosses_boundary and not passes_through_proxy
class LoggingDataNodeRule(NodeRule):
BASE_SEVERITY = 1.0
severity = BASE_SEVERITY
bsi_ids = ["OPS.1.1.5.A12"]
allowed_signature_schemes: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Signatur von Protokollierungsdaten")
cls.short_description = _("Digitale Signatur für Protokollierungsdaten")
cls.long_description = (
_(
"Gespeicherte Protokollierungsdaten SOLLTEN digital signiert sein. "
"Zu den empfohlenen Signaturverfahren gemäß der "
"Technischen Richtlinie TR-02102 des BSI zählen: "
"RSA, DSA, ECDSA, ECKDSA, ECKCDSA, ECGDSA."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["handles_logs", "signature_scheme"]
cls.allowed_signature_schemes = attributes_dict.attributes[
cls.attribute_names[1]
].accepted_values
cls.mitigation_options = [
_(
"Prüfen, dass Protokollierungsdaten mit einem"
+ " empfohlenen Verfahren signiert werden"
)
]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[1]].display_name
+ _(": eines von {")
+ ", ".join(cls.allowed_signature_schemes)
+ "}"
)
@classmethod
def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
is_datastore = "STRIDE:DataStore" in node.tags
handles_logging_data = (
cls.attribute_names[0] in node.attributes
and not meet_any_requirement(
node.attributes[cls.attribute_names[0]], [False]
)
) or cls.attribute_names[0] not in node.attributes
uses_allowed_signature_scheme = cls.attribute_names[
1
] in node.attributes and meet_any_requirement(
node.attributes[cls.attribute_names[1]], cls.allowed_signature_schemes
)
return (
handles_logging_data and is_datastore and not uses_allowed_signature_scheme
)
class HashedPasswordsNodeRule(NodeRule):
BASE_SEVERITY = 2.0
severity = BASE_SEVERITY
bsi_ids = [
"ORP.4.A23",
"CON.8.A5",
"CON.10.A7",
"APP.3.1.A14",
"APP.3.2.A5",
"APP.4.2.A13",
"APP.4.3.A3",
"SYS.1.6.A8",
"NET.3.1.A1",
"NET.3.2.A4",
]
allowed_hash_functions: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Hashing von Passwörtern")
cls.short_description = _("Passwörter müssen gehashed werden")
cls.long_description = (
_(
"Passwörter DÜRFEN NICHT im Klartext gespeichert werden. Passwörter "
"MÜSSEN serverseitig mit einem sicheren Salted Hash "
"Verfahren gespeichert werden. Dazu zählen gemäß der "
"Technischen Richtlinie TR-02102 des BSI: "
"SHA-256, SHA-512/256, SHA-384, SHA-512, SHA3-256, SHA3-384, SHA3-512."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["stores_credentials", "hash_function"]
cls.allowed_hash_functions = attributes_dict.attributes[
cls.attribute_names[1]
].accepted_values
cls.mitigation_options = [
_("Prüfen, dass eines der empfohlenen Hash-Verfahren genutzt wird")
]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[1]].display_name
+ _(": eines von {")
+ ", ".join(cls.allowed_hash_functions)
+ "}"
)
@classmethod
def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
is_datastore = "STRIDE:DataStore" in node.tags
stores_passwords = (
cls.attribute_names[0] in node.attributes
and not meet_any_requirement(
node.attributes[cls.attribute_names[0]], [False]
)
) or cls.attribute_names[0] not in node.attributes
uses_allowed_hash_function = cls.attribute_names[
1
] in node.attributes and meet_any_requirement(
node.attributes[cls.attribute_names[1]], cls.allowed_hash_functions
)
return is_datastore and stores_passwords and not uses_allowed_hash_function
class EncryptionOfConfidentialDataNodeRule(NodeRule):
BASE_SEVERITY = 1.0
severity = BASE_SEVERITY
bsi_ids = ["CON.8.A5", "CON.10.A18"]
allowed_encryption: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Verschlüsselung vertraulicher Daten")
cls.short_description = _("Vertrauliche Daten müssen verschlüsselt werden")
cls.long_description = (
_(
"Vertrauliche Daten SOLLTEN mit einem sicheren kryptografischen "
"Verfahren verschlüsselt werden. Dazu zählen gemäß der "
"Technischen Richtlinie TR-02102 des BSI: "
"AES-128, AES-192, AES-256."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["handles_confidential_data", "encryption_method"]
cls.allowed_encryption = attributes_dict.attributes[
cls.attribute_names[1]
].accepted_values
cls.mitigation_options = [
_(
"Prüfen, dass eines der empfohlenen Verschlüsselungsverfahren genutzt "
"wird"
)
]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[1]].display_name
+ _(": eines von {")
+ ", ".join(cls.allowed_encryption)
+ "}"
)
@classmethod
def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
is_datastore = "STRIDE:DataStore" in node.tags
handles_confidential_data = (
cls.attribute_names[0] in node.attributes
and not meet_any_requirement(
node.attributes[cls.attribute_names[0]], [False]
)
) or cls.attribute_names[0] not in node.attributes
uses_allowed_encryption = cls.attribute_names[
1
] in node.attributes and meet_any_requirement(
node.attributes[cls.attribute_names[1]], cls.allowed_encryption
)
return (
is_datastore and handles_confidential_data and not uses_allowed_encryption
)
class AuthenticationProtocolNodeRule(NodeRule):
BASE_SEVERITY = 1.0
severity = BASE_SEVERITY
bsi_ids = ["SYS.1.8.A24"]
allowed_protocols: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Authentifizierungsprotokolle für SAN fabric")
cls.short_description = _(
"Sicherstellung der Speicher-Integrität durch sichere Protokolle"
)
cls.long_description = (
_(
"Um die Integrität der Speicherlösung sicherzustellen, "
"SOLLTEN Protokolle mit zusätzlichen Sicherheitsmerkmalen eingesetzt "
"und entsprechend konfiguriert werden. Dazu zählen: DH-CHAP, FCAP, und "
"FCPAP."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["is_san_fabric", "auth_protocol"]
cls.allowed_protocols = attributes_dict.attributes[
cls.attribute_names[1]
].accepted_values
cls.mitigation_options = [
_("Prüfen, dass eines der empfohlenen Protokolle verwendet wird")
]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[1]].display_name
+ _(": eines von {")
+ ", ".join(cls.allowed_protocols)
+ "}"
)
@classmethod
def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
is_datastore = "STRIDE:DataStore" in node.tags
is_san_fabric = (
cls.attribute_names[0] in node.attributes
and not meet_any_requirement(
node.attributes[cls.attribute_names[0]], [False]
)
) or cls.attribute_names[0] not in node.attributes
uses_allowed_protocol = cls.attribute_names[
1
] in node.attributes and meet_any_requirement(
node.attributes[cls.attribute_names[1]], cls.allowed_protocols
)
return is_datastore and is_san_fabric and not uses_allowed_protocol
class MFANodeRule(NodeRule):
BASE_SEVERITY = 1.0
severity = BASE_SEVERITY
bsi_ids = [""]
# TODO
# Diese Regel ist in der hier notierten Form so nicht im Grundschutzkompendium zu
# ...finden. Das nahekommenste ist CON.10.A16 (Entwicklung von Webanwendungen):
# ..."Es SOLLTE eine Mehr-Faktor-Authentisierung implementiert werden." Aber nicht,
# ...wann sie benutzt werden soll. Möglicherweise sollte diese Regel komplett
# ...entfernt werden
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Multi-Faktor-Authentisierung")
cls.short_description = _("Multi-Faktor-Authentisierung")
cls.long_description = (
_(
"Falls eine Authentisierung gemäß Richtlinien CON.10.A16, APP.3.1.A1 "
"und CON.8.A5 des IT-Grundschutzkompendium erforderlich ist, SOLLTE "
"die Liste der Authentisierungs-Faktoren zwei oder mehr Elemente "
"umfassen."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["auth_req", "auth_factors"]
cls.mitigation_options = [_("Authentisierungsfaktoren hinzufügen")]
cls.requirement = attributes_dict.attributes[
cls.attribute_names[1]
].display_name + _(": Anzahl >= 2")
@classmethod
def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
is_Process_or_Datastore = (
"STRIDE:DataStore" in node.tags or "STRIDE:Process" in node.tags
)
requires_authentication = (
cls.attribute_names[0] in node.attributes
and not meet_any_requirement(
node.attributes[cls.attribute_names[0]], [False]
)
) or cls.attribute_names[0] not in node.attributes
if cls.attribute_names[1] in node.attributes:
node_auth_factors = node.attributes[cls.attribute_names[1]]
if type(node.attributes[cls.attribute_names[1]]) is not str:
node_auth_factors = ", ".join(node_auth_factors)
node_auth_factors = re.split(",|;|\\.", node_auth_factors)
uses_multiple_factors = (
cls.attribute_names[1] in node.attributes and len(node_auth_factors) >= 2
)
return (
is_Process_or_Datastore
and requires_authentication
and not uses_multiple_factors
)
class MFAHighSecurityNodeRule(NodeRule):
BASE_SEVERITY = 1.0
severity = BASE_SEVERITY
bsi_ids = ["ORP.4.A21", "CON.8.A5"]
secure_factors: typing.List[str] = []
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Multi-Faktor-Authentisierung bei hohem Sicherheitsbedarf")
cls.short_description = _(
"Authentisierungsfaktoren bei hohem Sicherheitsbedarf"
)
cls.long_description = (
_(
"Falls hoher Sicherheitsbedarf besteht, SOLLTE eine sichere "
"Mehr-Faktor-Authentisierung verwendet werden. Zum Beispiel mit "
"kryptografischen Zertifikaten, Chipkarten oder Tokens."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["handles_confidential_data", "auth_factors", "auth_req"]
cls.secure_factors = attributes_dict.attributes[
cls.attribute_names[1]
].accepted_values
cls.mitigation_options = [_("Authentisierungsfaktoren hinzufügen")]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[1]].display_name
+ _(": eines von {")
+ ", ".join(cls.secure_factors)
+ "}"
)
@classmethod
def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
is_process_or_datastore = (
"STRIDE:DataStore" in node.tags or "STRIDE:Process" in node.tags
)
requires_authentication = (
cls.attribute_names[2] in node.attributes
and not meet_any_requirement(
node.attributes[cls.attribute_names[2]], [False]
)
) or cls.attribute_names[2] not in node.attributes
handles_confidential_data = (
cls.attribute_names[0] in node.attributes
and not meet_any_requirement(
node.attributes[cls.attribute_names[0]], [False]
)
) or cls.attribute_names[0] not in node.attributes
uses_secure_factor = False
if cls.attribute_names[1] in node.attributes:
for factor in node.attributes[cls.attribute_names[1]]:
if meet_any_requirement(factor, cls.secure_factors):
uses_secure_factor = True
return (
is_process_or_datastore
and requires_authentication
and handles_confidential_data
and not uses_secure_factor
)
class PermissionNodeRule(NodeRule):
BASE_SEVERITY = 2.0
severity = BASE_SEVERITY
bsi_ids = ["CON.8.A5"]
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Geringste Berechtigungen")
cls.short_description = _("Nur notwendige Berechtigungen vergeben")
cls.long_description = (
_(
"Prozesse MÜSSEN mit möglichst geringen Privilegien "
"ausgeführt werden können. Nutzer SOLLTEN nur Berechtigungen "
"erhalten, die zur Dürchführung ihrer Aufgabe notwendig sind."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = ["req_permissions", "given_permissions"]
cls.mitigation_options = [_("Prüfen, ob alle vergebenen Rechte notwendig sind")]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[0]].display_name
+ _(" gleich wie ")
+ attributes_dict.attributes[cls.attribute_names[1]].display_name
)
@classmethod
def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
is_interactor_or_process = (
"STRIDE:Interactor" in node.tags or "STRIDE:Process" in node.tags
)
fullfills_least_privilege = (
cls.attribute_names[0] in node.attributes
and cls.attribute_names[1] in node.attributes
)
if fullfills_least_privilege:
for req_perm in node.attributes[cls.attribute_names[1]]:
if not meet_any_requirement(
req_perm, node.attributes[cls.attribute_names[0]]
):
fullfills_least_privilege = False
return is_interactor_or_process and not fullfills_least_privilege
class InputValidationNodeRule(NodeRule):
BASE_SEVERITY = 2.0
severity = BASE_SEVERITY
bsi_ids = ["CON.8.A5", "CON.10.A8"]
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.display_name = _("Eingabevalidierung")
cls.short_description = _("Eingabevalidierung")
cls.long_description = (
_(
"Sämtliche an eine Webanwendung übergebenen Daten sind potentiell "
"gefährlich. Sämtliche Eingabedaten, Datenströme und Sekundärdaten, "
"wie z.B. Session-IDs MÜSSEN vor der Weiterverarbeitung serverseitig "
"validiert werden. Fehleingaben SOLLTEN möglichst nicht automatisch "
"behandelt werden (Sanitizing). Lässt es sich doch nicht vermeiden, "
"MUSS Sanitizing sicher umgesetzt werden."
)
+ "\n"
+ _("BSI IT-Grundschutzkompendium ID: ")
+ (", ".join(cls.bsi_ids))
)
cls.attribute_names = [
"input_data",
"input_validation",
"sanitization",
"sanitization_secure",
]
cls.mitigation_options = [_("Alle Eingabedaten validieren")]
cls.requirement = (
attributes_dict.attributes[cls.attribute_names[0]].display_name
+ _(" und ")
+ attributes_dict.attributes[cls.attribute_names[1]].display_name
+ _(" und ")
+ "("
+ _("nicht ")
+ attributes_dict.attributes[cls.attribute_names[2]].display_name
+ _(" oder ")
+ "("
+ attributes_dict.attributes[cls.attribute_names[2]].display_name
+ _(" und ")
+ attributes_dict.attributes[cls.attribute_names[3]].display_name
+ "))"
)
@classmethod
def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
is_Process = "STRIDE:Process" in node.tags
validation_matches_input = (
cls.attribute_names[0] in node.attributes
and cls.attribute_names[1] in node.attributes
and meet_any_requirement(node.attributes[cls.attribute_names[1]], [True])
)
sanitization = cls.attribute_names[
2
] not in node.attributes or not meet_any_requirement(
node.attributes[cls.attribute_names[2]], [False]
)
sanitization_secure = cls.attribute_names[
3
] in node.attributes and meet_any_requirement(
node.attributes[cls.attribute_names[3]], [True]
)
return is_Process and (
not validation_matches_input or (sanitization and not sanitization_secure)
)
[docs]
class BSIRuleCollection(DataflowDiagramRuleCollection):
tag = "bsi_rules"
[docs]
@classmethod
def init_texts(cls):
_ = settings.lang_out_bsi.gettext
cls.name = "BSI"
cls.set_rules_ruleset_name()
cls.references = [
(
"https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Grundschutz/IT-GS-"
+ "Kompendium/checklisten_2023.html"
)
]
node_rules = [
HashedPasswordsNodeRule,
EncryptionOfConfidentialDataNodeRule,
AuthenticationProtocolNodeRule,
MFANodeRule,
MFAHighSecurityNodeRule,
PermissionNodeRule,
InputValidationNodeRule,
LoggingDataNodeRule,
]
edge_rules = [
UntrustworthyDataflowEdgeRule,
ConfidentialDataflowEdgeRule,
SecureHTTPConfigEdgeRule,
IntegrityOfExternalEntitiesEdgeRule,
UseOfProxiesEdgeRule,
]
__all__ = ["BSIRuleCollection"]