Source code for flowstrider.converters.dfd_to_dot_converter
# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause
import html
import os
import sys
from datetime import datetime
from io import StringIO

from graphviz import Source

from flowstrider import settings
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import common_models, dataflowdiagram
from flowstrider.settings import CMD_MAX_CHAR_WIDTH
[docs]
def wrap_text(
    text_to_wrap: str,
    max_line_char_length: int = CMD_MAX_CHAR_WIDTH,
    include_hyphen: bool = True,
) -> str:
    """Inserts line breaks in the given string to fit the given maximum character
    length per line

    Existing line breaks split the text into paragraphs that are wrapped
    independently. Within a paragraph, words (separated by single spaces) are
    packed greedily onto lines. A word longer than a whole line is cut into
    chunks, each chunk on its own line.

    Args:
        text_to_wrap: the text that is being wrapped
        max_line_char_length: number of characters that will be allowed in one line
        include_hyphen: if hyphen are used to indicate that a long word continues on
            the next line; set to False for hyperlinks! With a maximum length of 1
            there is no room for a hyphen, so none is inserted in that case.
    Returns:
        wrapped version of the input text up to the maximum char length per line as a
        string with line breaks; one trailing line break is removed
    Raises:
        ValueError: if the text is not empty and max_line_char_length is smaller
            than 1 (no character would fit on a line)
    """
    if not text_to_wrap:
        return ""
    if max_line_char_length < 1:
        raise ValueError("max_line_char_length must be at least 1")

    # A hyphenated chunk needs room for at least one character plus the hyphen;
    # otherwise the chunk would be empty and the word would never get shorter
    use_hyphen = include_hyphen and max_line_char_length >= 2
    chunk_length = max_line_char_length - 1 if use_hyphen else max_line_char_length
    chunk_suffix = "-" if use_hyphen else ""

    wrapped_paragraphs = []
    for paragraph in text_to_wrap.split("\n"):
        lines = []
        current_line = None  # None until the first word of the paragraph is placed
        for word in paragraph.split(" "):
            # Append to the current line if the word (plus one space) still fits
            if (
                current_line is not None
                and len(current_line) + 1 + len(word) <= max_line_char_length
            ):
                current_line += " " + word
                continue
            # Otherwise the current line is finished
            if current_line is not None:
                lines.append(current_line)
            # Wrap single words that are too long for a whole line
            while len(word) > max_line_char_length:
                lines.append(word[:chunk_length] + chunk_suffix)
                word = word[chunk_length:]
            current_line = word
        # str.split always yields at least one word, so current_line is set here
        lines.append(current_line)
        wrapped_paragraphs.append("\n".join(lines))

    # Paragraphs are joined by position, not by comparing paragraph objects, so
    # equal paragraphs (e.g. several empty ones) keep their line breaks
    text_wrapped = "\n".join(wrapped_paragraphs)

    # Remove last \n (left over when the input ends with a line break) and return
    if text_wrapped.endswith("\n"):
        return text_wrapped[:-1]
    return text_wrapped
[docs]
def render_dfd(dfd: dataflowdiagram.DataflowDiagram):
    """Renders a given dataflow diagram as a PNG file and as an SVG file

    The dot source is saved as "output/visualization/visualization.dot" and
    rendered to PNG and SVG using the same base path. Anything written to
    sys.stderr while rendering is captured and, if not empty, written to
    "output/warnings_log.txt".

    Args:
        dfd: the dataflowdiagram to be rendered
    """
    dot_format = deserialized_dfd_to_dot(dfd)
    output_file = "output/visualization/visualization"

    # Make sure the output directory exists before anything is written into it
    os.makedirs("output/visualization", exist_ok=True)

    source = Source(dot_format)
    source.save(output_file + ".dot")

    # Redirect any errors graphviz might throw
    error_stream = StringIO()
    temp_stderr = sys.stderr
    sys.stderr = error_stream
    try:
        source.render(
            output_file, format="png", cleanup=True
        )  # PNG can be added to pdf report (does not include metadata)
        source.render(
            output_file, format="svg", cleanup=True
        )  # SVG can be opened in browser to view metadata tooltips
    finally:
        # Always restore stderr, even if rendering raised, so later error output
        # of the program is not swallowed by the capture buffer
        sys.stderr = temp_stderr

    # Write warnings_log if graphviz threw warnings
    errors = error_stream.getvalue()
    error_stream.close()
    if errors:
        with open("output/warnings_log.txt", "w") as log_file:
            log_file.write(
                "Log for {id} from {time}.\n\n".format(id=dfd.id, time=datetime.now())
            )
            log_file.write("Graphviz warnings:\n")
            log_file.write(errors)
[docs]
def text_length_warning(id: str):
    """Prints a warning that the name of the given element is very long

    The translated message is wrapped with wrap_text, printed between the
    settings.C_WARNING and settings.C_DEFAULT markers, and afterwards
    WarningsCounter.add_warning() is called.

    Args:
        id: id of the element whose name is very long
    """
    # Keep the conventional "_" alias so the message stays discoverable for
    # gettext string extraction
    _ = settings.lang_sys.gettext
    message = _(
        "Warning: The name of id: {id} is very long and might not be"
        + " displayed properly."
    ).format(id=id)
    output_pieces = (settings.C_WARNING, wrap_text(message), settings.C_DEFAULT)
    print("".join(output_pieces))
    WarningsCounter.add_warning()
[docs]
def deserialized_dfd_to_dot(dfd: dataflowdiagram.DataflowDiagram) -> str:
    """Builds the dot representation (as string) of a dataflow diagram, which can
    then be rendered

    The output lists the root clusters first (each one including its nested
    clusters and nodes), then all nodes that are not part of a root cluster and
    finally all edges.

    Args:
        dfd: the dataflow diagram to be converted to dot format
    Returns:
        the dot representation of the dfd as a string
    """
    dot_parts = ['digraph "' + dfd.id.replace(" ", "_") + '" {\n']

    # Map every cluster to the clusters nested inside it (parent: list of
    # children). Graphviz does not derive nesting on its own, and a node that is
    # added to two separate clusters would be ignored by the second one.
    # NOTE(review): a cluster is registered as child of every cluster whose node
    # set contains its own, i.e. also of indirect ancestors - confirm this is
    # intended for nesting deeper than two levels.
    relationships = {cluster.id: [] for cluster in dfd.clusters.values()}
    for parent_key, parent_cluster in dfd.clusters.items():
        for child_key, child_cluster in dfd.clusters.items():
            if parent_key == child_key:
                continue
            if set(child_cluster.node_ids).issubset(parent_cluster.node_ids):
                relationships[parent_key].append(child_key)

    # Root clusters are those that never show up as a child of another cluster
    nested_cluster_ids = set()
    for child_ids in relationships.values():
        nested_cluster_ids.update(child_ids)
    root_clusters = [
        cluster_id
        for cluster_id in relationships
        if cluster_id not in nested_cluster_ids
    ]

    # Emit the root clusters (cluster_to_dot adds the child clusters recursively)
    # and remember which nodes were covered by them
    nodes_in_clusters = set()
    for root_cluster_id in root_clusters:
        root_cluster = dfd.clusters[root_cluster_id]
        dot_parts.append(cluster_to_dot(root_cluster, dfd, relationships))
        nodes_in_clusters.update(root_cluster.node_ids)

    # Emit the nodes that do not belong to any cluster
    for node_key in dfd.nodes:
        node = dfd.nodes[node_key]
        if node.id in nodes_in_clusters:
            continue
        dot_parts.append(node_to_dot(node) + "\n")

    # Emit the edges
    for edge_key in dfd.edges:
        dot_parts.append(dataflow_to_dot(dfd.edges[edge_key]) + "\n")

    dot_parts.append("}")
    return "".join(dot_parts)
[docs]
def cluster_to_dot(
    cluster: common_models.Cluster,
    dfd: dataflowdiagram.DataflowDiagram,
    relationships: dict,
) -> str:
    """takes a cluster object and generates dot string. recursively includes
    nested clusters

    The cluster is labelled with its name (or its id if the name is empty),
    wrapped to 15 characters per line. A length warning is issued if the wrapped
    label has more than four lines.

    Args:
        cluster: the cluster to be converted to dot
        dfd: the dataflow diagram to which the cluster belongs
            (needed to get node and child cluster objects)
        relationships: possible child clusters to be added recursively
            (cluster id: list of child cluster ids)
    Returns:
        the dot representation of the cluster(s) as a string
    """
    cluster_id_underscores = cluster.id.replace(" ", "_").replace("-", "_")

    label = cluster.name if cluster.name != "" else cluster.id
    label_formatted = wrap_text(label, 15)
    if label_formatted.count("\n") > 3:
        text_length_warning(cluster.id)
    # The label is emitted as a Graphviz HTML-like label, so "&", "<" and ">" in
    # the name have to be escaped. Escaping happens after wrapping so that the
    # longer entities do not distort the line lengths.
    label_formatted = html.escape(label_formatted, quote=False)
    label_formatted = label_formatted.replace("\n", "<br/>")

    dot_parts = [
        f" subgraph cluster_{cluster_id_underscores} {{\n",
        f" label=< <B>{label_formatted}</B> >;\n",
        # tooltip only relevant if rendered as svg file
        f' tooltip="{format_attributes(cluster.attributes)}";\n',
        " style=dashed;\n",
    ]

    # add nodes in cluster (sorted for a stable output order)
    for node_id in sorted(cluster.node_ids):
        node = dfd.get_node_by_id(node_id)
        dot_parts.append(node_to_dot(node) + "\n")

    # Recursively add subclusters
    for subcluster_id in relationships[cluster.id]:
        subcluster = dfd.clusters[subcluster_id]
        dot_parts.append(cluster_to_dot(subcluster, dfd, relationships))

    dot_parts.append(" }\n")
    return "".join(dot_parts)
[docs]
def node_to_dot(node: common_models.Node) -> str:
    """takes a node object and generates dot string

    The node is labelled with its name (or its id if the name is empty), wrapped
    to 15 characters per line. A length warning is issued if the wrapped label
    has more than four lines. The shape depends on the tags of the node:
    "STRIDE:DataStore" is drawn as an HTML table cell with only top and bottom
    border, "STRIDE:Interactor" as a box and "STRIDE:Process" as a circle. Nodes
    without any of these tags fall back to "ellipse" (the Graphviz default
    shape) so that the generated dot stays valid.

    Args:
        node: the node to be converted to dot
    Returns:
        the dot representation of the node as a string
    """
    label = node.name if node.name != "" else node.id
    label_formatted = wrap_text(label, 15)
    if label_formatted.count("\n") > 3:
        text_length_warning(node.id)

    attributes = [
        # Tooltip only relevant if rendered as svg file
        f'tooltip="{format_attributes(node.attributes)}"',
        "fixedsize=true",
        "width=2.5",
        "height=1.0",
    ]

    # Determine shape dependent on type of node
    if "STRIDE:DataStore" in node.tags:
        # DataStore gets extra treatment as the shape is more complex and has to be
        # ...handled with html (no default .dot shape)
        # "&", "<" and ">" must be escaped inside an HTML-like label
        html_label = html.escape(label_formatted, quote=False)
        html_label = html_label.replace("\n", "<br/>")
        attributes.append("shape=none")
        attributes.append(
            'label=<<table border="0" cellborder="1">'
            '<tr><td sides="tb" width="180" height="71">'
            + html_label
            + "</td></tr>"
            + "</table>>"
        )
    else:
        if "STRIDE:Interactor" in node.tags:
            shape = "box"
        elif "STRIDE:Process" in node.tags:
            shape = "circle"
        else:
            # An empty "shape=" would be a dot syntax error
            shape = "ellipse"
        attributes.append(f"shape={shape}")
        # Add text; a double quote would otherwise end the quoted dot string
        quoted_label = label_formatted.replace('"', '\\"')
        attributes.append(f'label="{quoted_label}"')

    return f' "{node.id}" [' + ", ".join(attributes) + "]"
[docs]
def dataflow_to_dot(edge: common_models.Edge) -> str:
    """takes an edge object and generates dot string

    The edge is labelled with its name (or its id if the name is empty), wrapped
    to 15 characters per line. A length warning is issued if the wrapped label
    has more than four lines.

    Args:
        edge: the edge to be converted to dot
    Returns:
        the dot representation of the edge as a string
    """
    label = edge.name if edge.name != "" else edge.id
    label_formatted = wrap_text(label, 15)
    if label_formatted.count("\n") > 3:
        text_length_warning(edge.id)
    # A double quote inside the label would otherwise end the quoted dot string
    quoted_label = label_formatted.replace('"', '\\"')

    edge_as_dot = f' "{edge.source_id}" -> "{edge.sink_id}" '
    edge_as_dot += f'[label="{quoted_label}"'
    # tooltip only relevant if rendered as svg file
    edge_as_dot += f', tooltip="{format_attributes(edge.attributes)}"];'
    return edge_as_dot