Source code for ditto.resolvers.python_call_transformer_resolver

import inspect
from typing import Dict, Callable, Type

from airflow.models import BaseOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator

from ditto.api import TransformerResolver, OperatorTransformer


[docs]class PythonCallTransformerResolver(TransformerResolver): """ Similar to the :class:`~ditto.matchers.PythonCallTaskMatcher`, but finds a transformer for a operator using the same matching pattern. """ def __init__(self, callable_transformers: Dict[Callable, Type[OperatorTransformer]], nested_search: bool = True): """ :param callable_transformers: a map of python callables and matching transformers :param nested_search: whether to search the method source code itself for the callable """ self.transformers_for_callables = callable_transformers self.nested_search = nested_search
[docs] def resolve_transformer(self, task: BaseOperator) -> Type[OperatorTransformer]: """ see the behavior of :meth:`~ditto.matchers.PythonCallTaskMatcher.does_match` :param task: the task to resolve the transformer for :return: the resolved transformer """ if isinstance(task, PythonOperator) or isinstance(task, BranchPythonOperator): python_op: PythonOperator = task py_callable = python_op.python_callable src_lines = inspect.getsourcelines(py_callable)[0] if self.nested_search: for line in src_lines: for find_callable in self.transformers_for_callables: if find_callable.__name__ in line: return self.transformers_for_callables[find_callable] else: for find_callable in self.transformers_for_callables: if find_callable is py_callable: return self.transformers_for_callables[find_callable]