Source code for flowstrider.converters.dfd_to_dot_converter
# SPDX-FileCopyrightText: 2025 German Aerospace Center (DLR)
#
# SPDX-License-Identifier: BSD-3-Clause
import os
import sys
import typing
from datetime import datetime
from io import StringIO
from graphviz import Source
from graphviz.backend.execute import ExecutableNotFound
from flowstrider import settings
from flowstrider.helpers.warnings import WarningsCounter
from flowstrider.models import common_models, dataflowdiagram
from flowstrider.settings import CMD_MAX_CHAR_WIDTH
[docs]
def wrap_text(
text_to_wrap: str,
max_line_char_length: int = CMD_MAX_CHAR_WIDTH,
include_hyphen: bool = True,
) -> str:
"""Inserts line breaks in the given string to fit the given maximum character
length per line
Args:
text_to_wrap: the text that is being wrapped
max_line_char_length: number of characters that will be allowed in one line
include_hyphen: if hyphen are used to indicate that a long word continues on
the next line; set to False for hyperlinks!
Returns:
wrapped version of the input text up to the maximum char length per line as a
string with line breaks
"""
if len(text_to_wrap) == 0:
return ""
text_paragraphs = text_to_wrap.split("\n")
text_wrapped = ""
for paragraph in text_paragraphs:
words = paragraph.split(" ")
while len(words) > 0:
# Wrap single words that are too long
if len(words[0]) > max_line_char_length:
if include_hyphen:
text_wrapped += words[0][: max_line_char_length - 1] + "-\n"
words[0] = words[0][max_line_char_length - 1 :]
continue
else:
text_wrapped += words[0][:max_line_char_length] + "\n"
words[0] = words[0][max_line_char_length:]
continue
# Wrap words that are not too long
else:
curr_line_char_length = len(words[0])
text_wrapped += words.pop(0)
# Terminate if whole text is wrapped
if len(words) == 0:
break
# Prepare for more words (extra '1' is for the space char between words)
curr_line_char_length += 1 + len(words[0])
# Add more words too current line until max is reached or no words are left
while curr_line_char_length <= max_line_char_length:
text_wrapped += " " + words.pop(0)
if len(words) == 0:
break
curr_line_char_length += 1 + len(words[0])
if curr_line_char_length > max_line_char_length:
text_wrapped += "\n"
if paragraph is not text_paragraphs[len(text_paragraphs) - 1]:
text_wrapped += "\n"
# Remove last \n and return
if text_wrapped[-1] == "\n":
return text_wrapped[:-1]
else:
return text_wrapped
[docs]
def render_dfd(dfd: dataflowdiagram.DataflowDiagram) -> int:
"""Renders a given dataflow diagram as a PNG file (or SVG)
Args:
dfd: the dataflowdiagram to be rendered
"""
dot_format = deserialized_dfd_to_dot(dfd)
output_file = "output/visualization/visualization"
source = Source(dot_format)
source.save(output_file + ".dot")
os.makedirs("output/visualization", exist_ok=True)
# Redirect any errors graphviz might throw
error_stream = StringIO()
temp_stderr = sys.stderr
sys.stderr = error_stream
try:
source.render(
output_file, format="png", cleanup=True
) # PNG can be added to pdf report (does not include metadata)
source.render(
output_file, format="svg", cleanup=True
) # SVG can be opened in browser to view metadata tooltips
except ExecutableNotFound:
_ = settings.lang_sys.gettext
print(
settings.C_WARNING
+ wrap_text(
_("Warning: ")
+ _(
"Graphviz seems to not be installed on your system or is not on "
+ "the system PATH. See {link} for help."
).format(link="https://graphviz.org/download/"),
include_hyphen=False,
)
+ settings.C_DEFAULT
)
WarningsCounter.add_warning()
return 1
sys.stderr = temp_stderr
# Write warnings_log if graphviz threw warnings
errors = error_stream.getvalue()
error_stream.close()
if errors:
with open("output/warnings_log.txt", "w") as log_file:
log_file.write(
"Log for {id} from {time}.\n\n".format(id=dfd.id, time=datetime.now())
)
log_file.write("Graphviz warnings:\n")
log_file.write(errors)
# The image borders have to be converted from transparent to white (no visual
# ...change) because fpdf2 doesn't support transparency
if os.path.exists(output_file + ".svg"):
temp_lines = []
with open(output_file + ".svg", "r") as visualization_svg:
temp_lines = visualization_svg.readlines()
for i in range(len(temp_lines)):
if '<polygon fill="white" stroke="transparent"' in temp_lines[i]:
temp_lines[i] = temp_lines[i].replace('"transparent"', '"white"')
with open(output_file + ".svg", "w") as visualization_svg:
visualization_svg.writelines(temp_lines)
return 0
[docs]
def text_length_warning(id: str):
_ = settings.lang_sys.gettext
print(
settings.C_WARNING
+ wrap_text(
_(
"Warning: The name of id: {id} is very long and might not be"
+ " displayed properly."
).format(id=id)
)
+ settings.C_DEFAULT
)
WarningsCounter.add_warning()
[docs]
def deserialized_dfd_to_dot(dfd: dataflowdiagram.DataflowDiagram) -> str:
"""takes dfd object as input and creates a dot representation (as string)
which can be rendered as PNG file
Args:
dfd: the dataflow diagram to be converted to dot format
Returns:
the dot representation of the dfd as a string
"""
dfd_as_dot = 'digraph "' + dfd.id.replace(" ", "_") + '" {\n'
# represent relationships between (nested) clusters (parent: list of children)
# this is necessary because graphviz does not automatically show nested structures
# adding a node twice as part of two different clusters does not work. it would be
# ignored by the second cluster
relationships: typing.Dict[str, typing.List[str]] = {
cluster.id: [] for cluster in dfd.clusters.values()
}
for parent_key, parent_cluster in dfd.clusters.items():
for child_key, child_cluster in dfd.clusters.items():
if parent_key != child_key and set(child_cluster.node_ids).issubset(
parent_cluster.node_ids
):
relationships[parent_key].append(child_key)
# Find the root clusters (not child of any other cluster)
all_subclusters = {sub_id for subs in relationships.values() for sub_id in subs}
root_clusters = [
cluster_id for cluster_id in relationships if cluster_id not in all_subclusters
]
# track all nodes that have been added already as part of a cluster
nodes_in_clusters = set()
# add root clusters to dot representation
# cluster_to_dot recursively add child clusters
for root_cluster_id in root_clusters:
cluster = dfd.clusters[root_cluster_id]
dfd_as_dot += cluster_to_dot(cluster, dfd, relationships)
nodes_in_clusters.update(cluster.node_ids)
# in case of nodes that do not belong to any cluster
for node_key in dfd.nodes:
node = dfd.nodes[node_key]
if node.id not in nodes_in_clusters:
dfd_as_dot += node_to_dot(node) + "\n"
# add edges to dot representation
for edge_key in dfd.edges:
edge = dfd.edges[edge_key]
dfd_as_dot += dataflow_to_dot(edge) + "\n"
dfd_as_dot += "}"
return dfd_as_dot
[docs]
def cluster_to_dot(
cluster: common_models.Cluster,
dfd: dataflowdiagram.DataflowDiagram,
relationships: dict,
) -> str:
"""takes a cluster object and generates dot string. recursively includes
nested clusters
Args:
cluster: the cluster to be converted to dot
dfd: the dataflow diagram to which the cluster belongs
(needed to get node and child cluster objects)
relationships: possible child clusters to be added recursively
Returns:
the dot representation of the cluster(s) as a string
"""
cluster_id_underscores = cluster.id.replace(" ", "_").replace("-", "_")
cluster_as_dot = f" subgraph cluster_{cluster_id_underscores} {{\n"
label = cluster.name if cluster.name != "" else cluster.id
label_formatted = wrap_text(label, 15)
if label_formatted.count("\n") > 3:
text_length_warning(cluster.id)
label_formatted = label_formatted.replace("\n", "<br/>")
cluster_as_dot += f" label=< <B>{label_formatted}</B> >;\n"
# tooltip only relevant if rendered as svg file
cluster_as_dot += f' tooltip="{format_attributes(cluster.attributes)}";\n'
cluster_as_dot += " style=dashed;\n"
# add nodes in cluster
node_ids_sorted = sorted(cluster.node_ids)
for node_id in node_ids_sorted:
node = dfd.get_node_by_id(node_id)
if node is None:
raise KeyError(f"Node '{node_id}' not found.")
cluster_as_dot += node_to_dot(node) + "\n"
# Recursively add subclusters
for subcluster_id in relationships[cluster.id]:
subcluster = dfd.clusters[subcluster_id]
cluster_as_dot += cluster_to_dot(subcluster, dfd, relationships)
cluster_as_dot += " }\n"
return cluster_as_dot
[docs]
def node_to_dot(node: common_models.Node) -> str:
"""takes a node object and generates dot string
Args:
node: the node to be converted to dot
Returns:
the dot representation of the node as a string
"""
label = node.name if node.name != "" else node.id
label_formatted = wrap_text(label, 15)
if label_formatted.count("\n") > 3:
text_length_warning(node.id)
node_as_dot = f' "{node.id}" ['
# Tooltip only relevant if rendered as svg file
node_as_dot += f'tooltip="{format_attributes(node.attributes)}"'
node_as_dot += ", fixedsize=true"
node_as_dot += ", width=2.5"
node_as_dot += ", height=1.0"
# Determine shape dependent on type of node
node_as_dot += ", shape="
if "STRIDE:DataStore" in node.tags:
# DataStore gets extra treatment as the shape is more complex and has to be
# ...handled with html (no default .dot shape)
node_as_dot += "none"
node_as_dot += ', label=<<table border="0" cellborder="1">'
node_as_dot += '<tr><td sides="tb" width="180" height="71">'
node_as_dot += label_formatted.replace("\n", "<br/>")
node_as_dot += "</td></tr>"
node_as_dot += "</table>>"
else:
if "STRIDE:Interactor" in node.tags:
node_as_dot += "box"
elif "STRIDE:Process" in node.tags:
node_as_dot += "circle"
# Add text
node_as_dot += f', label="{label_formatted}"'
node_as_dot += "]"
return node_as_dot
[docs]
def dataflow_to_dot(edge: common_models.Edge) -> str:
"""takes an edge object and generates dot string
Args:
edge: the edge to be converted to dot
Returns:
the dot representation of the edge as a string
"""
label = edge.name if edge.name != "" else edge.id
label_formatted = wrap_text(label, 15)
if label_formatted.count("\n") > 3:
text_length_warning(edge.id)
edge_as_dot = f' "{edge.source_id}" -> "{edge.sink_id}" '
edge_as_dot += f'[label="{label_formatted}"'
# tooltip only relevant if rendered as svg file
edge_as_dot += f', tooltip="{format_attributes(edge.attributes)}"];'
return edge_as_dot