Source code for gokart.utils

from __future__ import annotations

import os
from collections.abc import Callable, Iterable
from io import BytesIO
from typing import Any, Literal, Protocol, TypeAlias, TypeVar, cast, get_args, get_origin

import dill
import luigi
import pandas as pd


class FileLike(Protocol):
    """Structural type for the byte-stream objects accepted by this module.

    Any object that provides these four methods satisfies the protocol;
    explicit inheritance is not required.
    """

    def read(self, n: int) -> bytes:
        """Return bytes read from the stream; ``n`` is the size argument."""

    def readline(self) -> bytes:
        """Return one line of bytes from the stream."""

    def seek(self, offset: int) -> None:
        """Move the stream position to ``offset``."""

    def seekable(self) -> bool:
        """Return whether the stream supports ``seek``."""


def add_config(file_path: str) -> None:
    """Register ``file_path`` as a luigi configuration file.

    The file extension is stored on ``luigi.configuration.core.parser`` and the
    path is then handed to ``luigi.configuration.add_config_path``.

    Args:
        file_path: Path of the configuration file to add.

    Raises:
        AssertionError: If ``luigi.configuration.add_config_path`` returns a falsy value.
    """
    _, ext = os.path.splitext(file_path)
    luigi.configuration.core.parser = ext  # type: ignore
    # Call unconditionally rather than inside ``assert``: assert statements are removed
    # under ``python -O``, which would silently skip registering the config file.
    added = luigi.configuration.add_config_path(file_path)
    if not added:
        # AssertionError is kept so callers handling the original failure mode still work.
        raise AssertionError(f'Failed to add config file: {file_path}')
T = TypeVar('T') FlattenableItems: TypeAlias = T | list['FlattenableItems[T]'] | tuple['FlattenableItems[T]', ...] | dict[str, 'FlattenableItems[T]'] def flatten(targets: FlattenableItems[T]) -> list[T]: """ Creates a flat list of all items in structured output (dicts, lists, items): .. code-block:: python >>> sorted(flatten({'a': 'foo', 'b': 'bar'})) ['bar', 'foo'] >>> sorted(flatten(['foo', ['bar', 'troll']])) ['bar', 'foo', 'troll'] >>> flatten('foo') ['foo'] >>> flatten(42) [42] This method is copied and modified from [luigi.task.flatten](https://github.com/spotify/luigi/blob/367edc2e3a099b8a0c2d15b1676269e33ad06117/luigi/task.py#L958) in accordance with [Apache License 2.0](https://github.com/spotify/luigi/blob/367edc2e3a099b8a0c2d15b1676269e33ad06117/LICENSE). """ if targets is None: return [] flat = [] if isinstance(targets, dict): for _, result in targets.items(): flat += flatten(result) return flat if isinstance(targets, str): return [targets] # type: ignore if not isinstance(targets, Iterable): return [targets] for result in targets: flat += flatten(result) return flat K = TypeVar('K') def map_flattenable_items(func: Callable[[T], K], items: FlattenableItems[T]) -> FlattenableItems[K]: if isinstance(items, dict): return {k: map_flattenable_items(func, v) for k, v in items.items()} if isinstance(items, tuple): return tuple(map_flattenable_items(func, i) for i in items) if isinstance(items, str): return func(items) # type: ignore if isinstance(items, list): return list(map(lambda item: map_flattenable_items(func, item), items)) return func(items) def load_dill_with_pandas_backward_compatibility(file: FileLike | BytesIO) -> Any: """Load binary dumped by dill with pandas backward compatibility. pd.read_pickle can load binary dumped in backward pandas version, and also any objects dumped by pickle. It is unclear whether all objects dumped by dill can be loaded by pd.read_pickle, we use dill.load as a fallback. 
""" try: return dill.load(file) except Exception: assert file.seekable(), f'{file} is not seekable.' file.seek(0) return pd.read_pickle(cast(Any, file)) def _resolve_type_var(type_arg: Any, substitutions: dict[TypeVar, Any]) -> Any: # Follow a chain of TypeVar substitutions (e.g. T_a -> T_b -> pl.DataFrame) to a concrete type. # seen guards against cyclic substitutions. seen: set[TypeVar] = set() while isinstance(type_arg, TypeVar) and type_arg in substitutions and type_arg not in seen: seen.add(type_arg) type_arg = substitutions[type_arg] return type_arg def get_dataframe_type_from_task(task: Any) -> Literal['pandas', 'polars', 'polars-lazy']: """ Extract DataFrame type from TaskOnKart[T] type parameter. Examines the type parameter T of a TaskOnKart subclass to determine whether it uses pandas or polars DataFrames/LazyFrames. Args: task: A TaskOnKart instance or class Returns: 'pandas', 'polars', or 'polars-lazy' (defaults to 'pandas' if type cannot be determined) Examples: >>> class MyTask(TaskOnKart[pd.DataFrame]): pass >>> get_dataframe_type_from_task(MyTask()) 'pandas' >>> class MyPolarsTask(TaskOnKart[pl.DataFrame]): pass >>> get_dataframe_type_from_task(MyPolarsTask()) 'polars' """ task_class = task if isinstance(task, type) else task.__class__ # Walk the MRO to find TaskOnKart[...] even when defined on a parent class mro = task_class.mro() if hasattr(task_class, 'mro') else [task_class] # Collect TypeVar bindings so TaskOnKart[T] bound through an intermediate generic class # (e.g. class Base(TaskOnKart[T]) with class Concrete(Base[pl.DataFrame])) resolves instead of # falling back to pandas. The MRO runs most-derived first, so bindings are recorded before the # TaskOnKart base that needs them, letting us collect and resolve in a single pass. 
type_var_substitutions: dict[TypeVar, Any] = {} for cls in mro: for base in getattr(cls, '__orig_bases__', ()): origin = get_origin(base) if origin is None: continue for parameter, arg in zip(getattr(origin, '__parameters__', ()), get_args(base), strict=False): if isinstance(parameter, TypeVar) and parameter not in type_var_substitutions: type_var_substitutions[parameter] = arg if hasattr(origin, '__name__') and origin.__name__ == 'TaskOnKart': args = get_args(base) if not args: continue df_type = _resolve_type_var(args[0], type_var_substitutions) module = getattr(df_type, '__module__', '') # Check module name to determine DataFrame type if 'polars' in module: name = getattr(df_type, '__name__', '') if name == 'LazyFrame': return 'polars-lazy' return 'polars' elif 'pandas' in module: return 'pandas' return 'pandas' # Default to pandas for backward compatibility