# Source code for gokart.run

from __future__ import annotations

import logging
import os
import sys
from logging import getLogger
from typing import Any, cast

import luigi
import luigi.cmdline
import luigi.cmdline_parser
import luigi.execution_summary
import luigi.interface
import luigi.retcodes
import luigi.setup_logging
from luigi.cmdline_parser import CmdlineParser

import gokart
import gokart.slack
from gokart.build import WorkerSchedulerFactory
from gokart.object_storage import ObjectStorage

logger = getLogger(__name__)


def _run_tree_info(cmdline_args, details):
    """Render the tree of the task given by ``cmdline_args`` and dump it to the tree-info target.

    Args:
        cmdline_args: Command-line arguments identifying the root task.
        details: Forwarded to ``gokart.make_tree_info`` (``True`` for the detailed tree).
    """
    with CmdlineParser.global_instance(cmdline_args) as parser:
        # Resolve the output target first, matching the original evaluation order.
        target = gokart.tree_info().output()
        rendered_tree = gokart.make_tree_info(parser.get_task_obj(), details=details)
        target.dump(rendered_tree)


def _try_tree_info(cmdline_args):
    """Write the task tree and exit when ``gokart.tree_info().mode`` is set.

    An empty mode leaves the run untouched. ``'simple'`` writes the tree with
    ``details=False`` and ``'all'`` writes it with ``details=True``. In both cases
    the output path is logged and the process terminates via ``sys.exit()``.

    Args:
        cmdline_args: Command-line arguments used to resolve the tree-info settings.

    Raises:
        ValueError: if the mode is not empty and is neither ``'simple'`` nor ``'all'``.
    """
    with CmdlineParser.global_instance(cmdline_args):
        mode = gokart.tree_info().mode
        output_path = gokart.tree_info().output().path()

    if mode == '':
        # Tree-info mode is not requested.
        return

    if mode not in ('simple', 'all'):
        raise ValueError(f'--tree-info-mode must be "simple" or "all", but "{mode}" is passed.')

    _run_tree_info(cmdline_args, details=mode == 'all')
    logger.info(f'output tree info: {output_path}')
    # Terminate the process (exit status 0) once the tree has been written.
    sys.exit()


def _try_to_delete_unnecessary_output_file(cmdline_args: list[str]) -> None:
    """Delete unnecessary output files and exit, if the task asks for it.

    When the task built from ``cmdline_args`` has ``delete_unnecessary_output_files``
    set, the outputs are removed with ``gokart.delete_local_unnecessary_outputs``.
    Object-storage workspaces are skipped with a log message. The process then
    exits via ``sys.exit()``. Otherwise the function returns without side effects.

    Args:
        cmdline_args: Command-line arguments identifying the task.
    """
    with CmdlineParser.global_instance(cmdline_args) as parser:
        task: gokart.TaskOnKart[Any] = parser.get_task_obj()
        if not task.delete_unnecessary_output_files:
            return

        is_remote_workspace = ObjectStorage.if_object_storage_path(task.workspace_directory)
        if is_remote_workspace:
            logger.info('delete-unnecessary-output-files is not support s3/gcs.')
        else:
            gokart.delete_local_unnecessary_outputs(task)
        sys.exit()


def _try_get_slack_api(cmdline_args: list[str]) -> gokart.slack.SlackAPI | None:
    """Build a Slack client from ``SlackConfig``, or return ``None`` when Slack is not configured.

    The token is read from the environment variable named by ``SlackConfig.token_name``.
    A client is created only when both that token and ``SlackConfig.channel`` are non-empty.

    Args:
        cmdline_args: Command-line arguments used while reading ``SlackConfig``.

    Returns:
        A ``gokart.slack.SlackAPI`` instance, or ``None``.
    """
    with CmdlineParser.global_instance(cmdline_args):
        slack_config = gokart.slack.SlackConfig()
        settings = dict(
            token=os.getenv(slack_config.token_name, ''),
            channel=slack_config.channel,
            to_user=slack_config.to_user,
        )
        if settings['token'] and settings['channel']:
            logger.info('Slack notification is activated.')
            return gokart.slack.SlackAPI(**settings)
    logger.info('Slack notification is not activated.')
    return None


def _try_to_send_event_summary_to_slack(
    slack_api: gokart.slack.SlackAPI | None, event_aggregator: gokart.slack.EventAggregator, cmdline_args: list[str]
) -> None:
    """Post a summary of the collected events (and optionally the task tree) to Slack.

    Args:
        slack_api: Client used to send the snippet; ``None`` makes this a no-op.
        event_aggregator: Source of the summary text and the event list.
        cmdline_args: Command-line arguments used to resolve the root task and the
            ``SlackConfig`` options.
    """
    if slack_api is None:
        # Slack is not configured; nothing to send.
        return
    with CmdlineParser.global_instance(cmdline_args) as cp:
        # Build SlackConfig inside the parser context, as _try_get_slack_api does,
        # so that SlackConfig parameters given on the command line (e.g.
        # send_tree_info) are honoured rather than only config-file values.
        options = gokart.slack.SlackConfig()
        task = cp.get_task_obj()
        tree_info = gokart.make_tree_info(task, details=True) if options.send_tree_info else 'Please add SlackConfig.send_tree_info to include tree-info'
        task_name = type(task).__name__

    comment = f'Report of {task_name}' + os.linesep + event_aggregator.get_summary()
    content = os.linesep.join(['===== Event List ====', event_aggregator.get_event_list(), os.linesep, '==== Tree Info ====', tree_info])
    slack_api.send_snippet(comment=comment, title='event.txt', content=content)


def _run_with_retcodes(argv: list[str]):
    """run_with_retcodes equivalent that uses gokart's WorkerSchedulerFactory.

    Runs the task described by ``argv`` through ``luigi.interface._run`` and always
    terminates the process via ``sys.exit`` with a code taken from
    ``luigi.retcodes.retcode``:

    * ``already_running`` if ``PidLockAlreadyTakenExit`` is raised;
    * ``unhandled_exception`` for any other exception raised while running;
    * otherwise the highest code whose condition holds in the execution summary;
    * ``not_run`` if no condition holds but the root task is neither completed nor
      already done;
    * otherwise ``0``.

    Args:
        argv: Command-line arguments for the task to run.

    Raises:
        SystemExit: on every path that reaches the end of the function.
    """
    retcode_logger = logging.getLogger('luigi-interface')
    # Read the retcode config while ``argv`` is the active command line.
    with luigi.cmdline_parser.CmdlineParser.global_instance(argv):
        retcodes = luigi.retcodes.retcode()

    worker = None
    try:
        worker = luigi.interface._run(argv, worker_scheduler_factory=WorkerSchedulerFactory()).worker
    except luigi.interface.PidLockAlreadyTakenExit:
        # Presumably another process already holds the pid lock for this task.
        sys.exit(retcodes.already_running)
    except Exception:
        # Set up interface logging before logging the traceback; presumably the
        # failure may have happened before luigi configured logging.
        env_params = luigi.interface.core()
        luigi.setup_logging.InterfaceLogging.setup(cast(Any, env_params))
        retcode_logger.exception('Uncaught exception in luigi')
        sys.exit(retcodes.unhandled_exception)

    # Group the worker's tasks by status and keep only the statuses that have tasks.
    with luigi.cmdline_parser.CmdlineParser.global_instance(argv):
        task_sets = luigi.execution_summary._summary_dict(worker)
        root_task = luigi.execution_summary._root_task(worker)
        non_empty_categories = {k: v for k, v in task_sets.items() if v}.keys()

    def has(status: str) -> bool:
        # Sanity check that only statuses known to luigi's summary are queried.
        assert status in luigi.execution_summary._ORDERED_STATUSES
        return status in non_empty_categories

    codes_and_conds = (
        (retcodes.missing_data, has('still_pending_ext')),
        (retcodes.task_failed, has('failed')),
        (retcodes.already_running, has('run_by_other_worker')),
        (retcodes.scheduling_error, has('scheduling_error')),
        (retcodes.not_run, has('not_run')),
    )
    # Highest return code among the conditions that hold; 0 when none hold.
    expected_ret_code = max(code * (1 if cond else 0) for code, cond in codes_and_conds)

    # No failure condition was flagged, yet the root task did not finish: report not_run.
    if expected_ret_code == 0 and root_task not in task_sets['completed'] and root_task not in task_sets['already_done']:
        sys.exit(retcodes.not_run)
    else:
        sys.exit(expected_ret_code)


def run(cmdline_args=None, set_retcode=True):
    """Run a gokart task from command-line arguments and exit the process.

    Before running, the special modes are handled: tree-info output, deletion of
    unnecessary output files, and the empty-data-frame test. The first two exit the
    process themselves when requested; the empty-data-frame check presumably does
    likewise. The task is then run through ``_run_with_retcodes``, which always
    ends in ``SystemExit``. Afterwards an event summary is sent to Slack (if
    configured) and the process exits with the same code.

    Args:
        cmdline_args: Command-line arguments (task name followed by parameters).
            Falls back to ``sys.argv[1:]`` when falsy.
        set_retcode: When true, set gokart's return codes on
            ``luigi.retcodes.retcode``: already_running=10, missing_data=20,
            not_run=30, task_failed=40, scheduling_error=50.

    Raises:
        SystemExit: on the normal run path, carrying the task's return code.
    """
    # Import explicitly instead of relying on ``import gokart`` having loaded the
    # ``testing`` subpackage as a side effect.
    from gokart import testing as gokart_testing

    cmdline_args = cmdline_args or sys.argv[1:]

    if set_retcode:
        luigi.retcodes.retcode.already_running = 10  # type: ignore
        luigi.retcodes.retcode.missing_data = 20  # type: ignore
        luigi.retcodes.retcode.not_run = 30  # type: ignore
        luigi.retcodes.retcode.task_failed = 40  # type: ignore
        luigi.retcodes.retcode.scheduling_error = 50  # type: ignore

    _try_tree_info(cmdline_args)
    _try_to_delete_unnecessary_output_file(cmdline_args)
    gokart_testing.try_to_run_test_for_empty_data_frame(cmdline_args)

    slack_api = _try_get_slack_api(cmdline_args)
    event_aggregator = gokart.slack.EventAggregator()
    try:
        event_aggregator.set_handlers()
        _run_with_retcodes(cmdline_args)
    except SystemExit as e:
        try:
            _try_to_send_event_summary_to_slack(slack_api, event_aggregator, cmdline_args)
        except Exception:
            # The Slack report is best-effort: a failure here must not replace the
            # task's return code with an unrelated traceback.
            logger.exception('Failed to send the event summary to Slack.')
        sys.exit(e.code)