from __future__ import annotations
import os
from collections.abc import Callable, Iterable
from io import BytesIO
from typing import Any, Literal, Protocol, TypeAlias, TypeVar, cast, get_args, get_origin
import dill
import luigi
import pandas as pd
class FileLike(Protocol):
    """Structural type for binary file objects.

    Only the four methods declared here are required: ``read``, ``readline``, ``seek`` and
    ``seekable``. Any object providing them with compatible signatures satisfies the protocol.
    """

    def read(self, n: int) -> bytes:
        """Read up to ``n`` bytes."""
        ...

    def readline(self) -> bytes:
        """Read a single line of bytes."""
        ...

    def seek(self, offset: int) -> None:
        """Move the stream position to ``offset``."""
        ...

    def seekable(self) -> bool:
        """Tell whether ``seek`` is supported."""
        ...
def add_config(file_path: str) -> None:
    """Register ``file_path`` as an additional luigi configuration file.

    Args:
        file_path: Path of the config file to add.

    Raises:
        AssertionError: if ``luigi.configuration.add_config_path`` returns a falsy value
            (i.e. reports that the file was not added).
    """
    _, ext = os.path.splitext(file_path)
    # The extension is stored with its leading dot (e.g. '.toml').
    # NOTE(review): luigi's add_config_path appears to pick the parser from the file extension itself,
    # and luigi.configuration.core seems to read an uppercase ``PARSER`` setting. If so, this lowercase
    # ``parser`` assignment is a no-op; confirm against the luigi version in use before relying on it.
    luigi.configuration.core.parser = ext  # type: ignore
    # The call must not live inside an ``assert``: assert statements are stripped under ``python -O``,
    # which would silently skip registering the config file.
    if not luigi.configuration.add_config_path(file_path):
        # AssertionError is kept so callers relying on the previous assert-based check still work.
        raise AssertionError(f'Failed to add luigi config file: {file_path}')
T = TypeVar('T')
FlattenableItems: TypeAlias = T | list['FlattenableItems[T]'] | tuple['FlattenableItems[T]', ...] | dict[str, 'FlattenableItems[T]']
def flatten(targets: FlattenableItems[T]) -> list[T]:
    """
    Creates a flat list of all items in structured output (dicts, lists, tuples and other iterables).

    Dict values (not keys) are flattened in iteration order. ``None`` contributes no items,
    whether passed directly or nested inside a container. ``str``, ``bytes`` and ``bytearray``
    values are kept as single items instead of being iterated (iterating ``bytes`` would yield ints).
    Any other non-iterable value is returned as a single item.

    .. code-block:: python

        >>> sorted(flatten({'a': 'foo', 'b': 'bar'}))
        ['bar', 'foo']
        >>> sorted(flatten(['foo', ['bar', 'troll']]))
        ['bar', 'foo', 'troll']
        >>> flatten('foo')
        ['foo']
        >>> flatten(b'foo')
        [b'foo']
        >>> flatten(42)
        [42]

    This method is copied and modified from [luigi.task.flatten](https://github.com/spotify/luigi/blob/367edc2e3a099b8a0c2d15b1676269e33ad06117/luigi/task.py#L958) in accordance with [Apache License 2.0](https://github.com/spotify/luigi/blob/367edc2e3a099b8a0c2d15b1676269e33ad06117/LICENSE).
    """
    flat: list[T] = []
    _flatten_into(targets, flat)
    return flat


def _flatten_into(targets: Any, out: list[Any]) -> None:
    """Append the leaves of ``targets`` to ``out`` in traversal order (shared accumulator, no per-level copies)."""
    if targets is None:
        return
    if isinstance(targets, dict):
        for value in targets.values():
            _flatten_into(value, out)
        return
    # Text and binary strings are atomic values, not containers of characters/ints.
    if isinstance(targets, (str, bytes, bytearray)) or not isinstance(targets, Iterable):
        out.append(targets)
        return
    for item in targets:
        _flatten_into(item, out)
K = TypeVar('K')
def map_flattenable_items(func: Callable[[T], K], items: FlattenableItems[T]) -> FlattenableItems[K]:
if isinstance(items, dict):
return {k: map_flattenable_items(func, v) for k, v in items.items()}
if isinstance(items, tuple):
return tuple(map_flattenable_items(func, i) for i in items)
if isinstance(items, str):
return func(items) # type: ignore
if isinstance(items, list):
return list(map(lambda item: map_flattenable_items(func, item), items))
return func(items)
def load_dill_with_pandas_backward_compatibility(file: FileLike | BytesIO) -> Any:
    """Load an object serialized with dill, falling back to ``pd.read_pickle``.

    ``dill.load`` is tried first. If it raises, the file is rewound to offset 0 and read again
    with ``pd.read_pickle``. That function can load pickles written by older pandas versions,
    as well as plain pickle data. It is unclear whether every dill payload is readable by
    ``pd.read_pickle``, so dill stays the primary loader and pandas is only the fallback.

    Security: dill and pickle can execute arbitrary code while loading; only pass trusted data.

    Args:
        file: Binary file object. The fallback rewinds to absolute offset 0, so the payload is
            assumed to start at the beginning of the stream.

    Returns:
        The deserialized object.

    Raises:
        AssertionError: if ``dill.load`` fails and ``file`` is not seekable; the dill error is
            attached as ``__cause__``.
        Any error raised by ``pd.read_pickle`` propagates if the fallback also fails; the dill
        error is then available as its ``__context__``.
    """
    try:
        return dill.load(file)
    except Exception as dill_error:
        # Explicit check instead of ``assert`` so it is not stripped under ``python -O``,
        # and chained so the original dill failure is not lost.
        if not file.seekable():
            raise AssertionError(f'{file} is not seekable.') from dill_error
        file.seek(0)
        return pd.read_pickle(cast(Any, file))
def _resolve_type_var(type_arg: Any, substitutions: dict[TypeVar, Any]) -> Any:
# Follow a chain of TypeVar substitutions (e.g. T_a -> T_b -> pl.DataFrame) to a concrete type.
# seen guards against cyclic substitutions.
seen: set[TypeVar] = set()
while isinstance(type_arg, TypeVar) and type_arg in substitutions and type_arg not in seen:
seen.add(type_arg)
type_arg = substitutions[type_arg]
return type_arg
def get_dataframe_type_from_task(task: Any) -> Literal['pandas', 'polars', 'polars-lazy']:
"""
Extract DataFrame type from TaskOnKart[T] type parameter.
Examines the type parameter T of a TaskOnKart subclass to determine
whether it uses pandas or polars DataFrames/LazyFrames.
Args:
task: A TaskOnKart instance or class
Returns:
'pandas', 'polars', or 'polars-lazy' (defaults to 'pandas' if type cannot be determined)
Examples:
>>> class MyTask(TaskOnKart[pd.DataFrame]): pass
>>> get_dataframe_type_from_task(MyTask())
'pandas'
>>> class MyPolarsTask(TaskOnKart[pl.DataFrame]): pass
>>> get_dataframe_type_from_task(MyPolarsTask())
'polars'
"""
task_class = task if isinstance(task, type) else task.__class__
# Walk the MRO to find TaskOnKart[...] even when defined on a parent class
mro = task_class.mro() if hasattr(task_class, 'mro') else [task_class]
# Collect TypeVar bindings so TaskOnKart[T] bound through an intermediate generic class
# (e.g. class Base(TaskOnKart[T]) with class Concrete(Base[pl.DataFrame])) resolves instead of
# falling back to pandas. The MRO runs most-derived first, so bindings are recorded before the
# TaskOnKart base that needs them, letting us collect and resolve in a single pass.
type_var_substitutions: dict[TypeVar, Any] = {}
for cls in mro:
for base in getattr(cls, '__orig_bases__', ()):
origin = get_origin(base)
if origin is None:
continue
for parameter, arg in zip(getattr(origin, '__parameters__', ()), get_args(base), strict=False):
if isinstance(parameter, TypeVar) and parameter not in type_var_substitutions:
type_var_substitutions[parameter] = arg
if hasattr(origin, '__name__') and origin.__name__ == 'TaskOnKart':
args = get_args(base)
if not args:
continue
df_type = _resolve_type_var(args[0], type_var_substitutions)
module = getattr(df_type, '__module__', '')
# Check module name to determine DataFrame type
if 'polars' in module:
name = getattr(df_type, '__name__', '')
if name == 'LazyFrame':
return 'polars-lazy'
return 'polars'
elif 'pandas' in module:
return 'pandas'
return 'pandas' # Default to pandas for backward compatibility