Source code for gokart.file_processor

"""File processor module with support for multiple DataFrame backends."""

from __future__ import annotations

import os
from typing import Any, Literal

# Export common processors and types from base
from gokart.file_processor.base import (
    BinaryFileProcessor,
    DataFrameType,
    FileProcessor,
    GzipFileProcessor,
    NpzFileProcessor,
    PickleFileProcessor,
    TextFileProcessor,
    XmlFileProcessor,
)

# Import backend-specific implementations
from gokart.file_processor.pandas import (
    CsvFileProcessorPandas,
    FeatherFileProcessorPandas,
    JsonFileProcessorPandas,
    ParquetFileProcessorPandas,
)
from gokart.file_processor.polars import (
    CsvFileProcessorPolars,
    FeatherFileProcessorPolars,
    JsonFileProcessorPolars,
    ParquetFileProcessorPolars,
)


[docs] class CsvFileProcessor(FileProcessor): """CSV file processor with automatic backend selection based on dataframe_type.""" def __init__(self, sep: str = ',', encoding: str = 'utf-8', dataframe_type: DataFrameType = 'pandas') -> None: """ CSV file processor with support for both pandas and polars DataFrames. Args: sep: CSV delimiter (default: ',') encoding: File encoding (default: 'utf-8') dataframe_type: DataFrame library to use for load() - 'pandas', 'polars', or 'polars-lazy' (default: 'pandas') """ self._sep = sep self._encoding = encoding self._dataframe_type = dataframe_type # Store for tests if dataframe_type == 'polars-lazy': self._impl: FileProcessor = CsvFileProcessorPolars(sep=sep, encoding=encoding, lazy=True) elif dataframe_type == 'polars': self._impl = CsvFileProcessorPolars(sep=sep, encoding=encoding, lazy=False) else: self._impl = CsvFileProcessorPandas(sep=sep, encoding=encoding)
[docs] def format(self): return self._impl.format()
[docs] def load(self, file): return self._impl.load(file)
[docs] def dump(self, obj, file): return self._impl.dump(obj, file)
[docs] class JsonFileProcessor(FileProcessor): """JSON file processor with automatic backend selection based on dataframe_type.""" def __init__(self, orient: Literal['split', 'records', 'index', 'table', 'columns', 'values'] | None = None, dataframe_type: DataFrameType = 'pandas'): """ JSON file processor with support for both pandas and polars DataFrames. Args: orient: JSON orientation. 'records' for newline-delimited JSON. dataframe_type: DataFrame library to use for load() - 'pandas', 'polars', or 'polars-lazy' (default: 'pandas') """ self._orient = orient self._dataframe_type = dataframe_type # Store for tests if dataframe_type == 'polars-lazy': self._impl: FileProcessor = JsonFileProcessorPolars(orient=orient, lazy=True) elif dataframe_type == 'polars': self._impl = JsonFileProcessorPolars(orient=orient, lazy=False) else: self._impl = JsonFileProcessorPandas(orient=orient)
[docs] def format(self): return self._impl.format()
[docs] def load(self, file): return self._impl.load(file)
[docs] def dump(self, obj, file): return self._impl.dump(obj, file)
[docs] class ParquetFileProcessor(FileProcessor): """Parquet file processor with automatic backend selection based on dataframe_type.""" def __init__(self, engine: Any = 'pyarrow', compression: Any = None, dataframe_type: DataFrameType = 'pandas') -> None: """ Parquet file processor with support for both pandas and polars DataFrames. Args: engine: Parquet engine (pandas-specific, ignored for polars). compression: Compression type. dataframe_type: DataFrame library to use for load() - 'pandas', 'polars', or 'polars-lazy' (default: 'pandas') """ self._engine = engine self._compression = compression self._dataframe_type = dataframe_type # Store for tests if dataframe_type == 'polars-lazy': self._impl: FileProcessor = ParquetFileProcessorPolars(engine=engine, compression=compression, lazy=True) elif dataframe_type == 'polars': self._impl = ParquetFileProcessorPolars(engine=engine, compression=compression, lazy=False) else: self._impl = ParquetFileProcessorPandas(engine=engine, compression=compression)
[docs] def format(self): return self._impl.format()
[docs] def load(self, file): return self._impl.load(file)
[docs] def dump(self, obj, file): # Use the configured implementation (pandas by default) return self._impl.dump(obj, file)
[docs] class FeatherFileProcessor(FileProcessor): """Feather file processor with automatic backend selection based on dataframe_type.""" def __init__(self, store_index_in_feather: bool, dataframe_type: DataFrameType = 'pandas'): """ Feather file processor with support for both pandas and polars DataFrames. Args: store_index_in_feather: Whether to store pandas index (pandas-only feature). dataframe_type: DataFrame library to use for load() - 'pandas', 'polars', or 'polars-lazy' (default: 'pandas') """ self._store_index_in_feather = store_index_in_feather self._dataframe_type = dataframe_type # Store for tests if dataframe_type == 'polars-lazy': self._impl: FileProcessor = FeatherFileProcessorPolars(store_index_in_feather=store_index_in_feather, lazy=True) elif dataframe_type == 'polars': self._impl = FeatherFileProcessorPolars(store_index_in_feather=store_index_in_feather, lazy=False) else: self._impl = FeatherFileProcessorPandas(store_index_in_feather=store_index_in_feather)
[docs] def format(self): return self._impl.format()
[docs] def load(self, file): return self._impl.load(file)
[docs] def dump(self, obj, file): # Use the configured implementation (pandas by default) return self._impl.dump(obj, file)
[docs] def make_file_processor(file_path: str, store_index_in_feather: bool = True, *, dataframe_type: DataFrameType = 'pandas') -> FileProcessor: """Create a file processor based on file extension with default parameters.""" extension2processor = { '.txt': TextFileProcessor(), '.ini': TextFileProcessor(), '.csv': CsvFileProcessor(sep=',', dataframe_type=dataframe_type), '.tsv': CsvFileProcessor(sep='\t', dataframe_type=dataframe_type), '.pkl': PickleFileProcessor(), '.gz': GzipFileProcessor(), '.json': JsonFileProcessor(dataframe_type=dataframe_type), '.ndjson': JsonFileProcessor(dataframe_type=dataframe_type, orient='records'), '.xml': XmlFileProcessor(), '.npz': NpzFileProcessor(), '.parquet': ParquetFileProcessor(compression='gzip', dataframe_type=dataframe_type), '.feather': FeatherFileProcessor(store_index_in_feather=store_index_in_feather, dataframe_type=dataframe_type), '.png': BinaryFileProcessor(), '.jpg': BinaryFileProcessor(), } extension = os.path.splitext(file_path)[1] assert extension in extension2processor, f'{extension} is not supported. The supported extensions are {list(extension2processor.keys())}.' return extension2processor[extension]
__all__ = [ # Base classes and types 'FileProcessor', 'DataFrameType', # Common processors 'BinaryFileProcessor', 'PickleFileProcessor', 'TextFileProcessor', 'GzipFileProcessor', 'XmlFileProcessor', 'NpzFileProcessor', # DataFrame processors (with factory pattern) 'CsvFileProcessor', 'JsonFileProcessor', 'ParquetFileProcessor', 'FeatherFileProcessor', # Utility functions 'make_file_processor', ]