import itertools
import os
import pathlib
from logging import getLogger
import luigi
import gokart
logger = getLogger(__name__)
def _get_all_output_file_paths(task: gokart.TaskOnKart):
output_paths = [t.path() for t in luigi.task.flatten(task.output())]
children = luigi.task.flatten(task.requires())
output_paths.extend(itertools.chain.from_iterable([_get_all_output_file_paths(child) for child in children]))
return output_paths
[docs]def delete_local_unnecessary_outputs(task: gokart.TaskOnKart):
task.make_unique_id() # this is required to make unique ids.
all_files = {str(path) for path in pathlib.Path(task.workspace_directory).rglob('*.*')}
log_files = {str(path) for path in pathlib.Path(os.path.join(task.workspace_directory, 'log')).rglob('*.*')}
necessary_files = set(_get_all_output_file_paths(task))
unnecessary_files = all_files - necessary_files - log_files
if len(unnecessary_files) == 0:
logger.info(f'all files are necessary for this task.')
else:
logger.info(f'remove following files: {os.linesep} {os.linesep.join(unnecessary_files)}')
for file in unnecessary_files:
os.remove(file)