Source code for ditto.transformers.emr.emr_create_job_flow_op_transformer
from typing import List
from airflow import DAG, settings
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.models import Connection, BaseOperator
from ditto.api import OperatorTransformer, TransformerDefaults, DAGFragment
from airflowhdi.operators import AzureHDInsightCreateClusterOperator
from airflowhdi.sensors.azure_hdinsight_cluster_sensor import AzureHDInsightClusterSensor
[docs]class EmrCreateJobFlowOperatorTransformer(OperatorTransformer[EmrCreateJobFlowOperator]):
"""
Transforms the operator :class:`~airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator`
"""
def __init__(self, target_dag: DAG, defaults: TransformerDefaults):
super().__init__(target_dag, defaults)
[docs] def get_cluster_name(self, src_operator: EmrCreateJobFlowOperator):
"""
get the cluster name from the EMR job flow and it's connection
"""
emr_conn_id = src_operator.emr_conn_id
session = settings.Session()
emr_conn = session.query(Connection).filter(Connection.conn_id == emr_conn_id).first()
config = emr_conn.extra_dejson.copy()
config.update(src_operator.job_flow_overrides)
return config['Name']
[docs] def transform(self, src_operator: BaseOperator, parent_fragment: DAGFragment, upstream_fragments: List[DAGFragment] = None) -> DAGFragment:
"""
Copies the :class:`~airflowhdi.operators.AzureHDInsightCreateClusterOperator` from the given
:paramref:`~ditto.api.TransformerDefaults.default_operator` provided and attaches it to the
target DAG after transferring airflow attributes from the source
:class:`~airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator`
Relies on the default op as it is not possible to translate an EMR cluster spec to an HDInsight
cluster spec automatically, so it is best to accept that operator from the user itself
What it does do though is attach a :class:`~airflowhdi.sensors.AzureHDInsightClusterSensor`
to monitor the provisioning of this newly created cluster.
Since the HDInsight management client is idempotent, it does not matter if the cluster already exists
and the operator simply moves on if that is the case.
"""
create_cluster_op: AzureHDInsightCreateClusterOperator = self.get_default_op(src_operator)
if create_cluster_op is None:
raise Exception("This transformer needs a default output operator")
create_cluster_op.dag = self.dag
create_cluster_op.cluster_name = self.get_cluster_name(src_operator)
self.sign_op(create_cluster_op)
monitor_provisioning_op = AzureHDInsightClusterSensor(create_cluster_op.cluster_name,
azure_conn_id=create_cluster_op.azure_conn_id,
poke_interval=5,
provisioning_only=True,
task_id=f"{create_cluster_op.task_id}_monitor_provisioning",
dag=self.dag)
self.copy_op_attrs(monitor_provisioning_op, src_operator)
self.sign_op(monitor_provisioning_op)
create_cluster_op.set_downstream(monitor_provisioning_op)
return DAGFragment([create_cluster_op])