Source code for flowstrider.converters.dfd_to_dot_converter

# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause

import os
import sys
import typing
from datetime import datetime
from io import StringIO

from graphviz import Source
from graphviz.backend.execute import ExecutableNotFound

from flowstrider import settings
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import common_models, dataflowdiagram
from flowstrider.settings import CMD_MAX_CHAR_WIDTH


[docs] def wrap_text( text_to_wrap: str, max_line_char_length: int = CMD_MAX_CHAR_WIDTH, include_hyphen: bool = True, ) -> str: """Inserts line breaks in the given string to fit the given maximum character length per line Args: text_to_wrap: the text that is being wrapped max_line_char_length: number of characters that will be allowed in one line include_hyphen: if hyphen are used to indicate that a long word continues on the next line; set to False for hyperlinks! Returns: wrapped version of the input text up to the maximum char length per line as a string with line breaks """ if len(text_to_wrap) == 0: return "" text_paragraphs = text_to_wrap.split("\n") text_wrapped = "" for paragraph in text_paragraphs: words = paragraph.split(" ") while len(words) > 0: # Wrap single words that are too long if len(words[0]) > max_line_char_length: if include_hyphen: text_wrapped += words[0][: max_line_char_length - 1] + "-\n" words[0] = words[0][max_line_char_length - 1 :] continue else: text_wrapped += words[0][:max_line_char_length] + "\n" words[0] = words[0][max_line_char_length:] continue # Wrap words that are not too long else: curr_line_char_length = len(words[0]) text_wrapped += words.pop(0) # Terminate if whole text is wrapped if len(words) == 0: break # Prepare for more words (extra '1' is for the space char between words) curr_line_char_length += 1 + len(words[0]) # Add more words too current line until max is reached or no words are left while curr_line_char_length <= max_line_char_length: text_wrapped += " " + words.pop(0) if len(words) == 0: break curr_line_char_length += 1 + len(words[0]) if curr_line_char_length > max_line_char_length: text_wrapped += "\n" if paragraph is not text_paragraphs[len(text_paragraphs) - 1]: text_wrapped += "\n" # Remove last \n and return if text_wrapped[-1] == "\n": return text_wrapped[:-1] else: return text_wrapped
[docs] def render_dfd(dfd: dataflowdiagram.DataflowDiagram) -> int: """Renders a given dataflow diagram as a PNG file (or SVG) Args: dfd: the dataflowdiagram to be rendered """ dot_format = deserialized_dfd_to_dot(dfd) output_file = "output/visualization/visualization" source = Source(dot_format) source.save(output_file + ".dot") os.makedirs("output/visualization", exist_ok=True) # Redirect any errors graphviz might throw error_stream = StringIO() temp_stderr = sys.stderr sys.stderr = error_stream try: source.render( output_file, format="png", cleanup=True ) # PNG can be added to pdf report (does not include metadata) source.render( output_file, format="svg", cleanup=True ) # SVG can be opened in browser to view metadata tooltips except ExecutableNotFound: _ = settings.lang_sys.gettext print( settings.C_WARNING + wrap_text( _("Warning: ") + _( "Graphviz seems to not be installed on your system or is not on " + "the system PATH. See {link} for help." ).format(link="https://graphviz.org/download/"), include_hyphen=False, ) + settings.C_DEFAULT ) WarningsCounter.add_warning() return 1 sys.stderr = temp_stderr # Write warnings_log if graphviz threw warnings errors = error_stream.getvalue() error_stream.close() if errors: with open("output/warnings_log.txt", "w") as log_file: log_file.write( "Log for {id} from {time}.\n\n".format(id=dfd.id, time=datetime.now()) ) log_file.write("Graphviz warnings:\n") log_file.write(errors) # The image borders have to be converted from transparent to white (no visual # ...change) because fpdf2 doesn't support transparency if os.path.exists(output_file + ".svg"): temp_lines = [] with open(output_file + ".svg", "r") as visualization_svg: temp_lines = visualization_svg.readlines() for i in range(len(temp_lines)): if '<polygon fill="white" stroke="transparent"' in temp_lines[i]: temp_lines[i] = temp_lines[i].replace('"transparent"', '"white"') with open(output_file + ".svg", "w") as visualization_svg: visualization_svg.writelines(temp_lines) return 0
[docs] def text_length_warning(id: str): _ = settings.lang_sys.gettext print( settings.C_WARNING + wrap_text( _( "Warning: The name of id: {id} is very long and might not be" + " displayed properly." ).format(id=id) ) + settings.C_DEFAULT ) WarningsCounter.add_warning()
[docs] def deserialized_dfd_to_dot(dfd: dataflowdiagram.DataflowDiagram) -> str: """takes dfd object as input and creates a dot representation (as string) which can be rendered as PNG file Args: dfd: the dataflow diagram to be converted to dot format Returns: the dot representation of the dfd as a string """ dfd_as_dot = 'digraph "' + dfd.id.replace(" ", "_") + '" {\n' # represent relationships between (nested) clusters (parent: list of children) # this is necessary because graphviz does not automatically show nested structures # adding a node twice as part of two different clusters does not work. it would be # ignored by the second cluster relationships: typing.Dict[str, typing.List[str]] = { cluster.id: [] for cluster in dfd.clusters.values() } for parent_key, parent_cluster in dfd.clusters.items(): for child_key, child_cluster in dfd.clusters.items(): if parent_key != child_key and set(child_cluster.node_ids).issubset( parent_cluster.node_ids ): relationships[parent_key].append(child_key) # Find the root clusters (not child of any other cluster) all_subclusters = {sub_id for subs in relationships.values() for sub_id in subs} root_clusters = [ cluster_id for cluster_id in relationships if cluster_id not in all_subclusters ] # track all nodes that have been added already as part of a cluster nodes_in_clusters = set() # add root clusters to dot representation # cluster_to_dot recursively add child clusters for root_cluster_id in root_clusters: cluster = dfd.clusters[root_cluster_id] dfd_as_dot += cluster_to_dot(cluster, dfd, relationships) nodes_in_clusters.update(cluster.node_ids) # in case of nodes that do not belong to any cluster for node_key in dfd.nodes: node = dfd.nodes[node_key] if node.id not in nodes_in_clusters: dfd_as_dot += node_to_dot(node) + "\n" # add edges to dot representation for edge_key in dfd.edges: edge = dfd.edges[edge_key] dfd_as_dot += dataflow_to_dot(edge) + "\n" dfd_as_dot += "}" return dfd_as_dot
[docs] def cluster_to_dot( cluster: common_models.Cluster, dfd: dataflowdiagram.DataflowDiagram, relationships: dict, ) -> str: """takes a cluster object and generates dot string. recursively includes nested clusters Args: cluster: the cluster to be converted to dot dfd: the dataflow diagram to which the cluster belongs (needed to get node and child cluster objects) relationships: possible child clusters to be added recursively Returns: the dot representation of the cluster(s) as a string """ cluster_id_underscores = cluster.id.replace(" ", "_").replace("-", "_") cluster_as_dot = f" subgraph cluster_{cluster_id_underscores} {{\n" label = cluster.name if cluster.name != "" else cluster.id label_formatted = wrap_text(label, 15) if label_formatted.count("\n") > 3: text_length_warning(cluster.id) label_formatted = label_formatted.replace("\n", "<br/>") cluster_as_dot += f" label=< <B>{label_formatted}</B> >;\n" # tooltip only relevant if rendered as svg file cluster_as_dot += f' tooltip="{format_attributes(cluster.attributes)}";\n' cluster_as_dot += " style=dashed;\n" # add nodes in cluster node_ids_sorted = sorted(cluster.node_ids) for node_id in node_ids_sorted: node = dfd.get_node_by_id(node_id) if node is None: raise KeyError(f"Node '{node_id}' not found.") cluster_as_dot += node_to_dot(node) + "\n" # Recursively add subclusters for subcluster_id in relationships[cluster.id]: subcluster = dfd.clusters[subcluster_id] cluster_as_dot += cluster_to_dot(subcluster, dfd, relationships) cluster_as_dot += " }\n" return cluster_as_dot
[docs] def node_to_dot(node: common_models.Node) -> str: """takes a node object and generates dot string Args: node: the node to be converted to dot Returns: the dot representation of the node as a string """ label = node.name if node.name != "" else node.id label_formatted = wrap_text(label, 15) if label_formatted.count("\n") > 3: text_length_warning(node.id) node_as_dot = f' "{node.id}" [' # Tooltip only relevant if rendered as svg file node_as_dot += f'tooltip="{format_attributes(node.attributes)}"' node_as_dot += ", fixedsize=true" node_as_dot += ", width=2.5" node_as_dot += ", height=1.0" # Determine shape dependent on type of node node_as_dot += ", shape=" if "STRIDE:DataStore" in node.tags: # DataStore gets extra treatment as the shape is more complex and has to be # ...handled with html (no default .dot shape) node_as_dot += "none" node_as_dot += ', label=<<table border="0" cellborder="1">' node_as_dot += '<tr><td sides="tb" width="180" height="71">' node_as_dot += label_formatted.replace("\n", "<br/>") node_as_dot += "</td></tr>" node_as_dot += "</table>>" else: if "STRIDE:Interactor" in node.tags: node_as_dot += "box" elif "STRIDE:Process" in node.tags: node_as_dot += "circle" # Add text node_as_dot += f', label="{label_formatted}"' node_as_dot += "]" return node_as_dot
[docs] def dataflow_to_dot(edge: common_models.Edge) -> str: """takes an edge object and generates dot string Args: edge: the edge to be converted to dot Returns: the dot representation of the edge as a string """ label = edge.name if edge.name != "" else edge.id label_formatted = wrap_text(label, 15) if label_formatted.count("\n") > 3: text_length_warning(edge.id) edge_as_dot = f' "{edge.source_id}" -> "{edge.sink_id}" ' edge_as_dot += f'[label="{label_formatted}"' # tooltip only relevant if rendered as svg file edge_as_dot += f', tooltip="{format_attributes(edge.attributes)}"];' return edge_as_dot
[docs] def format_attributes(attributes: dict) -> str: """takes metadata of an entity and formats it in a more human-readable format (helpful for the SVG representation where a metadata tooltip can be added) Args: attributes: the metadata dictionary of an entity (node, edge, cluster) Returns: the formatted attributes to be added to the dot string of the entity """ if not attributes: return "No metadata listed" attributes_as_string = "" for key, value in attributes.items(): attributes_as_string += f"{key}: {value} \n" return attributes_as_string