ditto.utils¶
-
class
ditto.utils.
TransformerUtils
[source]¶ Bases:
object
-
static
get_list_index_from_xcom_pull
(xcom_template)[source]¶ Parses an airflow template variable and finds the list index accessed from the return value of an
xcom_pull()
- Example
>>> task_instance.xcom_pull("add_steps_to_cluster", key="return_value")[3] will return "3"
>>> {{ ti.xcom_pull("add_steps_to_cluster", key="return_value")[2] }} will return "2"
- Parameters
xcom_template¶ (str) – Airflow template string to parse
- Return type
str
- Returns
the list index accessed
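The parsing described above can be approximated with a regular expression. The following is a minimal sketch, not ditto's actual implementation: the helper name `list_index_from_template` and the pattern are assumptions made for illustration, and the sketch only handles a plain integer subscript directly after the closing parenthesis of xcom_pull().

```python
import re

# Hypothetical pattern: an xcom_pull(...) call followed by an integer
# subscript such as [3]. Not taken from the ditto source.
_INDEX_PATTERN = re.compile(r"xcom_pull\(.*?\)\s*\[\s*(\d+)\s*\]")


def list_index_from_template(xcom_template: str) -> str:
    """Return the list index (as a string) accessed on an xcom_pull() result."""
    match = _INDEX_PATTERN.search(xcom_template)
    if match is None:
        raise ValueError(f"no indexed xcom_pull() found in: {xcom_template!r}")
    return match.group(1)


print(list_index_from_template(
    '{{ ti.xcom_pull("add_steps_to_cluster", key="return_value")[2] }}'
))
```

The index is returned as a string rather than an int, mirroring the documented return type.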
-
static
get_task_id_from_xcom_pull
(xcom_template)[source]¶ Parses an airflow template variable and finds the task ID from which an
xcom_pull()
is being done.
- Example
>>> {{ ti.xcom_pull("add_steps_to_cluster", key="return_value")[0] }} will return "add_steps_to_cluster"
>>> {{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }} will return "add_steps"
- Parameters
xcom_template¶ (str) – Airflow template string to parse
- Return type
str
- Returns
the task_id
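A similar regular-expression sketch covers the task ID extraction shown in the examples above. The helper name `task_id_from_template` and the pattern are assumptions for illustration only; the sketch accepts the task ID either as the first positional argument or as the `task_ids=` keyword, with single or double quotes.

```python
import re

# Hypothetical pattern: the first quoted argument of xcom_pull(), optionally
# written as the keyword argument task_ids=... Not taken from the ditto source.
_TASK_ID_PATTERN = re.compile(
    r"""xcom_pull\(\s*(?:task_ids\s*=\s*)?(['"])(?P<task_id>[^'"]+)\1"""
)


def task_id_from_template(xcom_template: str) -> str:
    """Return the task ID that an xcom_pull() call in the template reads from."""
    match = _TASK_ID_PATTERN.search(xcom_template)
    if match is None:
        raise ValueError(f"no xcom_pull() task ID found in: {xcom_template!r}")
    return match.group("task_id")


print(task_id_from_template(
    "{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}"
))
```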
-
static
add_downstream_dag_fragment
(fragment_up, fragment_down)[source]¶ Attaches the roots of fragment_down to the leaves of fragment_up.
Note
The leaves of fragment_up are found by traversing its operator DAG, not its DAGFragment DAG. This does not join the fragment DAGs; it only joins the operator DAGs of the two DAGFragments. See the documentation of DAGFragment to understand what that means.
- Parameters
fragment_up¶ (DAGFragment) – the upstream DAGFragment to which fragment_down has to be added
fragment_down¶ (DAGFragment) – the downstream DAGFragment
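To make the leaves-to-roots wiring concrete, here is a minimal sketch under stated assumptions. The `Op` class is a hypothetical stand-in for an Airflow operator, and `find_leaves` and `attach` are illustrative names rather than ditto functions; the sketch only shows the idea of walking the operator DAG to its leaves and then linking every leaf to every downstream root.

```python
from typing import List


class Op:
    """Hypothetical stand-in for an operator; not the real BaseOperator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: List["Op"] = []

    def set_downstream(self, other: "Op") -> None:
        self.downstream.append(other)


def find_leaves(roots: List[Op]) -> List[Op]:
    """Walk the operator DAG from its roots and collect tasks with no downstream."""
    leaves, seen, stack = [], set(), list(roots)
    while stack:
        op = stack.pop()
        if id(op) in seen:
            continue
        seen.add(id(op))
        if op.downstream:
            stack.extend(op.downstream)
        else:
            leaves.append(op)
    return leaves


def attach(up_roots: List[Op], down_roots: List[Op]) -> None:
    """Make every root of the downstream part a child of every upstream leaf."""
    for leaf in find_leaves(up_roots):
        for root in down_roots:
            leaf.set_downstream(root)


# Upstream part: a -> b and a -> c. Downstream part: the single task d.
a, b, c, d = Op("a"), Op("b"), Op("c"), Op("d")
a.set_downstream(b)
a.set_downstream(c)
attach([a], [d])
print([op.task_id for op in b.downstream], [op.task_id for op in c.downstream])
```

After the call, d is downstream of both b and c, so it becomes the only leaf reachable from a.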
-
classmethod
find_op_in_parent_fragment_chain
(parent_fragment, operator_type=None, task_id=None)[source]¶ Finds the operator that matches the operator_type class and has the task ID task_id in the linked list of fragments referenced by the parent_fragment DAGFragment. See find_op_in_dag_fragment() to understand how the search is done.
- Parameters
parent_fragment¶ (DAGFragment) – See parent_fragment
operator_type¶ (Optional[Type[BaseOperator]]) – the type of operator to find
task_id¶ (Optional[str]) – the task_id of the operator to find
- Return type
- Returns
the operator found
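The linked-list walk can be pictured with the sketch below. It rests on assumptions: `Fragment` is a hypothetical stand-in for DAGFragment, the `parent` attribute name is invented for illustration, and each fragment holds a flat list of task IDs instead of an operator DAG. The per-fragment search in ditto is delegated to find_op_in_dag_fragment(); here it is reduced to a membership test.

```python
from typing import List, Optional


class Fragment:
    """Hypothetical stand-in for DAGFragment: task IDs plus a parent link."""

    def __init__(self, task_ids: List[str],
                 parent: Optional["Fragment"] = None) -> None:
        self.task_ids = list(task_ids)
        self.parent = parent


def find_in_chain(fragment: Optional[Fragment],
                  task_id: str) -> Optional[Fragment]:
    """Follow parent links until a fragment containing task_id is found."""
    while fragment is not None:
        if task_id in fragment.task_ids:
            return fragment
        fragment = fragment.parent
    return None


# A chain of three fragments: watch -> add_steps -> create_cluster.
root = Fragment(["create_cluster"])
middle = Fragment(["add_steps"], parent=root)
leaf = Fragment(["watch"], parent=middle)
print(find_in_chain(leaf, "create_cluster") is root)
```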
-
classmethod
find_op_in_fragment_list
(fragment_list, operator_type=None, task_id=None)[source]¶ Lenient version of
find_op_in_fragment_list_strict()
- Parameters
fragment_list¶ (List[DAGFragment]) – the list of DAGFragments to search in
operator_type¶ (Optional[Type[BaseOperator]]) – the type of operator to find
task_id¶ (Optional[str]) – the task_id of the operator to find
- Return type
- Returns
the operator found
-
classmethod
find_op_in_fragment_list_strict
(fragment_list, operator_type=None, task_id=None)[source]¶ Uses find_op_in_dag_fragment() to find an operator in a list of DAGFragments
- Parameters
fragment_list¶ (List[DAGFragment]) – the list of DAGFragments to search in
operator_type¶ (Optional[Type[BaseOperator]]) – the type of operator to find
task_id¶ (Optional[str]) – the task_id of the operator to find
- Return type
- Returns
the operator found
-
static
find_op_in_dag_fragment
(dag_fragment, operator_type=None, task_id=None, upstream=False)[source]¶ Traverses the operator DAG of the given DAGFragment and finds a BaseOperator matching the given operator_type and task_id. It matches first on operator_type and then on task_id. The search can run upstream or downstream of the tasks in the given DAGFragment.
- Parameters
dag_fragment¶ (DAGFragment) – fragment whose operator DAG has to be searched
operator_type¶ (Optional[Type[BaseOperator]]) – the type of operator to find
task_id¶ (Optional[str]) – the task_id of the operator to find
upstream¶ – search upstream if True, otherwise search downstream
- Return type
- Returns
the operator found
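The search described above can be sketched as a depth-first traversal. Everything here is an assumption for illustration: `Op` and `SensorOp` are hypothetical stand-ins for operator classes, `find_op` is not the ditto function, and matching by `isinstance` is one plausible reading of "matching the given operator_type".

```python
from typing import List, Optional, Type


class Op:
    """Hypothetical stand-in for an operator with upstream and downstream links."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_list: List["Op"] = []
        self.downstream_list: List["Op"] = []

    def set_downstream(self, other: "Op") -> None:
        self.downstream_list.append(other)
        other.upstream_list.append(self)


class SensorOp(Op):
    """A second hypothetical operator type, used to show matching by type."""


def find_op(start: List[Op], operator_type: Optional[Type[Op]] = None,
            task_id: Optional[str] = None, upstream: bool = False) -> Optional[Op]:
    """Depth-first search from the start tasks in the requested direction."""
    seen, stack = set(), list(start)
    while stack:
        op = stack.pop()
        if id(op) in seen:
            continue
        seen.add(id(op))
        # Check the type first, then the task ID; None means "do not filter".
        type_ok = operator_type is None or isinstance(op, operator_type)
        id_ok = task_id is None or op.task_id == task_id
        if type_ok and id_ok:
            return op
        stack.extend(op.upstream_list if upstream else op.downstream_list)
    return None


# start -> wait -> load
start, wait, load = Op("start"), SensorOp("wait"), Op("load")
start.set_downstream(wait)
wait.set_downstream(load)
print(find_op([start], operator_type=SensorOp).task_id)
print(find_op([load], task_id="start", upstream=True).task_id)
```

The `upstream` flag only changes which neighbour list is followed, so the same loop serves both directions.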
-
static
get_digraph_from_matcher_dag
(matcher_roots)[source]¶ Construct a DiGraph from the given TaskMatcher DAG
- Parameters
matcher_roots¶ – the matcher DAG
- Return type
DiGraph
- Returns
the networkx DiGraph
-
classmethod
find_sub_dag
(dag, matcher_roots)[source]¶ The problem is to find a sub-DAG in a DAG where the sub-DAG’s nodes are matcher functions that test nodes.
It can be generalized to: find whether a DAG or DiGraph G1 is isomorphic to a subgraph of a DAG G2, where nodes are compared by running the matchers in G1 on the nodes in G2.
Note
This uses Python’s NetworkX graph library, which uses the VF2 algorithm for graph isomorphism.
Note
We are trying to find an exact sub-DAG match. In graph theory, this is called a node-induced subgraph. A subgraph 𝐻 of 𝐺 is called INDUCED, if for any two vertices 𝑢,𝑣 in 𝐻, 𝑢 and 𝑣 are adjacent in 𝐻 if and only if they are adjacent in 𝐺. In other words, 𝐻 has the same edges as 𝐺 between the vertices in 𝐻.
See also
This is an NP-complete problem: https://en.wikipedia.org/wiki/Subgraph_isomorphism_problem
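The induced-subgraph condition in the notes above can be illustrated without NetworkX. The sketch below is a brute-force search over node assignments, not the VF2 algorithm and not ditto's implementation; the function name `induced_matches` and the data layout (dicts of node payloads and sets of edge tuples) are assumptions chosen to keep the example self-contained.

```python
from itertools import permutations
from typing import Callable, Dict, Hashable, Iterator, Set, Tuple

Edge = Tuple[Hashable, Hashable]


def induced_matches(
    graph_nodes: Dict[Hashable, object],
    graph_edges: Set[Edge],
    matchers: Dict[Hashable, Callable[[object], bool]],
    matcher_edges: Set[Edge],
) -> Iterator[Dict[Hashable, Hashable]]:
    """Yield mappings matcher-node -> graph-node that form an induced match."""
    pattern = list(matchers)
    for candidate in permutations(graph_nodes, len(pattern)):
        mapping = dict(zip(pattern, candidate))
        # Every matcher must accept the payload of the node it is mapped to.
        if not all(matchers[m](graph_nodes[n]) for m, n in mapping.items()):
            continue
        # Induced: two chosen nodes are connected in the graph if and only if
        # the corresponding matcher nodes are connected in the pattern.
        if all(
            ((mapping[u], mapping[v]) in graph_edges) == ((u, v) in matcher_edges)
            for u in pattern for v in pattern if u != v
        ):
            yield mapping


# Graph: t1 -> t2 -> t3, each node carrying a "kind" string as its payload.
nodes = {"t1": "create", "t2": "add_steps", "t3": "sensor"}
edges = {("t1", "t2"), ("t2", "t3")}
matchers = {
    "m1": lambda kind: kind == "add_steps",
    "m2": lambda kind: kind == "sensor",
}
print(list(induced_matches(nodes, edges, matchers, {("m1", "m2")})))
```

The brute-force loop is exponential in the pattern size, which is acceptable only for tiny examples like this one.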
-
static
remove_task_from_dag
(dag, dag_nodes, task)[source]¶ Removes the given list of BaseOperators from the given DAG
- Parameters
dag_nodes¶ (List[BaseOperator]) – the list of nodes in the source DAG
task¶ (BaseOperator) – the task to remove
-
classmethod
find_matching_tasks
(subdag, matcher)[source]¶ Find matching tasks in a DiGraph of operators
- Parameters
subdag¶ (DiGraph) – the DAG to search for matches
matcher¶ (TaskMatcher) – the task matcher to use
- Returns
matching nodes
-
static
assign_task_to_dag
(op, dag)[source]¶ Assigns the given BaseOperator and all its downstream tasks to the given DAG
- Parameters
op¶ (BaseOperator) – the task to assign
dag¶ (DAG) – the DAG to assign the task and its downstream tasks to
-
classmethod
add_dag_fragment_to_dag
(dag, frag)[source]¶ Traverses and assigns all the tasks in this fragment to the given DAG using
assign_task_to_dag()
- Parameters
frag¶ (DAGFragment) – the DAG fragment to assign
-
static