Source code for ditto.ditto
import copy
from queue import Queue
from typing import Type, List
import logging
from airflow import DAG
from airflow.models import BaseOperator
from ditto import rendering
from ditto.api import TransformerDefaultsConf, SubDagTransformer, DAGFragment, \
TransformerResolver
from ditto.transformers import CopyTransformer
from ditto.utils import TransformerUtils
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
[docs]class AirflowDagTransformer:
"""
Ditto's core logic is executed by this class. It executes two operations on a DAG in sequence.
First it will run the provided subdag transformers to transform the source DAG itself
Then it will transform the operators in the source dag and create a target DAG out of the
returned :class:`~ditto.api.DAGFragment`\'s.
"""
def __init__(self,
target_dag: DAG,
transformer_defaults: TransformerDefaultsConf = None,
transformer_resolvers: List[TransformerResolver] = None,
subdag_transformers: List[Type[SubDagTransformer]] = None,
debug_mode: bool = False):
"""
:param target_dag: ditto allows you to provide a pre-fabricated airflow DAG object
so that you can set essential parameters like it's ``schedule_interval``, ``params``,
give it a unique ``dag_id``, etc. outside of ditto itself, instead of ditto
copying the attributes of the DAG over from the source DAG. This gives more flexbility.
:param transformer_defaults: allows you to pass a map of transformer type to their default
configuration. This is helpful to pass things like a default operator to use when the
transformer cannot transform the source operator for some reason, or any other configuration
required by the transformer
:param transformer_resolvers: resolvers to use to find the transformers for each kind of
operator in the source DAG.
:param subdag_transformers: subdag transformers to use for converting matching subdags
in the source DAG to transformed subdags
:param debug_mode: when `True` it will render the intermediate results of transformation
using `networkx <https://networkx.github.io/>`_ and `maplotlib <https://matplotlib.org/>`_
so that you can debug your transformations easily.
"""
self.transformer_cache = {}
self.target_dag = target_dag
self.transformer_defaults = transformer_defaults
self.transformer_resolvers = transformer_resolvers
self.subdag_transformers = subdag_transformers
self.debug_mode = debug_mode
[docs] def transform_operators(self, src_dag: DAG):
"""
Transforms the operators in the source DAG and creates the target DAG out of the returned
:class:`~ditto.api.DAGFragment`\'s. Finds the transformers by running each operator through
all the resolvers passed.
Does a bread-first-traversal on the source DAG such that the result of the transformation
of upstream (and previous ops in this level) are available to downstream transformers in
level-order. This is helpful for real world use cases of transformation like having a
spark step op transformer read the result of the transformation of a cluster create op transformer.
Caches the results of transformations to avoid repeat work, as this is a graph, not a tree
being traversed.
.. note::
Stitches the final target DAG after having transformed all operators.
:param src_dag: the source airflow DAG to be operator-transformed
:return: does not return anything. mutates ``self.target_dag`` directly
"""
src_task_q: "Queue[(BaseOperator,DAGFragment)]" = Queue()
for root in src_dag.roots:
src_task_q.put((root, None))
# a list representing all processed fragments so far
# transformers can use this to fetch information from the
# level before or even tasks before this one in the same level
# we'll also use this to stetch our final airflow DAG together
transformed_dag_fragments = []
while not src_task_q.empty():
src_task, parent_fragment = src_task_q.get()
# since this is a graph, we can encounter the same source
# task repeatedly if it has multiple parents in the src_dag
# to avoid transforming it repeatedly, check if has already been seen
task_dag_fragment = None
cached_fragment = False
if src_task in self.transformer_cache:
log.info("Already transformed source task: %s", src_task)
task_dag_fragment = self.transformer_cache[src_task]
cached_fragment = True
else:
# get transformer class for this operator
transformer_cl = None
for resolver in self.transformer_resolvers:
transformer_cl = resolver.resolve_transformer(src_task)
if transformer_cl:
log.info(f"Found transformer for operator {src_task.__class__.__name__}"
f": {transformer_cl.__name__} using {resolver.__class__.__name__}")
break
if not transformer_cl:
transformer_cl = CopyTransformer
# get transformer defaults for this operator
transformer_defaults = None
if self.transformer_defaults is not None:
if transformer_cl in self.transformer_defaults.defaults:
transformer_defaults = self.transformer_defaults.defaults[transformer_cl]
# create the transformer
transformer = transformer_cl(self.target_dag, transformer_defaults)
# do transformation, and get DAGFragment
task_dag_fragment = transformer.transform(src_task,
parent_fragment, transformed_dag_fragments)
self.transformer_cache[src_task] = task_dag_fragment
# add this transformed output fragment to
# the upstream fragments processed so far
transformed_dag_fragments.append(task_dag_fragment)
# add children to queue
if src_task.downstream_list:
for downstream_task in src_task.downstream_list:
src_task_q.put((downstream_task, task_dag_fragment))
# chain it to the parent
if parent_fragment:
task_dag_fragment.add_parent(parent_fragment)
# convert dag fragment relationships to airflow dag relationships
# for the processed fragments (which are now available in topological
# sorted order)
for output_fragment in transformed_dag_fragments:
# get a flattened list of roots for the child DAGFragments
all_child_fragment_roots = [step for frag in output_fragment.children for step in frag.tasks]
# attach the flattened roots of child DAGFragments to this DAGFragment
TransformerUtils.add_downstream_dag_fragment(output_fragment,
DAGFragment(all_child_fragment_roots))
[docs] def transform_sub_dags(self, src_dag: DAG):
"""
Transforms the subdags of the source DAG, as matched by the :class:`~ditto.api.TaskMatcher`
DAG provided by the :class:`~ditto.api.SubDagTransformer`\'s :meth:`~ditto.api.SubDagTransformer.get_sub_dag_matcher`
method.
Multiple subdag transformers can run through the source DAG, and each of them can
match+transform multiple subdags of the source DAG, _and_ each such transformation can
return multiple subdags as a result, so this can get quite flexible if you want.
The final DAG is carefully stitched with all the results of the subdag transformations.
See the unit tests at `test_core.py` for complex examples.
.. note::
If your matched input subdag had different leaves pointing to different
operators/nodes, the transformed subdags leaves will just get multiplexed
to all the leaves of the `source DAG`, since it is not possible to know which
new leaf is to be stitched to which node of the source DAG, and resolve
new relationships based on old ones.
.. warning::
Make sure that you don't provide :class:`~ditto.api.SubDagTransformer`\'s which
with overlapping subdag matchers, otherwise things can understandably get messy.
.. seealso::
The core logic behind this method lies in a graph algorithm called
subgraph isomorphism, and is explained in detail at
:meth:`~ditto.utils.TransformerUtils.find_sub_dag`
:param src_dag: the source airflow DAG to be subdag-transformed
:return: does not return anything. mutates the passed ``src_dag`` directly,
which is why you should pass a copy of the source DAG.
"""
for subdag_transformer_cl in self.subdag_transformers:
transformer_defaults = None
if self.transformer_defaults is not None:
if subdag_transformer_cl in self.transformer_defaults.defaults:
transformer_defaults = self.transformer_defaults.defaults[subdag_transformer_cl]
subdag_transformer = subdag_transformer_cl(src_dag, transformer_defaults)
matcher_roots = subdag_transformer.get_sub_dag_matcher()
# find matching sub-dags usng the [TaskMatcher] DAG
src_dag_dg, subdags = TransformerUtils.find_sub_dag(src_dag, matcher_roots)
src_dag_nodes = [t for t in src_dag_dg.nodes]
# transform each matching sub-dag and replace it in the DAG
cloned_subdags = copy.deepcopy(subdags)
# deep copy since DiGraph holds weak refs and that creates a problem
# with traversing the DiGraph after deleting nodes from the original airflow DAG
for subdag, cloned_subdag in zip(subdags, cloned_subdags):
# upstream tasks are nodes in the main dag in-edges of the nodes in this sub-dag
# which do not belong to this sub-dag
subdag_upstream_tasks = set(n for edge in src_dag_dg.in_edges(nbunch=subdag.nodes) \
for n in edge if n not in subdag)
# downstream tasks are nodes in the main dag out-edges of the nodes in this sub-dag
# which do not belong to this sub-dag
subdag_downstream_tasks = set(n for edge in src_dag_dg.edges(nbunch=subdag.nodes) \
for n in edge if n not in subdag)
subdag_nodes = [n for n in subdag.nodes]
for task in subdag_nodes:
TransformerUtils.remove_task_from_dag(src_dag, src_dag_nodes, task)
new_subdag_fragment = subdag_transformer.transform(cloned_subdag, DAGFragment(subdag_upstream_tasks))
# attach new subdag to upstream
if subdag_upstream_tasks:
for parent in subdag_upstream_tasks:
for new_root in new_subdag_fragment.tasks:
parent.set_downstream(new_root)
# assign new subdag to src_dag
TransformerUtils.add_dag_fragment_to_dag(src_dag, new_subdag_fragment)
# attach downstream to the leaves of the new subdag
TransformerUtils.add_downstream_dag_fragment(new_subdag_fragment, DAGFragment(subdag_downstream_tasks))
[docs] def transform(self, src_dag: DAG):
"""
This is the entry point to using ditto, and is the only method you
should have to call after creating the transformer object. This calls
the :meth:`.transform_sub_dags` and :meth:`.transform_operators` methods
in sequence to realize the final :attr:`target_dag` and return it
:param src_dag: the source airflow DAG to be ditto-transformed
:return: the resultant transformed DAG
"""
if self.debug_mode:
rendering.debug_dags(
[src_dag],
figsize=[14, 14])
# transform sub-DAGs of the src_dag
if self.subdag_transformers:
self.transform_sub_dags(src_dag)
if self.debug_mode:
rendering.debug_dags(
[src_dag],
figsize=[14, 14])
# transform each step of the src_dag
# and add it to the target_dag
self.transform_operators(src_dag)
if self.debug_mode:
rendering.debug_dags(
[src_dag, self.target_dag],
figsize=[14, 14])
return self.target_dag