Source code for flowstrider.rules.builtin.bsi_rules.bsi_rule_collection

# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause

import re
import typing

from flowstrider import settings
from flowstrider.models import dataflowdiagram
from flowstrider.models.common_models import Cluster, Edge, Node
from flowstrider.rules import attributes_dict
from flowstrider.rules.common_rules import (
    DataflowDiagramRuleCollection,
    EdgeRule,
    NodeRule,
    meet_any_requirement,
)

# Rules derived from the basis of the BSI checklists
# ...'Checklisten zum IT-Grundschutz-Kompendium (Edition 2023)'
# ...https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Grundschutz/IT-GS-Kompendium/checklisten_2023.html

# Current severity is based on the description of the rules and gives 2.0 for MUST(MUSS)
# ...and 1.0 for SHOULD(SOLL) as stated in the rule (2.0 if both)


# Helper method
def get_smallest_cluster(
    edge: Edge, dfd: dataflowdiagram.DataflowDiagram, just_sink: bool
) -> typing.Union[Cluster, None]:
    clusters = dfd.get_clusters_for_node_id(edge.sink_id)
    if not just_sink:
        clusters += dfd.get_clusters_for_node_id(edge.source_id)

    if not clusters:
        return None

    return min(clusters, key=lambda cluster: len(cluster.node_ids))


# Helper method
def does_edge_cross_cluster_boundary(edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
    smallest_involved_cluster = get_smallest_cluster(edge, dfd, False)

    if not smallest_involved_cluster:
        return True

    if (
        edge.sink_id in smallest_involved_cluster.node_ids
        and edge.source_id in smallest_involved_cluster.node_ids
    ):
        return False

    return True


# Helper method
def does_edge_have_external_source(edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
    smallest_sink_cluster = get_smallest_cluster(edge, dfd, True)

    if not smallest_sink_cluster:
        return True

    if edge.source_id in smallest_sink_cluster.node_ids:
        return False

    return True


class UntrustworthyDataflowEdgeRule(EdgeRule):
    BASE_SEVERITY = 2.0  # Supposed to be constant
    severity = BASE_SEVERITY  # Change severity if needed in the _test method
    # ...(so that it can vary per threat)
    bsi_ids = ["APP.3.2.A11", "NET.1.1.A7"]
    allowed_protocols: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Nicht vertrauenswürdiger Datenfluss")
        cls.short_description = _(
            "Transportprotokoll für Verbindungen außerhalb der Vertrauensgrenze"
        )
        cls.long_description = (
            _(
                "Datenflüsse, die Vertrauensgrenzen überschreiten, MÜSSEN "
                "ein sicheres Transportprotokoll einsetzen, um die Vertraulichkeit der "
                "Daten zu bewahren. Gemäß der Technischen Richtlinie BSI TR-02102 sind "
                "das: IPsec, MLS, SRTP, SSH-2, TLS 1.2 und TLS 1.3."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["transport_protocol"]
        cls.allowed_protocols = attributes_dict.attributes[
            cls.attribute_names[0]
        ].accepted_values

        cls.mitigation_options = [_("TLS einsetzen")]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[0]].display_name
            + _(": eines von {")
            + ", ".join(cls.allowed_protocols)
            + "}"
        )

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram) -> bool:
        crosses_boundary = does_edge_cross_cluster_boundary(edge, dfd)

        uses_TLS = cls.attribute_names[0] in edge.attributes and meet_any_requirement(
            edge.attributes[cls.attribute_names[0]], cls.allowed_protocols
        )

        return crosses_boundary and not uses_TLS


class ConfidentialDataflowEdgeRule(EdgeRule):
    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY
    bsi_ids = ["APP.2.1.A13"]
    allowed_protocols: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Vertraulicher Datenfluss")
        cls.short_description = _("Transportprotokoll für vertrauliche Daten")
        cls.long_description = (
            _(
                "Datenflüsse, die innerhalb der Vertrauensgrenzen "
                "vertrauliche Daten übertragen SOLLTEN ein sicheres, etabliertes "
                "Transportprotokoll einsetzen. Gemäß der Technischen Richtlinie BSI "
                "TR-02102 sind das: IPsec, MLS, SRTP, SSH-2, TLS 1.2 und TLS 1.3."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["handles_confidential_data", "transport_protocol"]
        cls.allowed_protocols = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [_("TLS einsetzen")]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[1]].display_name
            + _(": eines von {")
            + ", ".join(cls.allowed_protocols)
            + "}"
        )

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
        is_inside_boundary = not does_edge_cross_cluster_boundary(
            edge, dfd
        ) and not does_edge_have_external_source(edge, dfd)

        handles_confidential_data = (
            cls.attribute_names[0] in edge.attributes
            and not meet_any_requirement(
                edge.attributes[cls.attribute_names[0]], [False]
            )
        ) or cls.attribute_names[0] not in edge.attributes

        uses_TLS = cls.attribute_names[1] in edge.attributes and meet_any_requirement(
            edge.attributes[cls.attribute_names[1]], cls.allowed_protocols
        )

        return is_inside_boundary and handles_confidential_data and not uses_TLS


class SecureHTTPConfigEdgeRule(EdgeRule):
    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY
    bsi_ids = ["CON.10.A14", "APP.3.1.A21"]
    checked_protocols: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Sichere HTTP Konfiguration")
        cls.short_description = _("Sichere HTTP-Konfiguration bei Webanwendungen")
        cls.long_description = (
            _(
                "Es SOLLTEN zum Schutz vor Clickjacking, "
                "Cross-Site-Scripting und anderen Angriffen geeignete "
                "HTTP-Response-Header verwendet werden. Mindestens "
                "Content-Security-Policy, Strict-Transport-Security, Content-Type, "
                "X-Content-Type-Options und Cache-Control. Die HTTP-Header SOLLTEN auf "
                "die Webanwendung abgestimmt werden und SOLLTEN so restriktiv wie "
                "möglich sein. Für Cookies SOLLTEN die Attribute Secure, SameSite und "
                "HttpOnly gesetzt sein."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = [
            "transport_protocol",
            "http_cache_control",
            "http_content_security_policy",
            "http_content_type",
            "http_cookie_httponly",
            "http_cookie_samesite",
            "http_cookie_secure",
            "http_strict_transport_security",
            "http_x_content_type_options",
        ]
        cls.checked_protocols = []
        if (
            "HTTPS"
            in attributes_dict.attributes[cls.attribute_names[0]].accepted_values
        ):
            cls.checked_protocols.append("HTTPS")
        if (
            "TLS 1.2"
            in attributes_dict.attributes[cls.attribute_names[0]].accepted_values
        ):
            cls.checked_protocols.append("TLS 1.2")
        if (
            "TLS 1.3"
            in attributes_dict.attributes[cls.attribute_names[0]].accepted_values
        ):
            cls.checked_protocols.append("TLS 1.3")

        cls.mitigation_options = [
            _("Prüfen, dass alle erforderlichen HTTP-Response-Header gesetzt sind")
        ]

        cls.requirement = ""
        for i in range(1, 9):
            cls.requirement += (
                attributes_dict.attributes[cls.attribute_names[i]].display_name
                + " = True"
            )
            if i < 8:
                cls.requirement += ", "

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
        is_https_request = (
            cls.attribute_names[0] in edge.attributes
            and meet_any_requirement(
                edge.attributes[cls.attribute_names[0]], cls.checked_protocols
            )
            or cls.attribute_names[0] not in edge.attributes
        )

        sets_required_headers = True

        for i in range(1, 9):
            if cls.attribute_names[
                i
            ] not in edge.attributes or not meet_any_requirement(
                edge.attributes[cls.attribute_names[i]], [True]
            ):
                sets_required_headers = False

        return is_https_request and not sets_required_headers


class IntegrityOfExternalEntitiesEdgeRule(EdgeRule):
    BASE_SEVERITY = 2.0
    severity = BASE_SEVERITY
    bsi_ids = ["CON.8.A20"]
    allowed_checks: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Integrität von Externen Entitäten")
        cls.short_description = _("Integritätsprüfung externer Elemente")
        cls.long_description = (
            _(
                "Externe Komponeneten und Daten von externen Elementen MÜSSEN auf ihre "
                "Integrität und Schwachstellen geprüft werden. Die Integrität MUSS "
                "mittels Prüfsummen oder kryptografischen Zertifikaten überprüft "
                "werden. Es SOLLTEN keine veralteten Versionen von externen "
                "Komponenten verwendet werden."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["integrity_check"]
        cls.allowed_checks = attributes_dict.attributes[
            cls.attribute_names[0]
        ].accepted_values

        cls.mitigation_options = [
            _("Prüfsummen oder digitale Zertifikate zur Integritätsprüfung einsetzen")
        ]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[0]].display_name
            + _(": eines von {")
            + ", ".join(cls.allowed_checks)
            + "}"
        )

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
        has_external_source = does_edge_have_external_source(edge, dfd)

        uses_allowed_check = cls.attribute_names[
            0
        ] in edge.attributes and meet_any_requirement(
            edge.attributes[cls.attribute_names[0]], cls.allowed_checks
        )

        return has_external_source and not uses_allowed_check


class UseOfProxiesEdgeRule(EdgeRule):
    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY
    bsi_ids = ["DER.1.A10"]

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Einsatz von Proxies")
        cls.short_description = _("Einsatz von TLS/SSL-Proxies")
        cls.long_description = (
            _(
                "An den Übergängen zu externen Netzen SOLLTEN TLS-/SSL-Proxies "
                "eingesetzt werden, um übertragene Daten auf Malware zu prüfen. Diese "
                "Proxies SOLLTEN vor unbefugten Zugriffen geschützt werden. "
                "Sicherheitsrelevante Ereignisse SOLLTEN automatisch entdeckt werden."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["proxy"]

        cls.mitigation_options = [_("Proxies einsetzen")]
        cls.requirement = attributes_dict.attributes[
            cls.attribute_names[0]
        ].display_name + (" = True")

    @classmethod
    def _test(cls, edge: Edge, dfd: dataflowdiagram.DataflowDiagram):
        crosses_boundary = does_edge_cross_cluster_boundary(edge, dfd)

        passes_through_proxy = cls.attribute_names[
            0
        ] in edge.attributes and meet_any_requirement(
            edge.attributes[cls.attribute_names[0]], [True]
        )

        return crosses_boundary and not passes_through_proxy


class LoggingDataNodeRule(NodeRule):
    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY
    bsi_ids = ["OPS.1.1.5.A12"]
    allowed_signature_schemes: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Signatur von Protokollierungsdaten")
        cls.short_description = _("Digitale Signatur für Protokollierungsdaten")
        cls.long_description = (
            _(
                "Gespeicherte Protokollierungsdaten SOLLTEN digital signiert sein. "
                "Zu den empfohlenen Signaturverfahren gemäß der "
                "Technischen Richtlinie TR-02102 des BSI zählen: "
                "RSA, DSA, ECDSA, ECKDSA, ECKCDSA, ECGDSA."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["handles_logs", "signature_scheme"]
        cls.allowed_signature_schemes = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [
            _(
                "Prüfen, dass Protokollierungsdaten mit einem"
                + " empfohlenen Verfahren signiert werden"
            )
        ]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[1]].display_name
            + _(": eines von {")
            + ", ".join(cls.allowed_signature_schemes)
            + "}"
        )

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        is_datastore = "STRIDE:DataStore" in node.tags

        handles_logging_data = (
            cls.attribute_names[0] in node.attributes
            and not meet_any_requirement(
                node.attributes[cls.attribute_names[0]], [False]
            )
        ) or cls.attribute_names[0] not in node.attributes

        uses_allowed_signature_scheme = cls.attribute_names[
            1
        ] in node.attributes and meet_any_requirement(
            node.attributes[cls.attribute_names[1]], cls.allowed_signature_schemes
        )

        return (
            handles_logging_data and is_datastore and not uses_allowed_signature_scheme
        )


class HashedPasswordsNodeRule(NodeRule):
    BASE_SEVERITY = 2.0
    severity = BASE_SEVERITY
    bsi_ids = [
        "ORP.4.A23",
        "CON.8.A5",
        "CON.10.A7",
        "APP.3.1.A14",
        "APP.3.2.A5",
        "APP.4.2.A13",
        "APP.4.3.A3",
        "SYS.1.6.A8",
        "NET.3.1.A1",
        "NET.3.2.A4",
    ]
    allowed_hash_functions: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Hashing von Passwörtern")
        cls.short_description = _("Passwörter müssen gehashed werden")
        cls.long_description = (
            _(
                "Passwörter DÜRFEN NICHT im Klartext gespeichert werden. Passwörter "
                "MÜSSEN serverseitig mit einem sicheren Salted Hash "
                "Verfahren gespeichert werden. Dazu zählen gemäß der "
                "Technischen Richtlinie TR-02102 des BSI: "
                "SHA-256, SHA-512/256, SHA-384, SHA-512, SHA3-256, SHA3-384, SHA3-512."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["stores_credentials", "hash_function"]
        cls.allowed_hash_functions = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [
            _("Prüfen, dass eines der empfohlenen Hash-Verfahren genutzt wird")
        ]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[1]].display_name
            + _(": eines von {")
            + ", ".join(cls.allowed_hash_functions)
            + "}"
        )

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        is_datastore = "STRIDE:DataStore" in node.tags

        stores_passwords = (
            cls.attribute_names[0] in node.attributes
            and not meet_any_requirement(
                node.attributes[cls.attribute_names[0]], [False]
            )
        ) or cls.attribute_names[0] not in node.attributes

        uses_allowed_hash_function = cls.attribute_names[
            1
        ] in node.attributes and meet_any_requirement(
            node.attributes[cls.attribute_names[1]], cls.allowed_hash_functions
        )

        return is_datastore and stores_passwords and not uses_allowed_hash_function


class EncryptionOfConfidentialDataNodeRule(NodeRule):
    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY
    bsi_ids = ["CON.8.A5", "CON.10.A18"]
    allowed_encryption: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Verschlüsselung vertraulicher Daten")
        cls.short_description = _("Vertrauliche Daten müssen verschlüsselt werden")
        cls.long_description = (
            _(
                "Vertrauliche Daten SOLLTEN mit einem sicheren kryptografischen "
                "Verfahren verschlüsselt werden. Dazu zählen gemäß der "
                "Technischen Richtlinie TR-02102 des BSI: "
                "AES-128, AES-192, AES-256."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["handles_confidential_data", "encryption_method"]
        cls.allowed_encryption = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [
            _(
                "Prüfen, dass eines der empfohlenen Verschlüsselungsverfahren genutzt "
                "wird"
            )
        ]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[1]].display_name
            + _(": eines von {")
            + ", ".join(cls.allowed_encryption)
            + "}"
        )

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        is_datastore = "STRIDE:DataStore" in node.tags

        handles_confidential_data = (
            cls.attribute_names[0] in node.attributes
            and not meet_any_requirement(
                node.attributes[cls.attribute_names[0]], [False]
            )
        ) or cls.attribute_names[0] not in node.attributes

        uses_allowed_encryption = cls.attribute_names[
            1
        ] in node.attributes and meet_any_requirement(
            node.attributes[cls.attribute_names[1]], cls.allowed_encryption
        )

        return (
            is_datastore and handles_confidential_data and not uses_allowed_encryption
        )


class AuthenticationProtocolNodeRule(NodeRule):
    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY
    bsi_ids = ["SYS.1.8.A24"]
    allowed_protocols: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Authentifizierungsprotokolle für SAN fabric")
        cls.short_description = _(
            "Sicherstellung der Speicher-Integrität durch sichere Protokolle"
        )
        cls.long_description = (
            _(
                "Um die Integrität der Speicherlösung sicherzustellen, "
                "SOLLTEN Protokolle mit zusätzlichen Sicherheitsmerkmalen eingesetzt "
                "und entsprechend konfiguriert werden. Dazu zählen: DH-CHAP, FCAP, und "
                "FCPAP."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["is_san_fabric", "auth_protocol"]
        cls.allowed_protocols = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [
            _("Prüfen, dass eines der empfohlenen Protokolle verwendet wird")
        ]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[1]].display_name
            + _(": eines von {")
            + ", ".join(cls.allowed_protocols)
            + "}"
        )

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        is_datastore = "STRIDE:DataStore" in node.tags

        is_san_fabric = (
            cls.attribute_names[0] in node.attributes
            and not meet_any_requirement(
                node.attributes[cls.attribute_names[0]], [False]
            )
        ) or cls.attribute_names[0] not in node.attributes

        uses_allowed_protocol = cls.attribute_names[
            1
        ] in node.attributes and meet_any_requirement(
            node.attributes[cls.attribute_names[1]], cls.allowed_protocols
        )

        return is_datastore and is_san_fabric and not uses_allowed_protocol


class MFANodeRule(NodeRule):
    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY
    bsi_ids = [""]

    # TODO
    # Diese Regel ist in der hier notierten Form so nicht im Grundschutzkompendium zu
    # ...finden. Das nahekommenste ist CON.10.A16 (Entwicklung von Webanwendungen):
    # ..."Es SOLLTE eine Mehr-Faktor-Authentisierung implementiert werden." Aber nicht,
    # ...wann sie benutzt werden soll. Möglicherweise sollte diese Regel komplett
    # ...entfernt werden

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Multi-Faktor-Authentisierung")
        cls.short_description = _("Multi-Faktor-Authentisierung")
        cls.long_description = (
            _(
                "Falls eine Authentisierung gemäß Richtlinien CON.10.A16, APP.3.1.A1 "
                "und CON.8.A5 des IT-Grundschutzkompendium erforderlich ist, SOLLTE "
                "die Liste der Authentisierungs-Faktoren zwei oder mehr Elemente "
                "umfassen."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["auth_req", "auth_factors"]

        cls.mitigation_options = [_("Authentisierungsfaktoren hinzufügen")]
        cls.requirement = attributes_dict.attributes[
            cls.attribute_names[1]
        ].display_name + _(": Anzahl >= 2")

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        is_Process_or_Datastore = (
            "STRIDE:DataStore" in node.tags or "STRIDE:Process" in node.tags
        )

        requires_authentication = (
            cls.attribute_names[0] in node.attributes
            and not meet_any_requirement(
                node.attributes[cls.attribute_names[0]], [False]
            )
        ) or cls.attribute_names[0] not in node.attributes

        if cls.attribute_names[1] in node.attributes:
            node_auth_factors = node.attributes[cls.attribute_names[1]]
            if type(node.attributes[cls.attribute_names[1]]) is not str:
                node_auth_factors = ", ".join(node_auth_factors)
            node_auth_factors = re.split(",|;|\\.", node_auth_factors)
        uses_multiple_factors = (
            cls.attribute_names[1] in node.attributes and len(node_auth_factors) >= 2
        )

        return (
            is_Process_or_Datastore
            and requires_authentication
            and not uses_multiple_factors
        )


class MFAHighSecurityNodeRule(NodeRule):
    BASE_SEVERITY = 1.0
    severity = BASE_SEVERITY
    bsi_ids = ["ORP.4.A21", "CON.8.A5"]
    secure_factors: typing.List[str] = []

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Multi-Faktor-Authentisierung bei hohem Sicherheitsbedarf")
        cls.short_description = _(
            "Authentisierungsfaktoren bei hohem Sicherheitsbedarf"
        )
        cls.long_description = (
            _(
                "Falls hoher Sicherheitsbedarf besteht, SOLLTE eine sichere "
                "Mehr-Faktor-Authentisierung verwendet werden. Zum Beispiel mit "
                "kryptografischen Zertifikaten, Chipkarten oder Tokens."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["handles_confidential_data", "auth_factors", "auth_req"]
        cls.secure_factors = attributes_dict.attributes[
            cls.attribute_names[1]
        ].accepted_values

        cls.mitigation_options = [_("Authentisierungsfaktoren hinzufügen")]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[1]].display_name
            + _(": eines von {")
            + ", ".join(cls.secure_factors)
            + "}"
        )

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        is_process_or_datastore = (
            "STRIDE:DataStore" in node.tags or "STRIDE:Process" in node.tags
        )
        requires_authentication = (
            cls.attribute_names[2] in node.attributes
            and not meet_any_requirement(
                node.attributes[cls.attribute_names[2]], [False]
            )
        ) or cls.attribute_names[2] not in node.attributes

        handles_confidential_data = (
            cls.attribute_names[0] in node.attributes
            and not meet_any_requirement(
                node.attributes[cls.attribute_names[0]], [False]
            )
        ) or cls.attribute_names[0] not in node.attributes

        uses_secure_factor = False
        if cls.attribute_names[1] in node.attributes:
            for factor in node.attributes[cls.attribute_names[1]]:
                if meet_any_requirement(factor, cls.secure_factors):
                    uses_secure_factor = True

        return (
            is_process_or_datastore
            and requires_authentication
            and handles_confidential_data
            and not uses_secure_factor
        )


class PermissionNodeRule(NodeRule):
    BASE_SEVERITY = 2.0
    severity = BASE_SEVERITY
    bsi_ids = ["CON.8.A5"]

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Geringste Berechtigungen")
        cls.short_description = _("Nur notwendige Berechtigungen vergeben")
        cls.long_description = (
            _(
                "Prozesse MÜSSEN mit möglichst geringen Privilegien "
                "ausgeführt werden können. Nutzer SOLLTEN nur Berechtigungen "
                "erhalten, die zur Dürchführung ihrer Aufgabe notwendig sind."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = ["req_permissions", "given_permissions"]

        cls.mitigation_options = [_("Prüfen, ob alle vergebenen Rechte notwendig sind")]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[0]].display_name
            + _(" gleich wie ")
            + attributes_dict.attributes[cls.attribute_names[1]].display_name
        )

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        is_interactor_or_process = (
            "STRIDE:Interactor" in node.tags or "STRIDE:Process" in node.tags
        )
        fullfills_least_privilege = (
            cls.attribute_names[0] in node.attributes
            and cls.attribute_names[1] in node.attributes
        )
        if fullfills_least_privilege:
            for req_perm in node.attributes[cls.attribute_names[1]]:
                if not meet_any_requirement(
                    req_perm, node.attributes[cls.attribute_names[0]]
                ):
                    fullfills_least_privilege = False

        return is_interactor_or_process and not fullfills_least_privilege


class InputValidationNodeRule(NodeRule):
    BASE_SEVERITY = 2.0
    severity = BASE_SEVERITY
    bsi_ids = ["CON.8.A5", "CON.10.A8"]

    @classmethod
    def init_texts(cls):
        _ = settings.lang_out_bsi.gettext
        cls.display_name = _("Eingabevalidierung")
        cls.short_description = _("Eingabevalidierung")
        cls.long_description = (
            _(
                "Sämtliche an eine Webanwendung übergebenen Daten sind potentiell "
                "gefährlich. Sämtliche Eingabedaten, Datenströme und Sekundärdaten, "
                "wie z.B. Session-IDs MÜSSEN vor der Weiterverarbeitung serverseitig "
                "validiert werden. Fehleingaben SOLLTEN möglichst nicht automatisch "
                "behandelt werden (Sanitizing). Lässt es sich doch nicht vermeiden, "
                "MUSS Sanitizing sicher umgesetzt werden."
            )
            + "\n"
            + _("BSI IT-Grundschutzkompendium ID: ")
            + (", ".join(cls.bsi_ids))
        )
        cls.attribute_names = [
            "input_data",
            "input_validation",
            "sanitization",
            "sanitization_secure",
        ]

        cls.mitigation_options = [_("Alle Eingabedaten validieren")]
        cls.requirement = (
            attributes_dict.attributes[cls.attribute_names[0]].display_name
            + _(" und ")
            + attributes_dict.attributes[cls.attribute_names[1]].display_name
            + _(" und ")
            + "("
            + _("nicht ")
            + attributes_dict.attributes[cls.attribute_names[2]].display_name
            + _(" oder ")
            + "("
            + attributes_dict.attributes[cls.attribute_names[2]].display_name
            + _(" und ")
            + attributes_dict.attributes[cls.attribute_names[3]].display_name
            + "))"
        )

    @classmethod
    def _test(cls, node: Node, dfd: dataflowdiagram.DataflowDiagram):
        is_Process = "STRIDE:Process" in node.tags

        validation_matches_input = (
            cls.attribute_names[0] in node.attributes
            and cls.attribute_names[1] in node.attributes
            and meet_any_requirement(node.attributes[cls.attribute_names[1]], [True])
        )

        sanitization = cls.attribute_names[
            2
        ] not in node.attributes or not meet_any_requirement(
            node.attributes[cls.attribute_names[2]], [False]
        )
        sanitization_secure = cls.attribute_names[
            3
        ] in node.attributes and meet_any_requirement(
            node.attributes[cls.attribute_names[3]], [True]
        )

        return is_Process and (
            not validation_matches_input or (sanitization and not sanitization_secure)
        )


[docs] class BSIRuleCollection(DataflowDiagramRuleCollection): tag = "bsi_rules"
[docs] @classmethod def init_texts(cls): _ = settings.lang_out_bsi.gettext cls.name = "BSI" cls.set_rules_ruleset_name() cls.references = [ ( "https://www.bsi.bund.de/SharedDocs/Downloads/DE/BSI/Grundschutz/IT-GS-" + "Kompendium/checklisten_2023.html" ) ]
node_rules = [ HashedPasswordsNodeRule, EncryptionOfConfidentialDataNodeRule, AuthenticationProtocolNodeRule, MFANodeRule, MFAHighSecurityNodeRule, PermissionNodeRule, InputValidationNodeRule, LoggingDataNodeRule, ] edge_rules = [ UntrustworthyDataflowEdgeRule, ConfidentialDataflowEdgeRule, SecureHTTPConfigEdgeRule, IntegrityOfExternalEntitiesEdgeRule, UseOfProxiesEdgeRule, ]
__all__ = ["BSIRuleCollection"]