Source code for ditto.transformers.emr.emr_job_flow_sensor_transformer

from typing import List

from airflow import DAG
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.models import BaseOperator

from ditto.api import OperatorTransformer, TransformerDefaults, DAGFragment, UpstreamOperatorNotFoundException
from ditto.utils import TransformerUtils
from airflowhdi.operators import ConnectedAzureHDInsightCreateClusterOperator
from airflowhdi.sensors import AzureHDInsightClusterSensor


[docs]class EmrJobFlowSensorTransformer(OperatorTransformer[EmrJobFlowSensor]): """ Transforms the sensor :class:`~airflow.contrib.operators.emr_job_flow_sensor.EmrJobFlowSensor` """ def __init__(self, target_dag: DAG, defaults: TransformerDefaults): super().__init__(target_dag, defaults)
[docs] def transform(self, src_operator: BaseOperator, parent_fragment: DAGFragment, upstream_fragments: List[DAGFragment]) -> DAGFragment: """ This transformer assumes and relies on the fact that an upstream transformation of a :class:`~airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator` has already taken place, since it needs to find the output of that transformation to get the `cluster_name` and `azure_conn_id` from that operator (which should have been a :class:`~airflowhdi.operators.AzureHDInsightCreateClusterOperator`) Creates a :class:`~airflowhdi.sensors.AzureHDInsightClusterSensor` in non-provisioning mode to monitor the cluster till it reaches a terminal state (cluster shutdown by user or failed). .. warning:: We do not have a way to tell the HDInsight cluster to halt if a job has failed, unlike EMR. So the cluster will continue to run even on job failure. You have to add a terminate cluster operator on step failure through ditto itself. """ create_op_task_id = TransformerUtils.get_task_id_from_xcom_pull(src_operator.job_flow_id) create_op: BaseOperator = \ TransformerUtils.find_op_in_fragment_list( upstream_fragments, operator_type=ConnectedAzureHDInsightCreateClusterOperator, task_id=create_op_task_id) if not create_op: raise UpstreamOperatorNotFoundException(ConnectedAzureHDInsightCreateClusterOperator, EmrJobFlowSensor) monitor_cluster_op = AzureHDInsightClusterSensor(create_op.cluster_name, azure_conn_id=create_op.azure_conn_id, poke_interval=5, task_id=f"{create_op.task_id}_monitor_cluster", dag=self.dag) self.copy_op_attrs(monitor_cluster_op, src_operator) self.sign_op(monitor_cluster_op) return DAGFragment([monitor_cluster_op])