from __future__ import annotations
from abc import abstractmethod
from logging import getLogger
from typing import Any
import luigi
import numpy as np
import pandas as pd
from luigi.task_register import Register
logger = getLogger(__name__)
class PandasTypeError(Exception):
    """Signals that a pandas DataFrame column holds a value of an unexpected type."""
class PandasTypeConfig(luigi.Config):
    """Base class for declaring the expected Python types of DataFrame columns.

    Subclasses implement :meth:`type_dict`; :meth:`check` then validates a
    DataFrame against that mapping, one column at a time.
    """

    @classmethod
    @abstractmethod
    def type_dict(cls) -> dict[str, Any]:
        """Return a mapping from column name to the type its values must be instances of."""

    @classmethod
    def check(cls, df: pd.DataFrame) -> None:
        """Validate every column listed in :meth:`type_dict` against ``df``.

        Args:
            df: The DataFrame to validate.

        Raises:
            PandasTypeError: If a listed column that exists in ``df`` contains a
                value that is not an instance of the expected type.
        """
        for column_name, column_type in cls.type_dict().items():
            cls._check_column(df, column_name, column_type)

    @classmethod
    def _check_column(cls, df: pd.DataFrame, column_name: str, column_type: type) -> None:
        """Check that each value of ``df[column_name]`` is an instance of ``column_type``.

        Columns that are not present in ``df`` are skipped silently.

        Raises:
            PandasTypeError: On the first value that fails the ``isinstance`` check.
        """
        if column_name not in df.columns:
            return
        # Single short-circuiting pass: stop at the first offending value instead of
        # materializing a full list of booleans and then scanning the column again.
        for not_match in df[column_name]:
            if not isinstance(not_match, column_type):
                raise PandasTypeError(f'expected type is "{column_type}", but "{type(not_match)}" is passed in column "{column_name}".')
class PandasTypeConfigMap(luigi.Config):
    """Lookup from task namespace to the PandasTypeConfig subclass registered for it.

    This derives from luigi.Config so that the map is intended to be initialized only once.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        # Resolve every registered task name to its class up front.
        registered_classes = [Register.get_task_cls(name) for name in Register.task_names()]
        namespace_to_config = {}
        for candidate in registered_classes:
            # Keep proper subclasses only; the PandasTypeConfig base itself is excluded.
            if not issubclass(candidate, PandasTypeConfig):
                continue
            if candidate != PandasTypeConfig:
                # A later class with the same namespace replaces an earlier one.
                namespace_to_config[candidate.task_namespace] = candidate
        self._map = namespace_to_config

    def check(self, obj: Any, task_namespace: str) -> None:
        """Run the type check registered for ``task_namespace`` when ``obj`` is a DataFrame.

        Does nothing if ``obj`` is not a ``pd.DataFrame`` or if no config is
        mapped to ``task_namespace``.
        """
        if not isinstance(obj, pd.DataFrame):
            return
        if task_namespace not in self._map:
            return
        self._map[task_namespace].check(obj)