Source code for ditto.transformers.s3.s3_key_sensor_adlsgen1_transformer
from typing import List
from airflow import DAG
from airflow.models import BaseOperator
from airflow.operators.sensors import S3KeySensor
from ditto.api import OperatorTransformer, TransformerDefaults, DAGFragment, TransformerException
from airflowhdi.sensors import AzureDataLakeStorageGen1WebHdfsSensor

class S3KeySensorAdlsGen1OperatorTransformer(OperatorTransformer[S3KeySensor]):
    """
    Transforms a :class:`~airflow.operators.sensors.S3KeySensor`
    into a :class:`~airflowhdi.sensors.AzureDataLakeStorageGen1WebHdfsSensor`.
    """

    def __init__(self, target_dag: DAG, defaults: TransformerDefaults):
        super().__init__(target_dag, defaults)

    def transform(self, src_operator: BaseOperator, parent_fragment: DAGFragment,
                  upstream_fragments: List[DAGFragment]) -> DAGFragment:
"""
You need to add the ``adls_conn_id`` to the source operator (or preferably DAG) for this to work.
The ``glob_path`` for the ADLS sensor is coped from the ``bucket_key`` of the s3 sensor,
so make sure that is templatized for changing between `s3://` and `adls://` paths using config
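
        Example (illustrative; the connection id shown is an assumption)::

            dag = DAG(
                dag_id='target_dag',
                params={'adls_conn_id': 'azure_data_lake_default'}
            )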
"""
        s3_key_sensor: S3KeySensor = src_operator

        # resolve the ADLS connection id from the operator's params,
        # falling back to the DAG-level params
        adls_conn_id = s3_key_sensor.params.get('adls_conn_id', None)
        if not adls_conn_id:
            adls_conn_id = self.dag.params.get('adls_conn_id', None)
        if not adls_conn_id:
            raise TransformerException("Could not find adls_conn_id in operator or DAG params")

        adls_gen1_sensor_op = AzureDataLakeStorageGen1WebHdfsSensor(
            task_id=src_operator.task_id,
            azure_data_lake_conn_id=adls_conn_id,
            glob_path=s3_key_sensor.bucket_key,
            dag=self.dag
        )
        # carry over the source operator's common attributes and sign the new
        # operator before wrapping it in a fragment
        self.copy_op_attrs(adls_gen1_sensor_op, src_operator)
        self.sign_op(adls_gen1_sensor_op)

        return DAGFragment([adls_gen1_sensor_op])
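

# Usage sketch (not part of the original module): a minimal, hypothetical way to
# drive this transformer directly. ditto normally invokes transformers through
# its DAG-transformation pipeline; the `TransformerDefaults(default_operator=None)`
# call and the connection id below are assumptions, not confirmed API usage.
#
#     target = DAG(dag_id='adls_dag',
#                  params={'adls_conn_id': 'azure_data_lake_default'})
#     transformer = S3KeySensorAdlsGen1OperatorTransformer(
#         target, TransformerDefaults(default_operator=None))
#     fragment = transformer.transform(s3_sensor_task,
#                                      parent_fragment=None,
#                                      upstream_fragments=[])
#     # `fragment` wraps the new AzureDataLakeStorageGen1WebHdfsSensor,
#     # which is bound to `target`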