Source code for ditto.transformers.emr.emr_add_steps_op_transformer
from typing import List
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.models import BaseOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from ditto.api import OperatorTransformer, TransformerDefaults, DAGFragment, UpstreamOperatorNotFoundException
from ditto.utils import TransformerUtils
from airflowhdi.operators.azure_hdinsight_create_cluster_operator import ConnectedAzureHDInsightCreateClusterOperator
from airflowhdi.operators.azure_hdinsight_ssh_operator import AzureHDInsightSshOperator
from airflowlivy.operators.livy_batch_operator import LivyBatchOperator


class EmrAddStepsOperatorTransformer(OperatorTransformer[EmrAddStepsOperator]):
"""
Transforms the operator :class:`~airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator`
"""
#: default livy proxy user
DEFAULT_PROXY_USER = "admin"
#: default spark configuration to use
DEFAULT_SPARK_CONF = {'spark.shuffle.compress': 'false'}
    #: number of mappers to use in case it's a distcp
HADOOP_DISTCP_DEFAULT_MAPPERS = 100
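
    # ``proxy_user`` and ``spark_conf`` can be overridden by passing keys of the same name
    # in ``TransformerDefaults.other_defaults``, e.g. (illustrative values only):
    #   other_defaults={'proxy_user': 'livy', 'spark_conf': {'spark.executor.memory': '4g'}}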
def __init__(self, target_dag: DAG, defaults: TransformerDefaults):
super().__init__(target_dag, defaults)
self.proxy_user = self.defaults.other_defaults.get('proxy_user', EmrAddStepsOperatorTransformer.DEFAULT_PROXY_USER)
self.spark_conf = self.defaults.other_defaults.get('spark_conf', EmrAddStepsOperatorTransformer.DEFAULT_SPARK_CONF)

    @staticmethod
def get_target_step_task_id(add_step_task_id, add_step_step_id):
"""
        Generates a ``task_id`` for the transformed output operators,
        one for each input EMR step.
"""
return f"{add_step_task_id}_{add_step_step_id}"

    def transform(self, src_operator: BaseOperator, parent_fragment: DAGFragment, upstream_fragments: List[DAGFragment]) -> DAGFragment:
"""
This transformer assumes and relies on the fact that an upstream transformation
of a :class:`~airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator`
has already taken place, since it needs to find the output of that transformation
to get the `cluster_name` and `azure_conn_id` from that operator (which should have been a
:class:`~airflowhdi.operators.AzureHDInsightCreateClusterOperator`)
It then goes through the EMR steps of this :class:`~airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator`
and creates a :class:`~airflowhdi.operators.LivyBatchOperator` or an :class:`~airflowhdi.operators.AzureHDInsightSshOperator`
for each corresponding step, based on grokking the step's params and figuring out whether its a spark job being run
on an arbitrary hadoop command like `distcp`, `hdfs` or the like.
.. note::
This transformer creates multiple operators from a single source operator
.. note::
The spark configuration for the livy spark job are derived from `step['HadoopJarStep']['Properties']` of the EMR step,
or could even be specified at the cluster level itself when transforming the job flow
"""
create_op_task_id = TransformerUtils.get_task_id_from_xcom_pull(src_operator.job_flow_id)
create_op: BaseOperator = \
TransformerUtils.find_op_in_fragment_list(
upstream_fragments,
operator_type=ConnectedAzureHDInsightCreateClusterOperator,
task_id=create_op_task_id)
if not create_op:
raise UpstreamOperatorNotFoundException(ConnectedAzureHDInsightCreateClusterOperator,
EmrAddStepsOperator)
emr_add_steps_op: EmrAddStepsOperator = src_operator
dag_fragment_steps = []
steps_added_op = DummyOperator(
task_id=f"{emr_add_steps_op.task_id}_added",
dag=self.dag)
self.sign_op(steps_added_op)
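        # Each EMR step is assumed to follow the boto3 add-steps structure, e.g. (illustrative):
        #   {'Name': 'copy raw data',
        #    'HadoopJarStep': {'Jar': 'command-runner.jar',
        #                      'Args': ['s3-dist-cp', '--src=s3://bucket/in', '--dest=/data/in']}}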
        for step_index, step in enumerate(emr_add_steps_op.steps):
name = step['Name']
ssh_command = None
livy_file = None
livy_arguments = None
livy_main_class = None
if 'command-runner' in step['HadoopJarStep']['Jar']:
command_runner_cmd = step['HadoopJarStep']['Args']
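                # e.g. (illustrative) Args of ['/usr/bin/spark-submit', 's3://bucket/jobs/etl.py', '--date', '2020-01-01']
                # map the first argument after spark-submit to the livy batch file and the rest to its arguments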
if '/usr/bin/spark-submit' in command_runner_cmd[0]:
livy_file = command_runner_cmd[1]
livy_arguments = command_runner_cmd[2:]
elif 's3-dist-cp' in command_runner_cmd[0]:
src = None
dest = None
for arg in command_runner_cmd[1:]:
if arg.startswith('--src='):
src = arg.split("--src=", 1)[1]
if arg.startswith('--dest='):
dest = arg.split("--dest=", 1)[1]
mappers = EmrAddStepsOperatorTransformer.HADOOP_DISTCP_DEFAULT_MAPPERS
ssh_command = f"hadoop distcp -m {mappers} {src} {dest}"
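                    # illustrative: --src=s3://bucket/in --dest=/data/in becomes
                    # "hadoop distcp -m 100 s3://bucket/in /data/in"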
elif 'hdfs' in command_runner_cmd[0]:
ssh_command = " ".join(command_runner_cmd)
else:
raise Exception("This kind of step is not supported right now", command_runner_cmd[0])
else:
livy_file = step['HadoopJarStep']['Jar']
livy_arguments = step['HadoopJarStep']['Args']
livy_main_class = step['HadoopJarStep'].get('MainClass', None)
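                # EMR step ``Properties`` (assumed here to be {'Key': ..., 'Value': ...} pairs, per the EMR API)
                # are forwarded to the spark job as ``-Dkey=value`` java options on both the driver and executors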
if 'Properties' in step['HadoopJarStep']:
properties = ""
                    for prop in step['HadoopJarStep']['Properties']:
                        properties += f"-D{prop['Key']}={prop['Value']} "
self.spark_conf['spark.executor.extraJavaOptions'] = properties
self.spark_conf['spark.driver.extraJavaOptions'] = properties
            target_step_task_id = EmrAddStepsOperatorTransformer.get_target_step_task_id(
                emr_add_steps_op.task_id, step_index)
if ssh_command is not None:
step_op = AzureHDInsightSshOperator(
cluster_name=create_op.cluster_name,
azure_conn_id=create_op.azure_conn_id,
command=ssh_command,
task_id=target_step_task_id,
dag=self.dag
)
else:
step_op = LivyBatchOperator(
name=name,
file=livy_file,
arguments=livy_arguments,
class_name=livy_main_class,
azure_conn_id=create_op.azure_conn_id,
cluster_name=create_op.cluster_name,
proxy_user=self.proxy_user,
conf=self.spark_conf,
task_id=target_step_task_id,
dag=self.dag
)
self.copy_op_attrs(step_op, emr_add_steps_op)
self.sign_op(step_op)
step_op.trigger_rule = TriggerRule.ALL_SUCCESS
step_op.set_downstream(steps_added_op)
dag_fragment_steps.append(step_op)
return DAGFragment(dag_fragment_steps)
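

# A minimal usage sketch (illustrative only): the transformer needs a target DAG, transformer
# defaults, and the DAGFragments produced by the upstream job-flow transformation, whose
# ConnectedAzureHDInsightCreateClusterOperator it looks up to wire cluster_name/azure_conn_id.
#
#   transformer = EmrAddStepsOperatorTransformer(target_dag, defaults)
#   fragment = transformer.transform(emr_add_steps_op, parent_fragment, upstream_fragments)
#   # ``fragment`` then holds one livy/SSH step operator per EMR step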