import hashlib
import os
import shutil
from abc import abstractmethod
from datetime import datetime
from glob import glob
from logging import getLogger
from typing import Any, Optional
import luigi
import numpy as np
import pandas as pd
from tqdm import tqdm
from gokart.file_processor import FileProcessor, make_file_processor
from gokart.object_storage import ObjectStorage
from gokart.redis_lock import RedisParams, make_redis_params, wrap_with_dump_lock, wrap_with_load_lock, wrap_with_remove_lock, wrap_with_run_lock
from gokart.zip_client_util import make_zip_client
logger = getLogger(__name__)
class TargetOnKart(luigi.Target):
    """Base target exposing exists/load/dump/remove on top of ``luigi.Target``.

    Each public method delegates to an underscore-prefixed hook that a
    subclass must implement. ``load``, ``dump`` (unless disabled) and
    ``remove`` call their hook through the corresponding ``wrap_with_*_lock``
    helper, passing the parameters returned by ``_get_redis_params``.
    """

    def exists(self) -> bool:
        """Return whether the target exists, as reported by ``_exists``."""
        return self._exists()

    def load(self) -> Any:
        """Load and return the stored object via ``_load`` wrapped with the load lock."""
        return wrap_with_load_lock(func=self._load, redis_params=self._get_redis_params())()

    def dump(self, obj, lock_at_dump: bool = True) -> None:
        """Store ``obj`` via ``_dump``.

        Args:
            obj: The object to store.
            lock_at_dump: When true, ``_dump`` is called through
                ``wrap_with_dump_lock`` (which also receives ``self.exists``
                as ``exist_check``); when false, ``_dump`` is called directly.
        """
        if lock_at_dump:
            wrap_with_dump_lock(func=self._dump, redis_params=self._get_redis_params(), exist_check=self.exists)(obj)
        else:
            self._dump(obj)

    def remove(self) -> None:
        """Remove the target via ``_remove`` wrapped with the remove lock, if it exists."""
        # NOTE: the existence check happens before the lock wrapper is entered.
        if self.exists():
            wrap_with_remove_lock(self._remove, redis_params=self._get_redis_params())()

    def last_modification_time(self) -> datetime:
        """Return the value of ``_last_modification_time``."""
        return self._last_modification_time()

    def path(self) -> str:
        """Return the value of ``_path``."""
        return self._path()

    def wrap_with_lock(self, func):
        """Return ``func`` wrapped by ``wrap_with_run_lock`` using this target's redis params."""
        return wrap_with_run_lock(func=func, redis_params=self._get_redis_params())

    @abstractmethod
    def _exists(self) -> bool:
        pass

    @abstractmethod
    def _get_redis_params(self) -> RedisParams:
        pass

    @abstractmethod
    def _load(self) -> Any:
        pass

    @abstractmethod
    def _dump(self, obj) -> None:
        pass

    @abstractmethod
    def _remove(self) -> None:
        pass

    @abstractmethod
    def _last_modification_time(self) -> datetime:
        pass

    @abstractmethod
    def _path(self) -> str:
        pass
class SingleFileTarget(TargetOnKart):
    """``TargetOnKart`` backed by one file-system target and one file processor.

    Reading and writing open the wrapped target and hand the file object to
    the processor's ``load`` / ``dump``.
    """

    def __init__(
            self,
            target: luigi.target.FileSystemTarget,
            processor: FileProcessor,
            redis_params: RedisParams,
    ) -> None:
        """Store the wrapped target, the processor and the redis parameters.

        Args:
            target: File-system target that is opened, removed and checked
                for existence.
            processor: Object whose ``load(f)`` / ``dump(obj, f)`` do the
                actual (de)serialization.
            redis_params: Value returned unchanged by ``_get_redis_params``.
        """
        self._target = target
        self._processor = processor
        self._redis_params = redis_params

    def _exists(self) -> bool:
        return self._target.exists()

    def _get_redis_params(self) -> RedisParams:
        return self._redis_params

    def _load(self) -> Any:
        # The target is opened in 'r' mode and the processor reads from it.
        with self._target.open('r') as f:
            return self._processor.load(f)

    def _dump(self, obj) -> None:
        # The target is opened in 'w' mode and the processor writes to it.
        with self._target.open('w') as f:
            self._processor.dump(obj, f)

    def _remove(self) -> None:
        self._target.remove()

    def _last_modification_time(self) -> datetime:
        return _get_last_modification_time(self._target.path)

    def _path(self) -> str:
        return self._target.path
class ModelTarget(TargetOnKart):
    """``TargetOnKart`` that stores a model as an archive handled by a zip client.

    Dumping writes the model (via ``save_function``) and the load function
    into ``temporary_directory``, archives it with the zip client and then
    deletes the directory. Loading unpacks the archive, calls the load
    function on the model path and deletes the directory again.
    """

    def __init__(
            self,
            file_path: str,
            temporary_directory: str,
            load_function,
            save_function,
            redis_params: RedisParams,
    ) -> None:
        """Create the zip client and remember the functions and directories.

        Args:
            file_path: Path passed to ``make_zip_client``.
            temporary_directory: Working directory passed to
                ``make_zip_client``; it holds ``model.pkl`` and
                ``load_function.pkl`` while dumping or loading.
            load_function: Callable invoked as ``load_function(model_path)``.
                If falsy, the function stored in the archive is used on load.
            save_function: Callable invoked as ``save_function(obj, model_path)``.
            redis_params: Value returned unchanged by ``_get_redis_params``.
        """
        self._zip_client = make_zip_client(file_path, temporary_directory)
        self._temporary_directory = temporary_directory
        self._save_function = save_function
        self._load_function = load_function
        self._redis_params = redis_params

    def _exists(self) -> bool:
        return self._zip_client.exists()

    def _get_redis_params(self) -> RedisParams:
        return self._redis_params

    def _load(self) -> Any:
        self._zip_client.unpack_archive()
        # Clean up the unpacked files even when the load function raises, so a
        # failed load does not leave the temporary directory behind.
        try:
            # Fall back to the load function stored next to the model when none was given.
            self._load_function = self._load_function or make_target(self._load_function_path()).load()
            return self._load_function(self._model_path())
        finally:
            self._remove_temporary_directory()

    def _dump(self, obj) -> None:
        self._make_temporary_directory()
        # Clean up the working files even when saving or archiving raises.
        try:
            self._save_function(obj, self._model_path())
            # Persist the load function alongside the model so `_load` can recover it.
            make_target(self._load_function_path()).dump(self._load_function)
            self._zip_client.make_archive()
        finally:
            self._remove_temporary_directory()

    def _remove(self) -> None:
        self._zip_client.remove()

    def _last_modification_time(self) -> datetime:
        return _get_last_modification_time(self._zip_client.path)

    def _path(self) -> str:
        return self._zip_client.path

    def _model_path(self):
        return os.path.join(self._temporary_directory, 'model.pkl')

    def _load_function_path(self):
        return os.path.join(self._temporary_directory, 'load_function.pkl')

    def _remove_temporary_directory(self):
        shutil.rmtree(self._temporary_directory)

    def _make_temporary_directory(self):
        os.makedirs(self._temporary_directory, exist_ok=True)
class LargeDataFrameProcessor(object):
    """Save a ``pandas.DataFrame`` as several pickle chunks and load them back.

    Chunks are written as ``data_<i>.pkl`` into the directory of the given
    file path; the file name component of that path is not used.
    """

    def __init__(self, max_byte: int):
        """Remember the byte threshold used to decide how many chunks to write.

        Args:
            max_byte: Divisor applied to ``df.values.nbytes`` when computing
                the number of chunks; converted with ``int``.
        """
        self.max_byte = int(max_byte)

    def save(self, df: pd.DataFrame, file_path: str):
        """Write ``df`` as ``data_<i>.pkl`` chunks next to ``file_path``.

        An empty frame is written as a single ``data_0.pkl``. Otherwise the
        rows are split into ``df.values.nbytes // max_byte + 1`` contiguous
        chunks, capped at the number of rows.

        NOTE: ``data_*.pkl`` files already present in the directory are not
        removed here.

        Args:
            df: The frame to store.
            file_path: Path whose directory receives the chunk files.
        """
        dir_path = os.path.dirname(file_path)
        # `os.makedirs('')` raises, so only create a directory when there is one.
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)

        if df.empty:
            df.to_pickle(os.path.join(dir_path, 'data_0.pkl'))
            return

        n_rows = df.shape[0]
        # Never ask for more chunks than rows: `np.array_split` would yield
        # empty chunks, and indexing `idx[0]` on those raises IndexError.
        split_size = min(df.values.nbytes // self.max_byte + 1, n_rows)
        logger.info(f'saving a large pdDataFrame with split_size={split_size}')
        for i, idx in tqdm(list(enumerate(np.array_split(range(n_rows), split_size)))):
            df.iloc[idx[0]:idx[-1] + 1].to_pickle(os.path.join(dir_path, f'data_{i}.pkl'))

    @staticmethod
    def load(file_path: str) -> pd.DataFrame:
        """Read every ``data_*.pkl`` next to ``file_path`` and concatenate them.

        Chunks are concatenated in ascending numeric chunk index so that the
        row order written by ``save`` is restored; files whose suffix is not
        a number come last, ordered by name.

        NOTE: the chunks are read with ``pd.read_pickle``; unpickling is not
        safe on untrusted files.

        Args:
            file_path: Path whose directory contains the chunk files.

        Returns:
            The concatenation of all chunk frames.
        """
        dir_path = os.path.dirname(file_path)

        def chunk_order(chunk_path: str):
            # Strip the 'data_' prefix and '.pkl' suffix to get the chunk index.
            suffix = os.path.basename(chunk_path)[len('data_'):-len('.pkl')]
            if suffix.isdigit():
                return 0, int(suffix), ''
            return 1, 0, suffix

        # `glob` returns matches in arbitrary order, and a plain string sort
        # would put data_10 before data_2, so order by the numeric index.
        chunk_paths = sorted(glob(os.path.join(dir_path, 'data_*.pkl')), key=chunk_order)
        return pd.concat([pd.read_pickle(chunk_path) for chunk_path in chunk_paths])
def _make_file_system_target(file_path: str, processor: Optional[FileProcessor] = None, store_index_in_feather: bool = True) -> luigi.target.FileSystemTarget:
    """Build the luigi file-system target for ``file_path``.

    When no processor is given, one is created with ``make_file_processor``.
    The processor's ``format()`` is passed to the object-storage target if
    ``file_path`` is an object-storage path, and to ``luigi.LocalTarget``
    otherwise.
    """
    if not processor:
        processor = make_file_processor(file_path, store_index_in_feather=store_index_in_feather)

    # Local paths are the fall-through case of the original; handle them first here.
    if not ObjectStorage.if_object_storage_path(file_path):
        return luigi.LocalTarget(file_path, format=processor.format())
    return ObjectStorage.get_object_storage_target(file_path, processor.format())
def _make_file_path(original_path: str, unique_id: Optional[str] = None) -> str:
if unique_id is not None:
[base, extension] = os.path.splitext(original_path)
return base + '_' + unique_id + extension
return original_path
def _get_last_modification_time(path: str) -> datetime:
    """Return the last modification time of ``path``.

    Local paths use ``os.path.getmtime``; object-storage paths use
    ``ObjectStorage.get_timestamp``.

    Raises:
        FileNotFoundError: If an object-storage path does not exist.
    """
    if not ObjectStorage.if_object_storage_path(path):
        return datetime.fromtimestamp(os.path.getmtime(path))

    if not ObjectStorage.exists(path):
        raise FileNotFoundError(f'No such file or directory: {path}')
    return ObjectStorage.get_timestamp(path)
def make_target(file_path: str,
                unique_id: Optional[str] = None,
                processor: Optional[FileProcessor] = None,
                redis_params: Optional[RedisParams] = None,
                store_index_in_feather: bool = True) -> TargetOnKart:
    """Create a ``SingleFileTarget`` for ``file_path``.

    Args:
        file_path: Base path of the target.
        unique_id: If given, inserted before the extension of ``file_path``
            (see ``_make_file_path``).
        processor: File processor to use; when falsy, one is created with
            ``make_file_processor`` for the final path.
        redis_params: Lock parameters; when ``None`` they are created with
            ``make_redis_params`` from the original ``file_path`` and
            ``unique_id``.
        store_index_in_feather: Forwarded to ``make_file_processor``.

    Returns:
        The target wrapping the file-system target and the processor.
    """
    # Redis params are derived from the path *before* the unique id is appended.
    _redis_params = redis_params if redis_params is not None else make_redis_params(file_path=file_path, unique_id=unique_id)
    file_path = _make_file_path(file_path, unique_id)
    processor = processor or make_file_processor(file_path, store_index_in_feather=store_index_in_feather)
    file_system_target = _make_file_system_target(file_path, processor=processor, store_index_in_feather=store_index_in_feather)
    return SingleFileTarget(target=file_system_target, processor=processor, redis_params=_redis_params)
def make_model_target(file_path: str,
                      temporary_directory: str,
                      save_function,
                      load_function,
                      unique_id: Optional[str] = None,
                      redis_params: Optional[RedisParams] = None) -> TargetOnKart:
    """Create a ``ModelTarget`` for ``file_path``.

    Args:
        file_path: Base path of the target.
        temporary_directory: Parent directory for the working directory; the
            actual working directory is a subdirectory named after the MD5
            hex digest of the final file path.
        save_function: Forwarded to ``ModelTarget``.
        load_function: Forwarded to ``ModelTarget``.
        unique_id: If given, inserted before the extension of ``file_path``
            (see ``_make_file_path``).
        redis_params: Lock parameters; when ``None`` they are created with
            ``make_redis_params`` from the original ``file_path`` and
            ``unique_id``.

    Returns:
        The model target.
    """
    # Redis params are derived from the path *before* the unique id is appended.
    _redis_params = redis_params if redis_params is not None else make_redis_params(file_path=file_path, unique_id=unique_id)
    file_path = _make_file_path(file_path, unique_id)
    # The digest is only used to derive a per-target directory name.
    temporary_directory = os.path.join(temporary_directory, hashlib.md5(file_path.encode()).hexdigest())
    return ModelTarget(file_path=file_path,
                       temporary_directory=temporary_directory,
                       save_function=save_function,
                       load_function=load_function,
                       redis_params=_redis_params)