Source code for ditto.transformers.emr.emr_step_sensor_transformer
from typing import List
from airflow import DAG
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.models import BaseOperator
from airflow.operators.dummy_operator import DummyOperator
from ditto.api import OperatorTransformer, TransformerDefaults, DAGFragment, UpstreamOperatorNotFoundException
from ditto.utils import TransformerUtils
from ditto.transformers.emr import EmrAddStepsOperatorTransformer
from airflowhdi.operators import ConnectedAzureHDInsightCreateClusterOperator
from airflowlivy.operators.livy_batch_operator import LivyBatchOperator
from airflowlivy.sensors.livy_batch_sensor import LivyBatchSensor


class EmrStepSensorTransformer(OperatorTransformer[EmrStepSensor]):
    """
    Transforms the sensor :class:`~airflow.contrib.sensors.emr_step_sensor.EmrStepSensor`
    """

def __init__(self, target_dag: DAG, defaults: TransformerDefaults):
super().__init__(target_dag, defaults)

    def transform(self, src_operator: BaseOperator, parent_fragment: DAGFragment,
                  upstream_fragments: List[DAGFragment]) -> DAGFragment:
"""
This transformer assumes and relies on the fact that an upstream transformation
of a :class:`~airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator`
has already taken place, since it needs to find the output of that transformation
to get the `cluster_name` and `azure_conn_id` from that operator (which should have been a
:class:`~airflowhdi.operators.AzureHDInsightCreateClusterOperator`)
This transformer also requires than there would already be transformations of
:class:`~airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator` to
:class:`~airflowhdi.operators.LivyBatchOperator` or :class:`~airflowhdi.operators.AzureHDInsightSshOperator`
in the `upstream_fragments` which can then be monitored by the output tasks of
this transformer. It needs to search for those ops upstream to find their task IDs
Adds :class:`~airflowhdi.sensors.LivyBatchSensor` if it was a livy spark job.
There's no sensor required for a transformed :class:`~airflowhdi.operators.AzureHDInsightSshOperator`
as it is synchronous.
"""
create_op_task_id = TransformerUtils.get_task_id_from_xcom_pull(src_operator.job_flow_id)
create_op: BaseOperator = \
TransformerUtils.find_op_in_fragment_list(
upstream_fragments,
operator_type=ConnectedAzureHDInsightCreateClusterOperator,
task_id=create_op_task_id)
if not create_op:
raise UpstreamOperatorNotFoundException(ConnectedAzureHDInsightCreateClusterOperator,
EmrStepSensor)
emr_step_sensor_op: EmrStepSensor = src_operator
emr_add_step_task_id = TransformerUtils.get_task_id_from_xcom_pull(emr_step_sensor_op.step_id)
emr_add_step_step_id = TransformerUtils.get_list_index_from_xcom_pull(emr_step_sensor_op.step_id)
        target_step_task_id = EmrAddStepsOperatorTransformer.get_target_step_task_id(
            emr_add_step_task_id, emr_add_step_step_id)
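        # e.g. for the hypothetical values emr_add_step_task_id='add_steps' and
        # emr_add_step_step_id=0, this resolves to the task id that
        # EmrAddStepsOperatorTransformer generated for that individual step
        # (the exact naming scheme is owned by that transformer)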
add_step_op: BaseOperator = \
TransformerUtils.find_op_in_fragment_list_strict(
upstream_fragments,
task_id=target_step_task_id)
        if isinstance(add_step_op, LivyBatchOperator):
            # monitor the Livy batch submitted by the transformed add-steps task:
            # the batch id is pulled from that task's XCom at runtime
            step_sensor_op = LivyBatchSensor(
                batch_id=f"{{{{ task_instance.xcom_pull('{target_step_task_id}', key='return_value') }}}}",
                task_id=emr_step_sensor_op.task_id,
                azure_conn_id=create_op.azure_conn_id,
                cluster_name=create_op.cluster_name,
                verify_in="yarn",
                dag=self.dag
            )
        else:
            # an AzureHDInsightSshOperator runs synchronously, so no sensor is
            # needed; a placeholder task preserves the DAG shape
            step_sensor_op = DummyOperator(task_id=emr_step_sensor_op.task_id, dag=self.dag)
self.copy_op_attrs(step_sensor_op, emr_step_sensor_op)
self.sign_op(step_sensor_op)
return DAGFragment([step_sensor_op])
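
# Hypothetical usage sketch (not part of the library; the names below are
# placeholders): ditto's DAG transformer normally drives this class, but the
# contract can be exercised directly given a target DAG and a
# TransformerDefaults instance:
#
#   transformer = EmrStepSensorTransformer(hdi_dag, defaults)
#   fragment = transformer.transform(emr_step_sensor, parent_fragment, upstream_fragments)
#
# where `emr_step_sensor` is the source EmrStepSensor and `upstream_fragments`
# already contains the transformed cluster-create and add-steps fragments; the
# returned DAGFragment wraps the generated LivyBatchSensor (or DummyOperator).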