Source code for ditto.transformers.copy_transformer

import copy
from typing import List

from airflow import DAG
from airflow.models import BaseOperator

from ditto.api import OperatorTransformer, TransformerDefaults, DAGFragment


class CopyTransformer(OperatorTransformer):
    """
    Deep copies the source operator, clears it of its connections to the
    source DAG, and any upstream/downstream tasks and assigns it to the
    `target_dag`.

    .. note::

        If you face issues with serialization of the transformed operator by
        airflow, you can exclude some attributes to be deep copied by
        overriding the following attribute of the source operator:

        >>> input_operator.shallow_copy_attrs += ('_dag',)
    """

    def __init__(self, target_dag: DAG, defaults: TransformerDefaults):
        super().__init__(target_dag, defaults)

    def transform(self, input_operator: BaseOperator,
                  parent_fragment: DAGFragment,
                  upstream_fragments: List[DAGFragment]) -> DAGFragment:
        """
        Copy `input_operator` and attach the copy to this transformer's DAG.

        The source operator is deep-copied; the copy is what gets detached
        and re-assigned, so `input_operator` itself is not modified here.

        :param input_operator: the source operator to copy
        :param parent_fragment: not used by this transformer
        :param upstream_fragments: not used by this transformer
        :return: a :class:`DAGFragment` containing only the copied operator
        """
        copied_op = copy.deepcopy(input_operator)

        # Detach the copy from the source DAG and drop every task relation
        # it inherited, *before* assigning the new DAG below.
        copied_op._dag = None
        copied_op._upstream_task_ids.clear()
        copied_op._downstream_task_ids.clear()

        # NOTE(review): `self.dag` is presumably the `target_dag` stored by
        # the OperatorTransformer base class -- confirm against ditto.api.
        copied_op.dag = self.dag

        return DAGFragment([copied_op])