Source code for ditto.resolvers.class_transformer_resolver
from typing import Dict, Type
from airflow.models import BaseOperator
from ditto.api import TransformerResolver, OperatorTransformer
[docs]class ClassTransformerResolver(TransformerResolver):
"""
Finds a resolver for a transformer based on the class of the source operator given
"""
def __init__(self, operator_transformers: Dict[Type[BaseOperator], Type[OperatorTransformer]]):
"""
:param operator_transformers: a map of operator type and transformer type
"""
super().__init__()
self.operator_transformers = operator_transformers
[docs] def get_transformer_for_class(self, task_cls: Type) -> Type[OperatorTransformer]:
"""
return the transformer type for a given operator type
:param task_cls: the operator type (class)
:return: the transformer type
"""
if task_cls in self.operator_transformers:
transformer_cl = self.operator_transformers[task_cls]
return transformer_cl
[docs] def resolve_transformer(self, task: BaseOperator) -> Type[OperatorTransformer]:
return self.get_transformer_for_class(task.__class__)