Source code for ditto.transformers.identity_transformer
from typing import List
from airflow.models import BaseOperator
from ditto.api import OperatorTransformer, DAGFragment
[docs]class IdentityTransformer(OperatorTransformer):
"""
Re-uses the source operator as is as the transformed operator
Just clears its connections to the source DAG and adds it
to the target DAG
.. warning::
Use this transformer with caution. Reusing an operator
can have weird side effects
"""
[docs] def transform(self, input_operator: BaseOperator, parent_fragment: DAGFragment, upstream_fragments: List[DAGFragment]) -> DAGFragment:
input_operator._dag = None
input_operator._upstream_task_ids.clear()
input_operator._downstream_task_ids.clear()
input_operator.dag = self.dag
return DAGFragment([input_operator])