# Source code for ditto.api
import copy
from abc import abstractmethod, ABC
from collections import deque
from typing import List, Type, Dict, TypeVar, Generic
from airflow import DAG
from airflow.models import BaseOperator
class DAGFragment:
    """
    A DAG fragment represents a sub-DAG of a DAG. It is meant to be able to
    represent fragments of operator subDAGs which are not yet attached to an
    airflow DAG. DAGFragments can also be part of their own fragment DAG using
    the ``parents`` and ``children`` references. Operator references are used
    to traverse the fragment's subDAG and parent/children references are used
    to traverse the DAG of fragments themselves if needed.
    """

    def __init__(self, tasks: List[BaseOperator], parents: List['DAGFragment'] = None,
                 children: List['DAGFragment'] = None):
        """
        :param tasks: root tasks of the fragment (operator references are used to
            traverse the operator subDAG)
        :param parents: parent fragments
        :param children: child fragments
        """
        self.tasks: List[BaseOperator] = tasks
        # A missing (or empty) argument is replaced by a fresh list so that
        # no two fragments ever share a default container.
        self.parents = parents if parents else []  # type: List[DAGFragment]
        self.children = children if children else []  # type: List[DAGFragment]

    def add_parent(self, dag_fragment: 'DAGFragment'):
        """
        set a parent-child relationship between this `DAGFragment` and another

        :param dag_fragment: the other `DAGFragment`
        """
        # The membership check is what terminates the mutual recursion
        # between add_parent and add_child.
        if dag_fragment not in self.parents:
            self.parents.append(dag_fragment)
            dag_fragment.add_child(self)

    def add_child(self, dag_fragment: 'DAGFragment'):
        """
        set a child-parent relationship between this `DAGFragment` and another

        :param dag_fragment: the other `DAGFragment`
        """
        if dag_fragment not in self.children:
            self.children.append(dag_fragment)
            dag_fragment.add_parent(self)

    def add_child_in_place(self, child: BaseOperator):
        """
        Add an operator as a child `DAGFragment` at the place
        in the `DAGFragment` dag where it belonged in this fragment's
        operator DAG

        The fragment DAG is walked breadth-first starting at this fragment.
        Every fragment is processed at most once, so a fragment reachable
        through several paths does not receive duplicate child fragments
        and a cyclic fragment graph cannot cause an endless loop.

        :param child: the operator to add in place
        """
        frag_q: "deque[DAGFragment]" = deque([self])
        # identity based, so it does not depend on fragments being hashable
        visited = {id(self)}
        while frag_q:
            frag = frag_q.popleft()
            for task in frag.tasks:
                if child in task.downstream_list:
                    frag.add_child(DAGFragment([child]))
            # children added just above are enqueued as well, exactly once
            for child_frag in frag.children:
                if id(child_frag) not in visited:
                    visited.add(id(child_frag))
                    frag_q.append(child_frag)
class TransformerDefaults:
    """
    Configuration holder handed to a :class:`~ditto.api.OperatorTransformer`
    """

    def __init__(self, default_operator: BaseOperator, other_defaults: Dict[str, str] = None):
        """
        :param default_operator: the operator a transformer emits by default
        :param other_defaults: free-form extra configuration; stored exactly
            as passed (``None`` when omitted)
        """
        #: the operator this transformer produces by default
        self.default_operator = default_operator

        #: additional defaults / configuration values, if any
        self.other_defaults = other_defaults
#: type variable used to parameterise the generic
#: :class:`~ditto.api.OperatorTransformer`; bound to ``BaseOperator``
T = TypeVar('T', bound=BaseOperator)
class Transformer:
    """
    Common root of every ditto transformer
    """

    #: name of the param that is added to a transformed airflow operator;
    #: it can be used to locate transformed operators in a target DAG
    TRANSFORMED_BY_HEADER = '__transformed_by'
class OperatorTransformer(Generic[T], ABC, Transformer):
    """
    A type of :class:`~ditto.api.Transformer` which can transform individual airflow operators

    This is an abstract class which your operator transformers need to subclass, where `T`
    is the source operator type being transformed
    """

    # Attributes transferred by :meth:`copy_op_attrs`, in the order they are
    # copied. Identity attributes such as ``task_id`` and ``dag_id`` are left
    # out on purpose: they must be set by the transformer itself.
    _COPYABLE_ATTRS = (
        'owner',
        'email',
        'email_on_retry',
        'email_on_failure',
        'retries',
        'retry_delay',
        'retry_exponential_backoff',
        'max_retry_delay',
        'start_date',
        'end_date',
        'depends_on_past',
        'wait_for_downstream',
        'priority_weight',
        'weight_rule',
        'queue',
        'pool',
        'pool_slots',
        'sla',
        'execution_timeout',
        'on_failure_callback',
        'on_success_callback',
        'on_retry_callback',
        'trigger_rule',
        'resources',
        'run_as_user',
        'task_concurrency',
        'executor_config',
        'do_xcom_push',
        'inlets',
        'outlets',
    )

    def __init__(self, target_dag: DAG, defaults: TransformerDefaults):
        """
        :param target_dag: the target to which the transformed operators must be added
        :param defaults: the default configuration for this transformer; when ``None``
            an empty :class:`~ditto.api.TransformerDefaults` is used instead
        """
        self.dag = target_dag
        self.defaults = defaults if defaults is not None else \
            TransformerDefaults(default_operator=None, other_defaults={})

    @abstractmethod
    def transform(self, src_operator: BaseOperator, parent_fragment: DAGFragment,
                  upstream_fragments: List[DAGFragment]) -> DAGFragment:
        """
        The main method to implement for an `OperatorTransformer`

        :param src_operator: The source operator to be transformed
        :param parent_fragment: This will give you a linked-list of parent :class:`~ditto.api.DAGFragment`'s,
            up to the root. The transform operation can make use of the upstream parent chain, for example,
            to obtain information about the upstream transformations. A good example of this is an add-step
            operator transformation which needs to get the cluster ID from an upstream transformed
            create-cluster operator
        :param upstream_fragments: This will give you *all* the upstream :class:`~ditto.api.DAGFragment`'s
            transformed so far in level-order, from the root till this node in the source DAG. This is just
            a more exhaustive version of `parent_fragment`, and can give you parent's siblings, etc.
        :return: the :class:`~ditto.api.DAGFragment` of transformed airflow
            :class:`operators <airflow.models.BaseOperator>`
        """

    @staticmethod
    def copy_op_attrs(to_op: BaseOperator, from_op: BaseOperator) -> BaseOperator:
        """
        A utility method to copy all the attributes of a source operator
        to the transformed operator after transformation. Does not copy
        attributes which need to be unique like task_id or dag_id, and that
        have to be set by the transformer itself

        Only truthy source values are copied. An attribute whose source value
        is falsy (``None``, ``0``, ``False``, empty, ...) keeps whatever the
        target operator already had.

        :param to_op: the target operator to copy the attrs to
        :param from_op: the source operator to copy the attrs from
        :return: the to_op
        """
        for attr in OperatorTransformer._COPYABLE_ATTRS:
            value = getattr(from_op, attr)
            if value:
                setattr(to_op, attr, value)
        return to_op

    def get_default_op(self, src_operator: BaseOperator) -> BaseOperator:
        """
        A utility method to deep copy a :paramref:`~ditto.api.TransformerDefaults.default_operator`
        in the :class:`~ditto.api.TransformerDefaults` of this transformer

        :param src_operator: the source operator from which attrs need to be copied
        :return: the deep copied operator carrying the source operator's ``task_id``
            and copied attributes, or ``None`` when no default operator is configured
        """
        defaults = self.defaults
        if defaults is None or defaults.default_operator is None:
            return None
        # deep copy so the shared default operator is never mutated
        new_op = copy.deepcopy(defaults.default_operator)
        new_op.task_id = src_operator.task_id
        return self.copy_op_attrs(new_op, src_operator)

    def sign_op(self, output_op: BaseOperator):
        """
        Adds the :const:`~ditto.api.Transformer.TRANSFORMED_BY_HEADER` header
        as a param to the `output_op`; the value is this transformer's class name

        :param output_op: the op to which the header has to be added
        """
        output_op.params[Transformer.TRANSFORMED_BY_HEADER] = self.__class__.__name__
class TransformerResolver(ABC):
    """
    Abstract base class from which ditto resolvers are derived
    """

    @abstractmethod
    def resolve_transformer(self, task: BaseOperator) -> Type[OperatorTransformer]:
        """
        The one method every resolver has to provide

        :param task: the source task a transformer should be resolved (found) for
        :return: the type of :class:`~ditto.api.OperatorTransformer` found for
            this :class:`~airflow.models.BaseOperator`
        """
class UpstreamOperatorNotFoundException(TransformerException):
    """
    A type of :class:`~ditto.api.TransformerException` thrown
    when an operator is not found upstream of this operator
    to be transformed and was required.
    """

    def __init__(self, upstream_op: Type[BaseOperator], transformer_op: Type[BaseOperator]):
        """
        :param upstream_op: the operator class that was expected upstream
        :param transformer_op: the operator class that was being transformed;
            for both arguments an operator instance is accepted as well, in
            which case the name of its class is reported
        """
        message = (f"Cannot transform operator {self._op_name(transformer_op)} "
                   f"without a {self._op_name(upstream_op)} defined upstream. "
                   f"Make sure you have connections between operators configured properly.")
        # plain super() so that the direct parent's initialiser is not skipped
        super().__init__(message)

    @staticmethod
    def _op_name(op) -> str:
        """Return the display name of an operator class (or of an instance's class)."""
        return getattr(op, '__name__', type(op).__name__)
class TransformerDefaultsConf:
    """
    Holds the :class:`~ditto.api.TransformerDefaults` to use per
    :class:`~ditto.api.Transformer` type
    """

    def __init__(self, defaults: Dict[Type[Transformer], TransformerDefaults]):
        """
        :param defaults: mapping of transformer type to its defaults
        """
        #: transformer type -> defaults mapping, kept exactly as passed in
        self.defaults = defaults
class TaskMatcher(ABC):
    """
    The abstract base class used to define task matchers.

    Task matchers are used to fingerprint and match operators based on certain
    qualifying criteria. They fit in ditto's workflow for subdag transformation
    where a subdag of TaskMatchers is found in the source airflow DAG

    TaskMatchers can be linked to each other to form a DAG using its ``parents``
    and ``children`` attributes
    """

    def __init__(self):
        #: parent matchers of this matcher
        self.parents = []
        #: child matchers of this matcher
        self.children = []

    def set_downstream(self, other):
        """
        add the `other` matcher to this matcher's children

        :param other: the other matcher
        :return: self
        """
        self.children.append(other)
        other.parents.append(self)
        return self

    def set_upstream(self, other):
        """
        add the `other` matcher to this matcher's parents

        :param other: the other matcher
        :return: self
        """
        self.parents.append(other)
        other.children.append(self)
        return self

    def __rshift__(self, other):
        """
        Implements Self >> Other == self.set_downstream(other)

        `other` may be a single matcher or a list/tuple of matchers; it is
        returned unchanged so that expressions can be chained.
        """
        if isinstance(other, (list, tuple)):
            for matcher in other:
                self.set_downstream(matcher)
        else:
            self.set_downstream(other)
        return other

    def __lshift__(self, other):
        """
        Implements Self << Other == self.set_upstream(other)

        `other` may be a single matcher or a list/tuple of matchers; it is
        returned unchanged so that expressions can be chained.
        """
        if isinstance(other, (list, tuple)):
            for matcher in other:
                self.set_upstream(matcher)
        else:
            self.set_upstream(other)
        return other

    def __rrshift__(self, other):
        """
        Implements Other >> Self for a left operand that has no ``>>`` of
        its own (e.g. a list of matchers); returns self
        """
        self.__lshift__(other)
        return self

    def __rlshift__(self, other):
        """
        Implements Other << Self for a left operand that has no ``<<`` of
        its own (e.g. a list of matchers); returns self
        """
        self.__rshift__(other)
        return self

    @abstractmethod
    def does_match(self, task: BaseOperator) -> bool:
        """
        the main method to implement, containing logic to match
        the provided operator with the matcher's criteria

        :param task: the operator to try to match
        :return: `True` if matched otherwise `False`
        """
class SubDagTransformer(ABC, Transformer):
    """
    Abstract base class for ditto SubDagTransformers

    As the name suggests, a SubDag transformer matches subdags of operators
    in the source airflow DAG and then transforms and replaces them with a new
    subdag, as described by the :class:`~ditto.api.DAGFragment`'s it returns.
    """

    def __init__(self, dag: DAG, defaults: TransformerDefaults):
        """
        :param dag: the source dag in which subdags are looked for
        :param defaults: default configuration of this subdag transformer;
            ``None`` is replaced by an empty :class:`~ditto.api.TransformerDefaults`
        """
        self.dag = dag
        if defaults is None:
            defaults = TransformerDefaults(default_operator=None, other_defaults={})
        self.defaults = defaults

    @abstractmethod
    def get_sub_dag_matcher(self) -> List[TaskMatcher]:
        """
        Build and return a DAG of matchers which ditto then uses to try
        and find a subdag in the source airflow DAG

        :return: list of root nodes of the :class:`~ditto.api.TaskMatcher` DAG
        """

    @abstractmethod
    def transform(self, subdag: DAGFragment, parent_fragment: DAGFragment) -> DAGFragment:
        """
        Perform the actual transformation of the subdag that was found with
        the matchers returned by :meth:`.get_sub_dag_matcher`

        :param subdag: :class:`~ditto.api.DAGFragment` representing the matched subdag of source DAG operators
        :param parent_fragment: a linked-list of parent :class:`~ditto.api.DAGFragment`'s,
            up to the root. See :paramref:`~ditto.api.OperatorTransformer.transform.parent_fragment`.
        :return: the :class:`~ditto.api.DAGFragment` of transformed airflow
            :class:`operators <airflow.models.BaseOperator>`
        """