Source code for ditto.transformers.emr.emr_terminate_job_flow_op_transformer

from typing import List

from airflow import DAG
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.models import BaseOperator
from airflow.utils.trigger_rule import TriggerRule

from ditto.api import OperatorTransformer, TransformerDefaults, DAGFragment, UpstreamOperatorNotFoundException
from ditto.utils import TransformerUtils
from airflowhdi.operators import ConnectedAzureHDInsightCreateClusterOperator
from airflowhdi.operators import AzureHDInsightDeleteClusterOperator


class EmrTerminateJobFlowOperatorTransformer(OperatorTransformer[EmrTerminateJobFlowOperator]):
    """
    Transforms the operator
    :class:`~airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator`
    into an :class:`~airflowhdi.operators.AzureHDInsightDeleteClusterOperator`.
    """

    def __init__(self, target_dag: DAG, defaults: TransformerDefaults):
        # Nothing transformer-specific to set up; defer entirely to the base class.
        super().__init__(target_dag, defaults)

    def transform(self, src_operator: BaseOperator, parent_fragment: DAGFragment,
                  upstream_fragments: List[DAGFragment]) -> DAGFragment:
        """
        Build the HDInsight delete-cluster task that replaces an EMR terminate task.

        The cluster to delete is identified through the already-transformed
        create-cluster operator: the task id referenced by
        ``src_operator.job_flow_id`` is extracted and a
        :class:`~airflowhdi.operators.ConnectedAzureHDInsightCreateClusterOperator`
        with that task id is searched for in ``upstream_fragments``. Its
        ``azure_conn_id`` and ``cluster_name`` are reused for the new
        :class:`~airflowhdi.operators.AzureHDInsightDeleteClusterOperator`.

        This therefore relies on the upstream
        :class:`~airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator`
        having been transformed before this method is called.

        :param src_operator: the source terminate operator; its ``job_flow_id``
            and ``task_id`` attributes are read
        :param parent_fragment: not used by this transformer
        :param upstream_fragments: fragments searched for the create-cluster operator
        :return: a :class:`~ditto.api.DAGFragment` containing only the delete-cluster operator
        :raises UpstreamOperatorNotFoundException: if no matching create-cluster
            operator is found in ``upstream_fragments``
        """
        # job_flow_id is presumably an xcom_pull template naming the create task
        # (inferred from the helper's name) -- confirm against TransformerUtils.
        upstream_task_id = TransformerUtils.get_task_id_from_xcom_pull(src_operator.job_flow_id)

        hdi_create_op: BaseOperator = TransformerUtils.find_op_in_fragment_list(
            upstream_fragments,
            operator_type=ConnectedAzureHDInsightCreateClusterOperator,
            task_id=upstream_task_id,
        )
        if not hdi_create_op:
            raise UpstreamOperatorNotFoundException(
                ConnectedAzureHDInsightCreateClusterOperator,
                EmrTerminateJobFlowOperator,
            )

        # The replacement task keeps the source task id and targets the same
        # cluster/connection the create operator was configured with.
        delete_op = AzureHDInsightDeleteClusterOperator(
            task_id=src_operator.task_id,
            azure_conn_id=hdi_create_op.azure_conn_id,
            cluster_name=hdi_create_op.cluster_name,
            dag=self.dag,
        )

        self.copy_op_attrs(delete_op, src_operator)
        self.sign_op(delete_op)

        # Set after copy_op_attrs so this value is the one that sticks;
        # presumably so the cluster is deleted even when upstream tasks fail.
        delete_op.trigger_rule = TriggerRule.ALL_DONE

        return DAGFragment([delete_op])