# Source code for flowstrider.rules.builtin.bsi_rules.bsi_rule_collection

# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause

import re

from flowstrider import settings
from flowstrider.models import dataflowdiagram
from flowstrider.models.common_models import Cluster, Edge, Node
from flowstrider.rules import attributes_dict
from flowstrider.rules.common_rules import (
    DataflowDiagramRuleCollection,
    EdgeRule,
    NodeRule,
    meet_any_requirement,
)

# Rules derived from the basis of the BSI checklists
# ...'Checklisten zum IT-Grundschutz-Kompendium (Edition 2023)'
# ...https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Grundschutz/IT-GS-Kompendium/checklisten_2023.html

# Current severity is based on the description of the rules and gives 2.0 for MUST(MUSS)
# ...and 1.0 for SHOULD(SOLL) as stated in the rule (2.0 if both)


# Helper method
def get_smallest_cluster(
    edge: Edge, dfd: dataflowdiagram.DataflowDiagram, just_sink: bool
) -> "Cluster | None":
    """Return the smallest cluster that contains an endpoint of ``edge``.

    Args:
        edge: Edge whose ``sink_id`` (and optionally ``source_id``) is looked up.
        dfd: Diagram queried through ``get_clusters_for_node_id``.
        just_sink: If true, only clusters containing the sink node are
            candidates; otherwise clusters containing the source node are
            candidates as well.

    Returns:
        The candidate cluster with the fewest ``node_ids`` (the first one
        encountered on ties), or ``None`` when there is no candidate.
    """
    # Work on a fresh list: extending the list handed out by the diagram in
    # place would silently alter it if the diagram keeps a reference to it.
    clusters = list(dfd.get_clusters_for_node_id(edge.sink_id) or ())
    if not just_sink:
        clusters.extend(dfd.get_clusters_for_node_id(edge.source_id) or ())

    if not clusters:
        return None

    return min(clusters, key=lambda cluster: len(cluster.node_ids))


# Helper method
def does_edge_cross_cluster_boundary(edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
    """Tell whether ``edge`` leaves the smallest cluster around its endpoints.

    The smallest cluster containing the sink or the source is looked up via
    ``get_smallest_cluster``. The edge counts as crossing a boundary unless
    both endpoints are members of that cluster; an edge with no cluster at
    all is reported as crossing.
    """
    innermost = get_smallest_cluster(edge, dfd, False)

    # Without any enclosing cluster the edge is treated as a crossing.
    if not innermost:
        return True

    members = innermost.node_ids
    return not (edge.sink_id in members and edge.source_id in members)


# Helper method
def does_edge_have_external_source(edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
    """Tell whether the source of ``edge`` lies outside the sink's smallest cluster.

    Only clusters containing the sink are considered. If the sink is in no
    cluster, the source is reported as external.
    """
    sink_cluster = get_smallest_cluster(edge, dfd, True)

    # A sink without any cluster makes every source external.
    if not sink_cluster:
        return True

    return edge.source_id not in sink_cluster.node_ids


class UntrustworthyDataflowEdgeRule(EdgeRule):
    """Edge rule for data flows that cross a cluster boundary.

    ``_test`` is true for an edge that crosses a boundary (see
    ``does_edge_cross_cluster_boundary``) while its ``transport_protocol``
    attribute is missing or not accepted by ``meet_any_requirement``.
    """

    BASE_SEVERITY = 2.0  # Don't change base severity
    severity = BASE_SEVERITY  # Change severity if needed in the _test method
    # ...(so that it can vary per threat)

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Untrustworthy Data Flow")
        cls.short_description = _(
            "Transportprotokoll für Verbindungen außerhalb der Vertrauensgrenze"
        )
        cls.long_description = _(
            "Datenflüsse, die Vertrauensgrenzen überschreiten, MÜSSEN "
            "gemäß Richtlinien APP.3.2.A11 und NET.1.1.A7 des "
            "IT-Grundschutzkompendium des BSI, "
            "ein sicheres Transportprotokoll, wie TLS, einsetzen, "
            "um die Vertraulichkeit der Daten zu bewahren."
        )
        cls.attribute_names = ["transport_protocol"]
        cls.allowed_protocols = attributes_dict.attributes[
            cls.attribute_names[0]
        ].accepted_values

        cls.mitigation_options = [_("TLS einsetzen")]

        # Requirement text: "<attribute display name>: one of {a, b, ...}".
        attribute_label = attributes_dict.attributes[
            cls.attribute_names[0]
        ].display_name
        prefix = attribute_label + _(": one of {")
        cls.requirement = prefix + ", ".join(cls.allowed_protocols) + "}"

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram) -> bool:
        """Return True if ``edge`` crosses a boundary without an accepted protocol."""
        crosses_boundary = does_edge_cross_cluster_boundary(edge, dfd)

        protocol_key = cls.attribute_names[0]
        attributes = edge.attributes
        has_accepted_protocol = protocol_key in attributes and meet_any_requirement(
            attributes[protocol_key], cls.allowed_protocols
        )

        if not crosses_boundary:
            return False
        return not has_accepted_protocol


class ConfidentialDataflowEdgeRule(EdgeRule):
    """Edge rule for confidential data flows that stay inside a boundary.

    ``_test`` is true for an edge that neither crosses a cluster boundary nor
    has an external source, is not explicitly marked as non-confidential, and
    lacks an accepted ``transport_protocol``.
    """

    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Confidential Data Flow")
        cls.short_description = _("Transportprotokoll für vertrauliche Daten")
        cls.long_description = _(
            "Gemäß u.a. Richtlinie APP.2.1.A13 des IT-Grundschutzkompendiums des BSI "
            "SOLLTEN Datenflüsse, die innerhalb der Vertrauensgrenzen vertrauliche "
            "Daten übertragen, ein sicheres Transportprotokoll, wie TLS, einsetzen."
        )
        cls.attribute_names = ["handles_confidential_data", "transport_protocol"]
        cls.allowed_protocols = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [_("TLS einsetzen")]

        # Requirement text: "<attribute display name>: one of {a, b, ...}".
        attribute_label = attributes_dict.attributes[
            cls.attribute_names[1]
        ].display_name
        prefix = attribute_label + _(": one of {")
        cls.requirement = prefix + ", ".join(cls.allowed_protocols) + "}"

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if an internal, confidential edge lacks an accepted protocol."""
        is_inside_boundary = not (
            does_edge_cross_cluster_boundary(edge, dfd)
            or does_edge_have_external_source(edge, dfd)
        )

        attributes = edge.attributes
        confidential_key = cls.attribute_names[0]
        protocol_key = cls.attribute_names[1]

        # A missing flag counts as confidential; only a value matching False
        # (per meet_any_requirement) opts the edge out.
        handles_confidential_data = (
            confidential_key not in attributes
            or not meet_any_requirement(attributes[confidential_key], [False])
        )

        has_accepted_protocol = protocol_key in attributes and meet_any_requirement(
            attributes[protocol_key], cls.allowed_protocols
        )

        if not (is_inside_boundary and handles_confidential_data):
            return False
        return not has_accepted_protocol


class SecureHTTPConfigEdgeRule(EdgeRule):
    """Edge rule checking that the required HTTP response headers are set.

    ``attribute_names[0]`` is the transport protocol attribute; every further
    entry names a header attribute that has to be true. ``_test`` is true for
    an edge whose protocol is missing or among ``checked_protocols`` while at
    least one header attribute is missing or not true.
    """

    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Secure HTTP Configuration")
        cls.short_description = _("Sichere HTTP-Konfiguration bei Webanwendungen")
        cls.long_description = _(
            "Gemäß Richtlinien CON.10.A14, APP.3.1.A21 des IT-Grundschutzkompendium "
            "des BSI SOLLTEN zum Schutz vor Clickjacking, Cross-Site-Scripting und "
            "anderen Angriffen geeignete HTTP-Response-Header verwendet werden. "
            "Mindestens Content-Security-Policy, Strict-Transport-Security, "
            "Content-Type, X-Content-Options und Cache-Control. Die HTTP-Header "
            "SOLLTEN auf die Webanwendung abgestimmt werden und SOLLTEN so restriktiv "
            "wie möglilch sein."
        )
        cls.attribute_names = [
            "transport_protocol",
            "http_content_security_policy",
            "http_strict_transport_security",
            "http_content_type",
            "http_x_content_options",
            "http_cache_control",
        ]
        cls.checked_protocols = attributes_dict.attributes[
            cls.attribute_names[0]
        ].accepted_values

        cls.mitigation_options = [
            _("Prüfen, dass alle erforderlichen HTTP-Response-Header gesetzt sind")
        ]

        # One "<display name> = True" entry per header attribute. Slicing
        # keeps this in step with attribute_names instead of a fixed range.
        cls.requirement = ", ".join(
            attributes_dict.attributes[header_name].display_name + " = True"
            for header_name in cls.attribute_names[1:]
        )

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if a checked edge does not set every required header."""
        attributes = edge.attributes
        protocol_key = cls.attribute_names[0]

        # An edge without a protocol attribute is checked as well.
        is_https_request = protocol_key not in attributes or meet_any_requirement(
            attributes[protocol_key], cls.checked_protocols
        )

        sets_required_headers = all(
            header_name in attributes
            and meet_any_requirement(attributes[header_name], [True])
            for header_name in cls.attribute_names[1:]
        )

        return bool(is_https_request) and not sets_required_headers


class IntegrityOfExternalEntitiesEdgeRule(EdgeRule):
    """Edge rule for integrity checks on data coming from external sources.

    ``_test`` is true for an edge whose source is external (see
    ``does_edge_have_external_source``) while its ``integrity_check``
    attribute is missing or not accepted by ``meet_any_requirement``.
    """

    BASE_SEVERITY = 2.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Integrity of External Entities")
        cls.short_description = _("Integritätsprüfung externer Elemente")
        cls.long_description = _(
            "Gemäß Richtlinie CON.8.A20 des IT-Grundschutzkompendium des BSI, "
            "MÜSSEN externe Komponeneten und Daten von externen Elementen auf ihre "
            "Integrität und Schwachstellen geprüft werden. Die Integrität MUSS "
            "mittels Prüfsummen oder kryptografischen Zertifikaten überprügt werden. "
            "Es SOLLTEN keine veralteten Versionen von externen Komponenten verwendet "
            "werden."
        )
        cls.attribute_names = ["integrity_check"]
        cls.allowed_checks = attributes_dict.attributes[
            cls.attribute_names[0]
        ].accepted_values

        cls.mitigation_options = [
            _("Prüfsummen oder digitale Zertifikate zur Integritätsprüfung einsetzen")
        ]

        # Requirement text: "<attribute display name>: one of {a, b, ...}".
        attribute_label = attributes_dict.attributes[
            cls.attribute_names[0]
        ].display_name
        prefix = attribute_label + _(": one of {")
        cls.requirement = prefix + ", ".join(cls.allowed_checks) + "}"

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if an externally sourced edge lacks an accepted check."""
        has_external_source = does_edge_have_external_source(edge, dfd)

        check_key = cls.attribute_names[0]
        attributes = edge.attributes
        uses_allowed_check = check_key in attributes and meet_any_requirement(
            attributes[check_key], cls.allowed_checks
        )

        if not has_external_source:
            return False
        return not uses_allowed_check


class UseOfProxiesEdgeRule(EdgeRule):
    """Edge rule for proxies on data flows that cross a cluster boundary.

    ``_test`` is true for an edge that crosses a boundary (see
    ``does_edge_cross_cluster_boundary``) while its ``proxy`` attribute is
    missing or does not match True per ``meet_any_requirement``.
    """

    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Use of Proxies")
        cls.short_description = _("Einsatz von TLS/SSL-Proxies")
        cls.long_description = _(
            "Gemäß Richtlinie DER.1.A10 des IT-Grundschutzkompendium des BSI "
            "SOLLTEN an den Übergängen zu externen Netzen TLS-/SSL-Proxies eingesetzt "
            "werden um übertragene Daten auf Malware zu prüfen. Diese Proxies SOLLTEN "
            "vor unbefugten Zugriffen geschützt werden. Sicherheitsrelevante "
            "Ereignisse SOLLTEN automatisch entdeckt werden."
        )
        cls.attribute_names = ["proxy"]

        cls.mitigation_options = [_("Proxies einsetzen")]

        # Requirement text: "<attribute display name> = True".
        attribute_label = attributes_dict.attributes[
            cls.attribute_names[0]
        ].display_name
        cls.requirement = attribute_label + " = True"

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if ``edge`` crosses a boundary without passing a proxy."""
        crosses_boundary = does_edge_cross_cluster_boundary(edge, dfd)

        proxy_key = cls.attribute_names[0]
        attributes = edge.attributes
        passes_through_proxy = proxy_key in attributes and meet_any_requirement(
            attributes[proxy_key], [True]
        )

        if not crosses_boundary:
            return False
        return not passes_through_proxy


class LoggingDataNodeRule(NodeRule):
    """Node rule for digitally signed logging data in data stores.

    ``_test`` is true for a node tagged ``STRIDE:DataStore`` that is not
    explicitly marked as not handling logs and whose ``signature_scheme``
    attribute is missing or not accepted by ``meet_any_requirement``.
    """

    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Signature of Logging Data")
        cls.short_description = _("Digitale Signatur für Protokollierungsdaten")
        cls.long_description = _(
            "Gemäß Richtlinie OPS.1.1.5.A12 des IT-Grundschutzkompendiums des BSI, "
            "SOLLTEN gespeicherte Protokollierungsdaten digital signiert sein. "
            "Zu den empfohlenen Signaturverfahren gemäß der "
            "Technischen Richtlinie TR-02102 des BSI zählen: "
            "RSA, DSA, ECDSA, ECKDSA, ECGDSA, XMSS, LMS."
        )
        cls.attribute_names = ["handles_logs", "signature_scheme"]
        cls.allowed_signature_schemes = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [
            _(
                "Prüfen, dass Protokollierungsdaten mit einem"
                " empfohlenen Verfahren signiert werden"
            )
        ]

        # Requirement text: "<attribute display name>: one of {a, b, ...}".
        attribute_label = attributes_dict.attributes[
            cls.attribute_names[1]
        ].display_name
        prefix = attribute_label + _(": one of {")
        cls.requirement = prefix + ", ".join(cls.allowed_signature_schemes) + "}"

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if a log-handling data store lacks an accepted signature."""
        is_datastore = "STRIDE:DataStore" in node.tags

        attributes = node.attributes
        logs_key = cls.attribute_names[0]
        scheme_key = cls.attribute_names[1]

        # A missing flag counts as handling logs; only a value matching False
        # (per meet_any_requirement) opts the node out.
        handles_logging_data = logs_key not in attributes or not meet_any_requirement(
            attributes[logs_key], [False]
        )

        uses_allowed_signature_scheme = (
            scheme_key in attributes
            and meet_any_requirement(
                attributes[scheme_key], cls.allowed_signature_schemes
            )
        )

        if not (handles_logging_data and is_datastore):
            return False
        return not uses_allowed_signature_scheme


class HashedPasswordsNodeRule(NodeRule):
    """Node rule for hashing of stored credentials in data stores.

    ``_test`` is true for a node tagged ``STRIDE:DataStore`` that is not
    explicitly marked as not storing credentials and whose ``hash_function``
    attribute is missing or not accepted by ``meet_any_requirement``.
    """

    BASE_SEVERITY = 2.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Hashing of Passwords")
        cls.short_description = _("Passwörter müssen gehashed werden")
        cls.long_description = _(
            "Gemäß Richtlinien CON.8.A5, CON.10.A7, APP.3.1.A14 und APP.3.2.A5 "
            "des IT-Grundschutzkompendium "
            "MÜSSEN Passwörter serverseitig mit einem sicheren Salted Hash "
            "Verfahren gespeichert werden. Dazu zählen gemäß der "
            "Technischen Richtlinie TR-02102 des BSI: "
            "SHA-256, SHA-512/256, SHA-384, SHA-512, SHA3-256, SHA3-384, SHA3-512."
        )
        cls.attribute_names = ["stores_credentials", "hash_function"]
        cls.allowed_hash_functions = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [
            _("Prüfen, dass eines der empfohlenen Hash-Verfahren genutzt wird")
        ]

        # Requirement text: "<attribute display name>: one of {a, b, ...}".
        attribute_label = attributes_dict.attributes[
            cls.attribute_names[1]
        ].display_name
        prefix = attribute_label + _(": one of {")
        cls.requirement = prefix + ", ".join(cls.allowed_hash_functions) + "}"

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if a credential-storing data store lacks an accepted hash."""
        is_datastore = "STRIDE:DataStore" in node.tags

        attributes = node.attributes
        credentials_key = cls.attribute_names[0]
        hash_key = cls.attribute_names[1]

        # A missing flag counts as storing credentials; only a value matching
        # False (per meet_any_requirement) opts the node out.
        stores_passwords = (
            credentials_key not in attributes
            or not meet_any_requirement(attributes[credentials_key], [False])
        )

        uses_allowed_hash_function = hash_key in attributes and meet_any_requirement(
            attributes[hash_key], cls.allowed_hash_functions
        )

        if not (is_datastore and stores_passwords):
            return False
        return not uses_allowed_hash_function


class EncryptionOfConfidentialDataNodeRule(NodeRule):
    """Node rule for encryption of confidential data in data stores.

    ``_test`` is true for a node tagged ``STRIDE:DataStore`` that is not
    explicitly marked as non-confidential and whose ``encryption_method``
    attribute is missing or not accepted by ``meet_any_requirement``.
    """

    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Encryption of Confidential Data")
        cls.short_description = _("Vertrauliche Daten müssen verschlüsselt werden")
        cls.long_description = _(
            "Gemäß der Richtlinien CON.8.A5, CON.10.A18, "
            "APP.4.3.A24 und SYS.1.8.A23 des IT-Grundschutzkompendium "
            "SOLLTEN Vertrauliche Daten mit einem sicheren kryptografischen "
            "Verfahren verschlüsselt werden "
            "Dazu zählen: AES-128, AES-192, AES-256."
        )
        cls.attribute_names = ["handles_confidential_data", "encryption_method"]
        cls.allowed_encryption = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [
            _(
                "Prüfen, dass eines der empfohlenen Verschlüsselungsverfahren genutzt "
                "wird"
            )
        ]

        # Requirement text: "<attribute display name>: one of {a, b, ...}".
        attribute_label = attributes_dict.attributes[
            cls.attribute_names[1]
        ].display_name
        prefix = attribute_label + _(": one of {")
        cls.requirement = prefix + ", ".join(cls.allowed_encryption) + "}"

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if a confidential data store lacks accepted encryption."""
        is_datastore = "STRIDE:DataStore" in node.tags

        attributes = node.attributes
        confidential_key = cls.attribute_names[0]
        encryption_key = cls.attribute_names[1]

        # A missing flag counts as confidential; only a value matching False
        # (per meet_any_requirement) opts the node out.
        handles_confidential_data = (
            confidential_key not in attributes
            or not meet_any_requirement(attributes[confidential_key], [False])
        )

        uses_allowed_encryption = encryption_key in attributes and meet_any_requirement(
            attributes[encryption_key], cls.allowed_encryption
        )

        if not (is_datastore and handles_confidential_data):
            return False
        return not uses_allowed_encryption


class AuthenticationProtocolNodeRule(NodeRule):
    """Node rule for authentication protocols of SAN fabric data stores.

    ``_test`` is true for a node tagged ``STRIDE:DataStore`` that is not
    explicitly marked as not being a SAN fabric and whose ``auth_protocol``
    attribute is missing or not accepted by ``meet_any_requirement``.
    """

    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Authentication Protocols for SAN fabric")
        cls.short_description = _(
            "Sicherstellung der Speicher-Integrität durch sichere Protokolle"
        )
        cls.long_description = _(
            "Um die Integrität der Speicherlösung sicherzustellen, "
            "SOLLTEN Protokolle mit zusätzlichen Sicherheitsmerkmalen eingesetzt "
            "und entsprechend konfiguriert werden. Dazu zählen gemäß Richtlinie "
            "SYS.1.8.A24 des IT-Grunschutzkompendium: DH-CHAP, FCAP, FCPAP."
        )
        cls.attribute_names = ["is_san_fabric", "auth_protocol"]
        cls.allowed_protocols = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [
            _("Prüfen, dass eines der empfohlenen Protokolle verwendet wird")
        ]

        # Requirement text: "<attribute display name>: one of {a, b, ...}".
        attribute_label = attributes_dict.attributes[
            cls.attribute_names[1]
        ].display_name
        prefix = attribute_label + _(": one of {")
        cls.requirement = prefix + ", ".join(cls.allowed_protocols) + "}"

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if a SAN fabric data store lacks an accepted protocol."""
        is_datastore = "STRIDE:DataStore" in node.tags

        attributes = node.attributes
        san_key = cls.attribute_names[0]
        protocol_key = cls.attribute_names[1]

        # A missing flag counts as SAN fabric; only a value matching False
        # (per meet_any_requirement) opts the node out.
        is_san_fabric = san_key not in attributes or not meet_any_requirement(
            attributes[san_key], [False]
        )

        uses_allowed_protocol = protocol_key in attributes and meet_any_requirement(
            attributes[protocol_key], cls.allowed_protocols
        )

        if not (is_datastore and is_san_fabric):
            return False
        return not uses_allowed_protocol


class MFANodeRule(NodeRule):
    """Node rule requiring at least two authentication factors.

    ``_test`` is true for a node tagged ``STRIDE:DataStore`` or
    ``STRIDE:Process`` that is not explicitly marked as not requiring
    authentication and that lists fewer than two authentication factors.
    """

    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Multi Factor Authentication")
        cls.short_description = _("Multi-Faktor-Authentisierung")
        cls.long_description = _(
            "Falls eine Authentisierung gemäß Richtlinien CON.10.A16, APP.3.1.A1 und "
            "CON.8.A5 des IT-Grundschutzkompendium erforderlich ist, SOLLTE die Liste "
            "der Authentisierungs-Faktoren zwei oder mehr Elemente umfassen."
        )
        cls.attribute_names = ["auth_req", "auth_factors"]

        cls.mitigation_options = [_("Authentisierungsfaktoren hinzufügen")]
        cls.requirement = attributes_dict.attributes[
            cls.attribute_names[1]
        ].display_name + _(": count >= 2")

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if an authenticating node lists fewer than two factors.

        The factors attribute may be a string or an iterable of strings; it is
        split on ``,``, ``;`` and ``.``. Empty or whitespace-only pieces are
        ignored, so a trailing delimiter does not count as an extra factor.
        """
        is_process_or_datastore = (
            "STRIDE:DataStore" in node.tags or "STRIDE:Process" in node.tags
        )

        attributes = node.attributes
        auth_req_key = cls.attribute_names[0]
        factors_key = cls.attribute_names[1]

        # A missing flag counts as requiring authentication; only a value
        # matching False (per meet_any_requirement) opts the node out.
        requires_authentication = (
            auth_req_key not in attributes
            or not meet_any_requirement(attributes[auth_req_key], [False])
        )

        factors = []
        if factors_key in attributes:
            raw_factors = attributes[factors_key]
            if not isinstance(raw_factors, str):
                raw_factors = ", ".join(raw_factors)
            factors = [
                piece for piece in re.split(r"[,;.]", raw_factors) if piece.strip()
            ]
        uses_multiple_factors = len(factors) >= 2

        return (
            is_process_or_datastore
            and requires_authentication
            and not uses_multiple_factors
        )


class MFAHighSecurityNodeRule(NodeRule):
    """Node rule requiring a secure authentication factor for confidential data.

    ``_test`` is true for a node tagged ``STRIDE:DataStore`` or
    ``STRIDE:Process`` that is not explicitly marked as not requiring
    authentication or as non-confidential, and none of whose authentication
    factors is accepted by ``meet_any_requirement`` against ``secure_factors``.
    """

    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Multi Factor Authentication for High Security")
        cls.short_description = _(
            "Authentisierungsfaktoren bei hohem Sicherheitsbedarf"
        )
        cls.long_description = _(
            "Falls hoher Sicherheitsbedarf besteht, SOLLTE gemäß der Richtlinien "
            "ORP.4.A21 und CON.8.A5 des IT-Grundschutzkompendiums eine sichere "
            "Mehr-Faktor-Authentisierung verwendet werden. Zum Beispiel mit "
            "kryptografischen Zertifikaten, Chipkarten oder Tokens."
        )
        cls.attribute_names = ["handles_confidential_data", "auth_factors", "auth_req"]
        cls.secure_factors = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [_("Authentisierungsfaktoren hinzufügen")]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[1]].display_name
            + _(": one of {")
            + ", ".join(cls.secure_factors)
            + "}"
        )

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if a confidential, authenticating node has no secure factor.

        A string-valued factors attribute is split on ``,``, ``;`` and ``.``
        (the delimiters ``MFANodeRule`` accepts) instead of being iterated
        character by character.
        """
        is_process_or_datastore = (
            "STRIDE:DataStore" in node.tags or "STRIDE:Process" in node.tags
        )

        attributes = node.attributes
        confidential_key = cls.attribute_names[0]
        factors_key = cls.attribute_names[1]
        auth_req_key = cls.attribute_names[2]

        # Missing flags count as applicable; only a value matching False
        # (per meet_any_requirement) opts the node out.
        requires_authentication = (
            auth_req_key not in attributes
            or not meet_any_requirement(attributes[auth_req_key], [False])
        )
        handles_confidential_data = (
            confidential_key not in attributes
            or not meet_any_requirement(attributes[confidential_key], [False])
        )

        uses_secure_factor = False
        if factors_key in attributes:
            factors = attributes[factors_key]
            if isinstance(factors, str):
                factors = [
                    piece.strip()
                    for piece in re.split(r"[,;.]", factors)
                    if piece.strip()
                ]
            uses_secure_factor = any(
                meet_any_requirement(factor, cls.secure_factors)
                for factor in factors
            )

        return (
            is_process_or_datastore
            and requires_authentication
            and handles_confidential_data
            and not uses_secure_factor
        )


class PermissionNodeRule(NodeRule):
    """Node rule comparing given permissions against required permissions.

    ``_test`` is true for a node tagged ``STRIDE:Interactor`` or
    ``STRIDE:Process`` when either permission attribute is missing or when
    some given permission is not accepted by ``meet_any_requirement`` against
    the required permissions.
    """

    BASE_SEVERITY = 2.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Least Privileges")
        cls.short_description = _("Nur notwendige Berechtigungen vergeben")
        cls.long_description = _(
            "Prozesse MÜSSEN gemäß Richtlinie CON.8.A5 des IT-Grundschutzkompendium "
            "mit möglichst geringen Privilegien ausgeführt werden können. Nutzer "
            "SOLLTEN nur Berechtigungen erhalten, die zur Dürchführung ihrer Aufgabe "
            "notwendig sind."
        )
        cls.attribute_names = ["req_permissions", "given_permissions"]

        cls.mitigation_options = [_("Prüfen, ob alle vergebenen Rechte notwendig sind")]

        # Requirement text: "<required label> same as <given label>".
        required_label = attributes_dict.attributes[
            cls.attribute_names[0]
        ].display_name
        connector = _(" same as ")
        given_label = attributes_dict.attributes[cls.attribute_names[1]].display_name
        cls.requirement = required_label + connector + given_label

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if the node may hold permissions beyond the required ones."""
        is_interactor_or_process = (
            "STRIDE:Interactor" in node.tags or "STRIDE:Process" in node.tags
        )

        attributes = node.attributes
        required_key = cls.attribute_names[0]
        given_key = cls.attribute_names[1]

        # Both attributes have to be present before they can be compared.
        fulfills_least_privilege = (
            required_key in attributes and given_key in attributes
        )
        if fulfills_least_privilege:
            # Every given permission is checked against the required ones.
            given_is_required = [
                meet_any_requirement(given_permission, attributes[required_key])
                for given_permission in attributes[given_key]
            ]
            fulfills_least_privilege = all(given_is_required)

        if not is_interactor_or_process:
            return False
        return not fulfills_least_privilege


class InputValidationNodeRule(NodeRule):
    """Node rule for validation of input data in processes.

    ``_test`` is true for a node tagged ``STRIDE:Process`` unless both the
    ``input_data`` and ``input_validation`` attributes are present and the
    latter matches True per ``meet_any_requirement``.
    """

    BASE_SEVERITY = 2.0
    severity = BASE_SEVERITY

    @classmethod
    def init_texts(cls):
        """Fill in the translated texts and the attribute metadata of the rule."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.display_name = _("Input Validation")
        cls.short_description = _("Eingabevalidierung")
        cls.long_description = _(
            "Gemäß der Richtlinien CON.8.A5 und CON.10.A8 des IT-Grundschutzkompendium "
            "MÜSSEN sämtliche Eingabedaten, Datenströme und Sekundärdaten, wie z.B. "
            "Session-IDs serverseitig validiert werden."
        )
        cls.attribute_names = ["input_data", "input_validation"]

        cls.mitigation_options = [_("Alle Eingabedaten validieren")]

        # Requirement text: "<input data label> and <input validation label>".
        data_label = attributes_dict.attributes[cls.attribute_names[0]].display_name
        connector = _(" and ")
        validation_label = attributes_dict.attributes[
            cls.attribute_names[1]
        ].display_name
        cls.requirement = data_label + connector + validation_label

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        """Return True if a process does not declare validated input data."""
        is_process = "STRIDE:Process" in node.tags

        attributes = node.attributes
        data_key = cls.attribute_names[0]
        validation_key = cls.attribute_names[1]

        # NOTE: an earlier, disabled draft additionally looked for a
        # "sanitization" entry and compared the lengths of the validation and
        # input lists; only the boolean validation flag is evaluated here.
        validation_matches_input = (
            data_key in attributes
            and validation_key in attributes
            and meet_any_requirement(attributes[validation_key], [True])
        )

        if not is_process:
            return False
        return not validation_matches_input


class BSIRuleCollection(DataflowDiagramRuleCollection):
    """Rule collection bundling the BSI-derived node and edge rules of this module."""

    tags = {"bsi_rules"}

    @classmethod
    def init_texts(cls):
        """Set the translated collection name and the reference URL."""
        # The alias must stay ``_`` so string extraction tools find the msgids.
        _ = settings.lang_out.gettext
        cls.name = _("BSI rule collection")
        cls.references = [
            (
                "https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Grundschutz/IT-GS-"
                + "Kompendium/checklisten_2023.html"
            )
        ]

    # Rules evaluated for nodes of the diagram.
    node_rules = [
        HashedPasswordsNodeRule,
        EncryptionOfConfidentialDataNodeRule,
        AuthenticationProtocolNodeRule,
        MFANodeRule,
        MFAHighSecurityNodeRule,
        PermissionNodeRule,
        InputValidationNodeRule,
        LoggingDataNodeRule,
    ]
    # Rules evaluated for edges (data flows) of the diagram.
    edge_rules = [
        UntrustworthyDataflowEdgeRule,
        ConfidentialDataflowEdgeRule,
        SecureHTTPConfigEdgeRule,
        IntegrityOfExternalEntitiesEdgeRule,
        UseOfProxiesEdgeRule,
    ]


__all__ = ["BSIRuleCollection"]