ditto.utils

class ditto.utils.TransformerUtils[source]

Bases: object

static get_list_index_from_xcom_pull(xcom_template)[source]

Parses an Airflow template string and returns the list index that is applied to the return value of an xcom_pull() call

Example

>>> task_instance.xcom_pull("add_steps_to_cluster", key="return_value")[3]
    will return "3"
>>> {{ ti.xcom_pull("add_steps_to_cluster", key="return_value")[2] }}
    will return "2"
Parameters

xcom_template (str) – airflow template string to parse

Return type

str

Returns

the list index accessed
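
The parsing described above can be approximated with a regular expression. This is a minimal sketch and not ditto's actual implementation: the helper name `list_index_from_template` and the pattern are assumptions made for illustration only.

```python
import re

# Hypothetical pattern, not taken from ditto: captures the digits inside the
# square brackets that directly follow the closing parenthesis of xcom_pull(...).
_INDEX_PATTERN = re.compile(r"xcom_pull\(.*?\)\s*\[\s*(\d+)\s*\]")


def list_index_from_template(xcom_template: str) -> str:
    """Return the accessed list index as a string, mirroring the documented str return type."""
    match = _INDEX_PATTERN.search(xcom_template)
    if match is None:
        raise ValueError(f"no indexed xcom_pull() found in: {xcom_template!r}")
    return match.group(1)


template = '{{ ti.xcom_pull("add_steps_to_cluster", key="return_value")[2] }}'
print(list_index_from_template(template))  # prints 2
```

The sketch raises a ValueError when no index is present; how the real method behaves in that case is not stated in this reference.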

static get_task_id_from_xcom_pull(xcom_template)[source]

Parses an Airflow template string and returns the ID of the task from which the xcom_pull() reads

Example

>>> {{ ti.xcom_pull("add_steps_to_cluster", key="return_value")[0] }}
    will return "add_steps_to_cluster"
>>> {{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}
    will return "add_steps"
Parameters

xcom_template (str) – airflow template string to parse

Return type

str

Returns

the task_id
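
Both call styles shown in the example (positional task ID and `task_ids=` keyword) can be handled by one pattern. The following is a sketch under stated assumptions, not the library's code: `task_id_from_template` and its regular expression are hypothetical names introduced here.

```python
import re

# Hypothetical pattern, not ditto's own: the task ID is assumed to be the first
# quoted argument of xcom_pull(), optionally preceded by "task_ids=".
_TASK_ID_PATTERN = re.compile(
    r"xcom_pull\(\s*(?:task_ids\s*=\s*)?(['\"])(?P<task_id>[^'\"]+)\1"
)


def task_id_from_template(xcom_template: str) -> str:
    """Return the task ID named in the first xcom_pull() call of the template."""
    match = _TASK_ID_PATTERN.search(xcom_template)
    if match is None:
        raise ValueError(f"no xcom_pull() task ID found in: {xcom_template!r}")
    return match.group("task_id")


positional = '{{ ti.xcom_pull("add_steps_to_cluster", key="return_value")[0] }}'
keyword = "{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}"
print(task_id_from_template(positional))  # prints add_steps_to_cluster
print(task_id_from_template(keyword))     # prints add_steps
```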

static add_downstream_dag_fragment(fragment_up, fragment_down)[source]

Attaches the roots of fragment_down to the leaves of fragment_up.

Note

The leaves of fragment_up are found by traversing its operator DAG, not its DAGFragment DAG. This joins only the operator DAGs of the two DAGFragment’s, not the fragment DAGs themselves.

See the documentation of DAGFragment to understand what that means.

Parameters
classmethod find_op_in_parent_fragment_chain(parent_fragment, operator_type=None, task_id=None)[source]

Finds the operator that matches the operator_type class and has the task ID task_id in the linked list of fragments referenced by the parent_fragment DAGFragment

See find_op_in_dag_fragment() to understand how the search is done.

Parameters
Return type

BaseOperator

Returns

the operator found

classmethod find_op_in_fragment_list(fragment_list, operator_type=None, task_id=None)[source]

Lenient version of find_op_in_fragment_list_strict()

Parameters
  • fragment_list (List[DAGFragment]) – the list of DAGFragment’s to search in

  • operator_type (Optional[Type[BaseOperator]]) – the type of operator to find

  • task_id (Optional[str]) – the task_id of the operator to find

Return type

BaseOperator

Returns

the operator found

classmethod find_op_in_fragment_list_strict(fragment_list, operator_type=None, task_id=None)[source]

Uses find_op_in_dag_fragment() to find an operator in a list of DAGFragment’s

Parameters
  • fragment_list (List[DAGFragment]) – the list of DAGFragment’s to search in

  • operator_type (Optional[Type[BaseOperator]]) – the type of operator to find

  • task_id (Optional[str]) – the task_id of the operator to find

Return type

BaseOperator

Returns

the operator found

static find_op_in_dag_fragment(dag_fragment, operator_type=None, task_id=None, upstream=False)[source]

Traverses the operator DAG of the given DAGFragment and finds a BaseOperator matching the given operator_type and task_id. It matches first on operator_type and then on task_id. The search can go upstream or downstream of the tasks in the given DAGFragment.

Parameters
  • dag_fragment (DAGFragment) – fragment whose operator dag has to be searched

  • operator_type (Optional[Type[BaseOperator]]) – the type of operator to find

  • task_id (Optional[str]) – the task_id of the operator to find

  • upstream (bool) – search upstream if True, otherwise search downstream

Return type

BaseOperator

Returns

the operator found
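
The traversal described above can be pictured as a breadth-first search over operator links. The sketch below uses stand-in classes instead of Airflow's BaseOperator and ditto's DAGFragment so that it runs on its own. `StubOperator`, `StubSensor` and `find_op` are hypothetical names, and the use of isinstance() for the type check is an assumption, not something this reference confirms.

```python
from collections import deque
from typing import Iterable, List, Optional, Type


class StubOperator:
    """Stand-in for an Airflow BaseOperator; holds only what the search needs."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream_list: List["StubOperator"] = []
        self.downstream_list: List["StubOperator"] = []

    def set_downstream(self, other: "StubOperator") -> None:
        self.downstream_list.append(other)
        other.upstream_list.append(self)


class StubSensor(StubOperator):
    """A second operator type so that matching by type can be shown."""


def find_op(
    roots: Iterable[StubOperator],
    operator_type: Optional[Type[StubOperator]] = None,
    task_id: Optional[str] = None,
    upstream: bool = False,
) -> Optional[StubOperator]:
    """Breadth-first search from roots; a criterion left as None always matches."""
    queue = deque(roots)
    seen = set()
    while queue:
        op = queue.popleft()
        if id(op) in seen:
            continue
        seen.add(id(op))
        # Check the type first, then the task ID (assumed isinstance semantics).
        type_ok = operator_type is None or isinstance(op, operator_type)
        id_ok = task_id is None or op.task_id == task_id
        if type_ok and id_ok:
            return op
        queue.extend(op.upstream_list if upstream else op.downstream_list)
    return None


start = StubOperator("start")
wait = StubSensor("wait_for_cluster")
end = StubOperator("end")
start.set_downstream(wait)
wait.set_downstream(end)

print(find_op([start], operator_type=StubSensor).task_id)      # prints wait_for_cluster
print(find_op([end], task_id="start", upstream=True).task_id)  # prints start
```

The sketch returns None when nothing matches; whether the real method does the same is not stated here.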

static get_digraph_from_airflow_dag(dag)[source]

Construct a DiGraph from the given airflow DAG

Parameters

dag (DAG) – the airflow DAG

Return type

DiGraph

Returns

the networkx DiGraph

static get_digraph_from_matcher_dag(matcher_roots)[source]

Construct a DiGraph from the given TaskMatcher dag

Parameters

matcher_roots – the root nodes of the matcher DAG

Return type

DiGraph

Returns

the networkx DiGraph

classmethod find_sub_dag(dag, matcher_roots)[source]

The problem is to find a sub-DAG in a DAG, where the sub-DAG’s nodes are matcher functions that test the nodes of the DAG

It can be generalized to: determine whether a DAG or DiGraph G1 is isomorphic to a subgraph of a DAG G2, where two nodes are compared by running the matcher in G1 on the node in G2

Note

This uses the NetworkX graph library for Python, which implements the VF2 algorithm for graph isomorphism.

Note

We are trying to find an exact sub-DAG match. In graph theory, this is called a node-induced subgraph. A subgraph 𝐻 of 𝐺 is called INDUCED, if for any two vertices 𝑢,𝑣 in 𝐻, 𝑢 and 𝑣 are adjacent in 𝐻 if and only if they are adjacent in 𝐺. In other words, 𝐻 has the same edges as 𝐺 between the vertices in 𝐻.

See also

This is an NP-complete problem: https://en.wikipedia.org/wiki/Subgraph_isomorphism_problem

Parameters
  • dag – the DAG where the sub-DAG has to be found

  • matcher_roots – the root task matchers of the TaskMatcher DAG

Return type

Tuple[DiGraph, List[DiGraph]]

Returns

a tuple containing the DiGraph of the source DAG and the list of matching subdag DiGraph’s
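
The induced-subgraph condition from the note above can be made concrete with a small brute-force search. This sketch deliberately does not use NetworkX or VF2: it tries every assignment of matchers to nodes, so it is only suitable for tiny graphs. The function `find_induced_matches` and the sample task names are hypothetical.

```python
from itertools import permutations
from typing import Callable, Dict, List, Set, Tuple

Edge = Tuple[str, str]


def find_induced_matches(
    dag_nodes: List[str],
    dag_edges: Set[Edge],
    matchers: Dict[str, Callable[[str], bool]],
    matcher_edges: Set[Edge],
) -> List[Dict[str, str]]:
    """Return every mapping of matcher name to DAG node that is an induced match."""
    names = list(matchers)
    results = []
    for candidate in permutations(dag_nodes, len(names)):
        mapping = dict(zip(names, candidate))
        # Node test: each matcher must accept the node it is mapped to.
        if not all(matchers[name](node) for name, node in mapping.items()):
            continue
        # Induced test: two matched DAG nodes are joined by an edge
        # if and only if their matchers are joined by the same edge.
        if all(
            ((mapping[a], mapping[b]) in dag_edges) == ((a, b) in matcher_edges)
            for a in names
            for b in names
            if a != b
        ):
            results.append(mapping)
    return results


nodes = ["create_cluster", "add_steps", "watch_step", "terminate_cluster"]
edges = {
    ("create_cluster", "add_steps"),
    ("add_steps", "watch_step"),
    ("watch_step", "terminate_cluster"),
}

# Pattern: a node starting with "add" directly followed by one starting with "watch".
matches = find_induced_matches(
    nodes,
    edges,
    {"adder": lambda n: n.startswith("add"), "watcher": lambda n: n.startswith("watch")},
    {("adder", "watcher")},
)
print(matches)  # prints [{'adder': 'add_steps', 'watcher': 'watch_step'}]

# Pattern: create_cluster directly followed by terminate_cluster; no such edge exists.
no_match = find_induced_matches(
    nodes,
    edges,
    {"first": lambda n: n == "create_cluster", "last": lambda n: n == "terminate_cluster"},
    {("first", "last")},
)
print(no_match)  # prints []
```

The second query returns nothing because the two nodes are connected only through intermediate tasks, which is exactly what the exact-match requirement excludes.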

static remove_task_from_dag(dag, dag_nodes, task)[source]

Removes the given list of BaseOperator’s from the given DAG

Parameters
classmethod find_matching_tasks(subdag, matcher)[source]

Find matching tasks in a DiGraph of operators

Parameters
  • subdag (DiGraph) – the dag to search for matches

  • matcher (TaskMatcher) – the task matcher to use

Returns

matching nodes

static assign_task_to_dag(op, dag)[source]

Assigns the given BaseOperator and all its downstream tasks to the given DAG

Parameters
  • op (BaseOperator) – the task to assign

  • dag (DAG) – the dag to assign the task and its downstream to
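
Assigning a task together with everything downstream of it amounts to a graph walk from that task. The following sketch uses stand-in classes rather than Airflow's DAG and BaseOperator. `StubDag`, `StubTask` and `assign_with_downstream` are hypothetical, and the real method may handle already-assigned tasks differently.

```python
from typing import List, Optional


class StubDag:
    """Stand-in for an Airflow DAG; records the tasks assigned to it."""

    def __init__(self, dag_id: str):
        self.dag_id = dag_id
        self.tasks: List["StubTask"] = []


class StubTask:
    """Stand-in for a BaseOperator with a dag reference and downstream links."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.dag: Optional[StubDag] = None
        self.downstream_list: List["StubTask"] = []

    def set_downstream(self, other: "StubTask") -> None:
        self.downstream_list.append(other)


def assign_with_downstream(op: StubTask, dag: StubDag) -> None:
    """Assign op and every task reachable downstream of it to dag."""
    stack = [op]
    while stack:
        current = stack.pop()
        if current.dag is dag:
            continue  # already assigned; also guards diamond-shaped graphs
        current.dag = dag
        dag.tasks.append(current)
        stack.extend(current.downstream_list)


# Diamond: extract -> (transform_a, transform_b) -> load
extract, transform_a, transform_b, load = (
    StubTask(name) for name in ("extract", "transform_a", "transform_b", "load")
)
extract.set_downstream(transform_a)
extract.set_downstream(transform_b)
transform_a.set_downstream(load)
transform_b.set_downstream(load)

target = StubDag("target_dag")
assign_with_downstream(extract, target)
print(sorted(task.task_id for task in target.tasks))
# prints ['extract', 'load', 'transform_a', 'transform_b']
```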

classmethod add_dag_fragment_to_dag(dag, frag)[source]

Traverses and assigns all the tasks in this fragment to the given DAG using assign_task_to_dag()

Parameters
  • dag (DAG) – the dag to assign the fragment’s tasks to

  • frag (DAGFragment) – the dag fragment to assign