Source code for flowstrider.models.dataflowdiagram

# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause

import typing
from dataclasses import dataclass, field

from flowstrider.models import common_models


[docs] @dataclass class DataflowDiagram: """ Represents a data flow diagram. Attributes: id (str): A unique identifier for the diagram. nodes (Dict[str, Node]): The nodes in the diagram. These can represent processes, external entities, or data stores. edges (Dict[str, Edge]): The edges in the diagram. These represent data flows between nodes. clusters (Dict[str, Cluster]): The clusters in the diagram. These contain nodes and represent trust boundaries. name (str): The name of the diagram. tags (Set[str]): A set of tags specifying the rule set to use ['stride', 'bsi_rules', 'linddun_rules']. attributes (Dict[str, Any]): Metadata about the data flow diagram. This information is not used in the current version. """ id: str nodes: typing.Dict[str, common_models.Node] edges: typing.Dict[str, common_models.Edge] clusters: typing.Dict[str, common_models.Cluster] name: str = "" tags: typing.Set[str] = field(default_factory=set) attributes: typing.Dict[str, typing.Any] = field(default_factory=dict)
[docs] def get_node_by_id(self, node_id: str) -> typing.Union[common_models.Node, None]: return self.nodes.get(node_id, None)
[docs] def get_edge_by_id(self, edge_id: str) -> typing.Union[common_models.Edge, None]: return self.edges.get(edge_id, None)
[docs] def get_cluster_by_id( self, cluster_id: str ) -> typing.Union[common_models.Cluster, None]: return self.clusters.get(cluster_id, None)
[docs] def get_element_by_id( self, element_id: str ) -> typing.Union[ common_models.Node, common_models.Edge, common_models.Cluster, None ]: element: typing.Union[ common_models.Node, common_models.Edge, common_models.Cluster, None ] = self.get_node_by_id(element_id) if element is None: element = self.get_edge_by_id(element_id) if element is None: element = self.get_cluster_by_id(element_id) return element
[docs] def get_clusters_for_node_id( self, node_id: str ) -> typing.List[common_models.Cluster]: """Returns all clusters a node is in""" clusters_with_node = [] for key in self.clusters: if node_id in self.clusters[key].node_ids: clusters_with_node.append(self.clusters[key]) return clusters_with_node