Source code for ditto.transformers.s3.s3_key_sensor_blob_transformer

from typing import List

from airflow import DAG
from airflow.contrib.sensors.wasb_sensor import WasbPrefixSensor
from airflow.models import BaseOperator
from airflow.operators.sensors import S3KeySensor

from ditto.api import OperatorTransformer, TransformerDefaults, DAGFragment, TransformerException
from airflowhdi.sensors import WasbWildcardPrefixSensor


[docs]class S3KeySensorBlobOperatorTransformer(OperatorTransformer[S3KeySensor]): """ Transforms a :class:`~airflow.operators.sensors.S3KeySensor` into either a :class:`~airflowhdi.sensors.WasbWildcardPrefixSensor` or a :class:`~airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor` """ def __init__(self, target_dag: DAG, defaults: TransformerDefaults): super().__init__(target_dag, defaults)
[docs] def transform(self, src_operator: BaseOperator, parent_fragment: DAGFragment, upstream_fragments: List[DAGFragment]) -> DAGFragment: """ You need to add the ``wasb_conn_id`` to the source operator (or preferably DAG) for this to work. The ``container_name`` and ``prefix`` for the blob based sensors are coped from the ``bucket_name`` and ``bucket_key`` of the s3 sensor, so make sure they are templatized for changing between `s3://` and `wasb://` paths, etc. using config """ s3_key_sensor: S3KeySensor = src_operator wasb_conn_id = s3_key_sensor.params.get('wasb_conn_id', None) if not wasb_conn_id: wasb_conn_id = self.dag.params.get('wasb_conn_id', None) if not wasb_conn_id: raise TransformerException("Could not find wasb_conn_id in operator or DAG params") if s3_key_sensor.wildcard_match: wasb_sensor_op = WasbWildcardPrefixSensor( task_id=src_operator.task_id, wasb_conn_id=wasb_conn_id, container_name=s3_key_sensor.bucket_name, wildcard_prefix=s3_key_sensor.bucket_key, dag=self.dag ) else: wasb_sensor_op = WasbPrefixSensor( task_id=src_operator.task_id, wasb_conn_id=wasb_conn_id, container_name=s3_key_sensor.bucket_name, prefix=s3_key_sensor.bucket_key, dag=self.dag ) self.copy_op_attrs(wasb_sensor_op, src_operator) self.sign_op(wasb_sensor_op) return DAGFragment([wasb_sensor_op])