ditto.api

class ditto.api.DAGFragment(tasks, parents=None, children=None)[source]

Bases: object

A DAG fragment represents a sub-DAG of a DAG. It is meant to represent fragments of operator subDAGs which are not yet attached to an airflow DAG. A DAGFragment can also be a node in a DAG of fragments, linked through its parents and children references. Operator references are used to traverse the fragment’s operator subDAG, and parent/children references are used to traverse the DAG of fragments themselves if needed.

Parameters
  • tasks (List[BaseOperator]) – root tasks of the fragment (operator references are used to traverse the operator subDAG)

  • parents (Optional[List[DAGFragment]]) – parent fragments

  • children (Optional[List[DAGFragment]]) – child fragments

add_parent(dag_fragment)[source]

Set a parent-child relationship between this DAGFragment and another, with the other fragment as the parent.

Parameters

dag_fragment (DAGFragment) – the other DAGFragment

add_child(dag_fragment)[source]

Set a child-parent relationship between this DAGFragment and another, with the other fragment as the child.

Parameters

dag_fragment (DAGFragment) – the other DAGFragment

add_child_in_place(child)[source]

Add an operator as a child DAGFragment, placing it in the DAG of fragments at the position it held in this fragment’s operator DAG.

Parameters

child (BaseOperator) – the operator to add in place
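The parent/children linking described above can be sketched as follows. This is a minimal stand-in, not ditto's implementation: FragmentSketch is a hypothetical class, tasks are plain strings instead of BaseOperator instances, and recording each link on both sides is an assumption.

```python
from typing import List, Optional


class FragmentSketch:
    """Hypothetical stand-in for DAGFragment; not ditto's implementation."""

    def __init__(self, tasks: List[str],
                 parents: Optional[List["FragmentSketch"]] = None,
                 children: Optional[List["FragmentSketch"]] = None):
        self.tasks = tasks  # root tasks (strings here, BaseOperator in ditto)
        self.parents = parents or []
        self.children = children or []

    def add_parent(self, other: "FragmentSketch") -> None:
        # Assumption: the link is recorded on both fragments.
        if other not in self.parents:
            self.parents.append(other)
        if self not in other.children:
            other.children.append(self)

    def add_child(self, other: "FragmentSketch") -> None:
        # Adding a child is the mirror image of adding a parent.
        other.add_parent(self)


create_cluster = FragmentSketch(["create_cluster"])
add_step = FragmentSketch(["add_step"])
create_cluster.add_child(add_step)

print([p.tasks for p in add_step.parents])         # [['create_cluster']]
print([c.tasks for c in create_cluster.children])  # [['add_step']]
```

With links kept on both sides, the DAG of fragments can be traversed in either direction from any fragment.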

class ditto.api.TransformerDefaults(default_operator, other_defaults=None)[source]

Bases: object

Used to hold configuration for an OperatorTransformer

default_operator

default output operator of this transformer

other_defaults

any other defaults or conf you might want to set

class ditto.api.Transformer[source]

Bases: object

Base class for all kinds of ditto transformers

TRANSFORMED_BY_HEADER = '__transformed_by'

this header is added as a param to the transformed airflow operator and can be used to find transformed operators in a target DAG
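As an illustration of how such a header could be used to locate transformed operators, here is a small sketch. OpSketch, sign_op_sketch and find_transformed are hypothetical names, and storing the transformer's name as the header value is an assumption; only the header key itself comes from the documentation above.

```python
TRANSFORMED_BY_HEADER = "__transformed_by"  # value as documented above


class OpSketch:
    """Hypothetical stand-in for an airflow operator carrying a params dict."""

    def __init__(self, task_id, params=None):
        self.task_id = task_id
        self.params = dict(params or {})


def sign_op_sketch(output_op, transformer_name):
    # Assumption: the header value names the transformer that produced the op.
    output_op.params[TRANSFORMED_BY_HEADER] = transformer_name
    return output_op


def find_transformed(ops):
    # Keep only the operators that carry the header in their params.
    return [op for op in ops if TRANSFORMED_BY_HEADER in op.params]


ops = [OpSketch("extract"), sign_op_sketch(OpSketch("load"), "MyTransformer")]
print([op.task_id for op in find_transformed(ops)])  # ['load']
```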

class ditto.api.OperatorTransformer(target_dag, defaults)[source]

Bases: typing.Generic, abc.ABC, ditto.api.Transformer

A type of Transformer which can transform individual airflow operators. This is an abstract class which your operator transformers need to subclass, where the generic type parameter T is the source operator type being transformed.

Parameters
  • target_dag (DAG) – the target to which the transformed operators must be added

  • defaults (TransformerDefaults) – the default configuration for this transformer

abstract transform(src_operator, parent_fragment, upstream_fragments)[source]

The main method to implement for an OperatorTransformer

Parameters
  • src_operator (BaseOperator) – The source operator to be transformed

  • parent_fragment (DAGFragment) – This will give you a linked list of parent DAGFragments, up to the root. The transform operation can make use of the upstream parent chain, for example, to obtain information about the upstream transformations. A good example of this is an add-step operator transformation which needs to get the cluster ID from an upstream transformed create-cluster operator.

  • upstream_fragments (List[DAGFragment]) – This will give you all the upstream DAGFragments transformed so far in level-order, from the root to this node in the source DAG. This is just a more exhaustive version of parent_fragment, and can give you the parent’s siblings, etc.

Return type

DAGFragment

Returns

the DAGFragment of transformed airflow operators
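The use of the parent chain described for parent_fragment can be sketched as below. This is a hedged illustration only: Op, Fragment and find_upstream_op are hypothetical stand-ins, the kind attribute is invented for the example, and the walk inspects only each fragment's root tasks rather than its full operator subDAG.

```python
from collections import deque


class Op:
    """Hypothetical stand-in for an airflow operator."""

    def __init__(self, task_id, kind):
        self.task_id = task_id
        self.kind = kind  # invented label, e.g. "create-cluster"


class Fragment:
    """Hypothetical stand-in for DAGFragment (root tasks plus parent links)."""

    def __init__(self, tasks, parents=None):
        self.tasks = tasks
        self.parents = parents or []


def find_upstream_op(fragment, kind):
    """Walk up the parent links breadth-first and return the first root
    task of the requested kind, or None if no such task is found."""
    queue = deque([fragment] if fragment is not None else [])
    seen = set()
    while queue:
        current = queue.popleft()
        if id(current) in seen:
            continue  # a fragment reachable via two parents is visited once
        seen.add(id(current))
        for task in current.tasks:
            if task.kind == kind:
                return task
        queue.extend(current.parents)
    return None


root = Fragment([Op("create_cluster", "create-cluster")])
mid = Fragment([Op("wait_for_cluster", "sensor")], parents=[root])
found = find_upstream_op(mid, "create-cluster")
print(found.task_id)  # create_cluster
```

A transform implementation for an add-step operator could call a helper of this kind on parent_fragment to reach the transformed create-cluster operator and read the cluster ID from it.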

static copy_op_attrs(to_op, from_op)[source]

A utility method to copy all the attributes of a source operator to the transformed operator after transformation. It does not copy attributes which need to be unique, like task_id or dag_id; those have to be set by the transformer itself.

Parameters
  • to_op (BaseOperator) – the target operator to copy the attrs to

  • from_op (BaseOperator) – the source operator to copy the attrs from

Return type

BaseOperator

Returns

the to_op
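The copying behaviour described above might look roughly like the following sketch. It is not ditto's code: PlainOp and copy_op_attrs_sketch are hypothetical, and the set of skipped attributes is assumed to be exactly the two named in the description.

```python
UNIQUE_ATTRS = {"task_id", "dag_id"}  # assumption: only the two named above


class PlainOp:
    """Hypothetical stand-in for an airflow operator."""

    def __init__(self, task_id, dag_id, **attrs):
        self.task_id = task_id
        self.dag_id = dag_id
        for name, value in attrs.items():
            setattr(self, name, value)


def copy_op_attrs_sketch(to_op, from_op):
    # Copy every instance attribute except those that must stay unique.
    for name, value in vars(from_op).items():
        if name not in UNIQUE_ATTRS:
            setattr(to_op, name, value)
    return to_op


src = PlainOp("src_task", "src_dag", retries=3, owner="data-eng")
dst = copy_op_attrs_sketch(PlainOp("dst_task", "dst_dag"), src)
print(dst.task_id, dst.dag_id, dst.retries, dst.owner)
# dst_task dst_dag 3 data-eng
```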

get_default_op(src_operator)[source]

A utility method to deep copy a default_operator in the TransformerDefaults of this transformer

Parameters

src_operator (BaseOperator) – the source operator from which attrs need to be copied

Return type

BaseOperator

Returns

the deep copied operator

sign_op(output_op)[source]

Adds the TRANSFORMED_BY_HEADER header as a param to the output_op

Parameters

output_op (BaseOperator) – the op to which the header has to be added

class ditto.api.TransformerResolver[source]

Bases: abc.ABC

The abstract base class to define ditto resolvers

abstract resolve_transformer(task)[source]

The main method to be implemented by a resolver

Parameters

task (BaseOperator) – the source task for which a transformer has to be resolved (found)

Return type

Type[OperatorTransformer]

Returns

the type of OperatorTransformer found for this BaseOperator
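One way a resolver could be written is to look up the source task's class in a mapping. The sketch below uses hypothetical stand-in classes throughout (ResolverSketch, ClassBasedResolver, the operator and transformer types), and returning None when nothing matches is an assumption rather than documented behaviour.

```python
from abc import ABC, abstractmethod
from typing import Dict, Optional


class SourceOp:
    """Hypothetical stand-in for BaseOperator."""


class AddStepOp(SourceOp):
    """Hypothetical source operator type."""


class TransformerSketch:
    """Hypothetical stand-in for OperatorTransformer."""


class AddStepTransformer(TransformerSketch):
    """Hypothetical transformer for AddStepOp."""


class ResolverSketch(ABC):
    """Hypothetical stand-in for TransformerResolver."""

    @abstractmethod
    def resolve_transformer(self, task: SourceOp) -> Optional[type]:
        ...


class ClassBasedResolver(ResolverSketch):
    def __init__(self, mapping: Dict[type, type]):
        self.mapping = mapping

    def resolve_transformer(self, task: SourceOp) -> Optional[type]:
        # Check the task's class first, then its base classes in MRO order.
        for klass in type(task).__mro__:
            if klass in self.mapping:
                return self.mapping[klass]
        return None  # assumption: no match yields None


resolver = ClassBasedResolver({AddStepOp: AddStepTransformer})
print(resolver.resolve_transformer(AddStepOp()).__name__)  # AddStepTransformer
print(resolver.resolve_transformer(SourceOp()))            # None
```

Note that the resolver returns the transformer type, not an instance, matching the documented return type.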

exception ditto.api.TransformerException[source]

Bases: Exception

Exceptions thrown by ditto transformers

exception ditto.api.UpstreamOperatorNotFoundException(upstream_op, transformer_op)[source]

Bases: ditto.api.TransformerException

A type of TransformerException thrown when a required operator is not found upstream of the operator being transformed.

class ditto.api.TransformerDefaultsConf(defaults)[source]

Bases: object

class ditto.api.TaskMatcher[source]

Bases: abc.ABC

The abstract base class used to define task matchers. Task matchers are used to fingerprint and match operators based on certain qualifying criteria. They fit in ditto’s workflow for subdag transformation, where a subdag of TaskMatchers is searched for in the source airflow DAG. TaskMatchers can be linked to each other to form a DAG using their parents and children attributes.

parents

parent matchers of this matcher

children

child matchers of this matcher

set_downstream(other)[source]

add the other matcher to this matcher’s children

Parameters

other – the other matcher

Returns

self

set_upstream(other)[source]

add the other matcher to this matcher’s parents

Parameters

other – the other matcher

Returns

self

abstract does_match(task)[source]

the main method to implement, containing logic to match the provided operator with the matcher’s criteria

Parameters

task (BaseOperator) – the operator to try to match

Return type

bool

Returns

True if matched otherwise False
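The matcher interface above can be illustrated with a small stand-in. MatcherSketch, TaskIdPrefixMatcher and FakeTask are hypothetical names, matching on a task_id prefix is an invented criterion, and recording each link on both matchers is an assumption; returning self from the linking methods follows the documentation.

```python
from abc import ABC, abstractmethod


class MatcherSketch(ABC):
    """Hypothetical stand-in for TaskMatcher."""

    def __init__(self):
        self.parents = []
        self.children = []

    def set_downstream(self, other):
        # Assumption: the link is recorded on both matchers.
        self.children.append(other)
        other.parents.append(self)
        return self

    def set_upstream(self, other):
        self.parents.append(other)
        other.children.append(self)
        return self

    @abstractmethod
    def does_match(self, task) -> bool:
        ...


class TaskIdPrefixMatcher(MatcherSketch):
    """Matches tasks whose task_id starts with a given prefix."""

    def __init__(self, prefix):
        super().__init__()
        self.prefix = prefix

    def does_match(self, task) -> bool:
        return task.task_id.startswith(self.prefix)


class FakeTask:
    """Hypothetical stand-in for an airflow operator."""

    def __init__(self, task_id):
        self.task_id = task_id


create = TaskIdPrefixMatcher("create_")
step = TaskIdPrefixMatcher("step_")
roots = [create.set_downstream(step)]  # returns self, so roots == [create]

print(roots[0].does_match(FakeTask("create_cluster")))  # True
print(step.does_match(FakeTask("create_cluster")))      # False
```

Because the linking methods return self, the root of a matcher DAG can be built and collected in one expression, which is the shape get_sub_dag_matcher() is documented to return.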

class ditto.api.SubDagTransformer(dag, defaults)[source]

Bases: abc.ABC, ditto.api.Transformer

The abstract base class to implement to define ditto SubDagTransformers

SubDag transformers are meant to, as their name suggests, match subdags of operators in the source airflow DAG, and then transform and replace them with a new subdag as per the returned DAGFragments.

Parameters
  • dag (DAG) – the source dag where subdags are to be found

  • defaults (TransformerDefaults) – the default configuration for this subdag transformer

abstract get_sub_dag_matcher()[source]

Implement this method to construct and return a DAG of matchers, which ditto then uses to try to find a matching subdag in the source airflow DAG

Return type

List[TaskMatcher]

Returns

list of root nodes of the TaskMatcher DAG

abstract transform(subdag, parent_fragment)[source]

Implement this method to do the actual transformation of the subdag found using the matchers returned by get_sub_dag_matcher()

Parameters
Return type

DAGFragment

Returns

the DAGFragment of transformed airflow operators