gokart package

Submodules

gokart.file_processor module

class gokart.file_processor.BinaryFileProcessor[source]

Bases: FileProcessor

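Pass bytes to this processor.

```python
import io

import matplotlib.pyplot as plt
from gokart.file_processor import BinaryFileProcessor

figure_binary = io.BytesIO()
plt.savefig(figure_binary)
figure_binary.seek(0)
# dump(obj, file) also takes a writable binary file; 'figure.png' is a placeholder path.
with open('figure.png', 'wb') as f:
    BinaryFileProcessor().dump(figure_binary.read(), f)
```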

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.CsvFileProcessor(sep=',', encoding: str = 'utf-8')[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.FeatherFileProcessor(store_index_in_feather: bool)[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.FileProcessor[source]

Bases: object

abstract dump(obj, file)[source]
abstract format()[source]
abstract load(file)[source]
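
The three abstract methods suggest the shape of a custom processor. The following is a minimal sketch, not gokart's implementation: `FileProcessorSketch` is a hypothetical stand-in that mirrors the documented abstract interface so the example runs without gokart installed, and `LineListProcessor` is an invented subclass. In real code the subclass would presumably derive from gokart.file_processor.FileProcessor, and returning None from format() is an assumption made for this sketch.

```python
import io
from abc import ABC, abstractmethod


class FileProcessorSketch(ABC):
    """Hypothetical stand-in mirroring the documented abstract interface."""

    @abstractmethod
    def format(self): ...

    @abstractmethod
    def load(self, file): ...

    @abstractmethod
    def dump(self, obj, file): ...


class LineListProcessor(FileProcessorSketch):
    """Invented example: stores a list of strings, one item per line."""

    def format(self):
        # Assumption: None stands for "no special format" in this sketch.
        return None

    def load(self, file):
        # Read every line back and drop the trailing newline.
        return [line.rstrip('\n') for line in file]

    def dump(self, obj, file):
        # Write each item on its own line.
        for item in obj:
            file.write(f'{item}\n')


# Round trip through an in-memory text file.
buffer = io.StringIO()
processor = LineListProcessor()
processor.dump(['alpha', 'beta'], buffer)
buffer.seek(0)
restored = processor.load(buffer)
```

A processor of this shape could then be passed through the `processor` argument of make_target().
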
class gokart.file_processor.GzipFileProcessor[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.JsonFileProcessor[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.NpzFileProcessor[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.ParquetFileProcessor(engine='pyarrow', compression=None)[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.PickleFileProcessor[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.TextFileProcessor[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
class gokart.file_processor.XmlFileProcessor[source]

Bases: FileProcessor

dump(obj, file)[source]
format()[source]
load(file)[source]
gokart.file_processor.make_file_processor(file_path: str, store_index_in_feather: bool) FileProcessor[source]

gokart.info module

gokart.info.make_tree_info(task: TaskOnKart, indent: str = '', last: bool = True, details: bool = False, abbr: bool = True, visited_tasks: Set[str] | None = None, ignore_task_names: List[str] | None = None) str[source]

Return a string representation of the tasks and their statuses/parameters in a dependency tree format.

This function has moved to gokart.tree.task_info.make_task_info_as_tree_str. The code here remains for backward compatibility.

Parameters

  • task: TaskOnKart

    Root task.

  • details: bool

    Whether or not to output details.

  • abbr: bool

    Whether or not to abbreviate information about tasks that have already appeared.

  • ignore_task_names: Optional[List[str]]

    List of task names to ignore.

Returns

  • tree_info: str

    Formatted task dependency tree.
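
To illustrate how the `indent`, `last`, `abbr` and `visited_tasks` arguments could interact, here is a minimal sketch of a recursive tree renderer. It is not gokart's implementation: `make_tree_sketch` and the `deps` dictionary (task name to required task names) are hypothetical stand-ins for real tasks, the branch markers are arbitrary, and statuses and parameters are left out.

```python
from typing import Dict, List, Optional, Set


def make_tree_sketch(
    name: str,
    deps: Dict[str, List[str]],
    indent: str = '',
    last: bool = True,
    abbr: bool = True,
    visited: Optional[Set[str]] = None,
) -> str:
    """Render `name` and its dependencies as an indented tree (illustrative only)."""
    visited = set() if visited is None else visited
    branch = '+--' if last else '|--'
    result = f'\n{indent}{branch}{name}'
    if abbr and name in visited:
        # Already rendered once: abbreviate instead of expanding again.
        return result + ' ...'
    visited.add(name)
    # Children of a non-last node keep a vertical bar in their indent.
    child_indent = indent + ('   ' if last else '|  ')
    children = deps.get(name, [])
    for i, child in enumerate(children):
        is_last_child = i == len(children) - 1
        result += make_tree_sketch(child, deps, child_indent, is_last_child, abbr, visited)
    return result


# Hypothetical dependency graph: Train requires Load and Features; Features requires Load.
deps = {'Train': ['Load', 'Features'], 'Features': ['Load']}
tree = make_tree_sketch('Train', deps)
full_tree = make_tree_sketch('Train', deps, abbr=False)
```

With `abbr=True` the second occurrence of `Load` is marked with ` ...` rather than expanded again; with `abbr=False` it is rendered in full.
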

class gokart.info.tree_info(*args, **kwargs)[source]

Bases: TaskOnKart

mode: str = <luigi.parameter.Parameter object>
output()[source]
output_path: str = <luigi.parameter.Parameter object>

gokart.parameter module

class gokart.parameter.ExplicitBoolParameter(*args, **kwargs)[source]

Bases: BoolParameter

class gokart.parameter.ListTaskInstanceParameter(expected_elements_type=None, *args, **kwargs)[source]

Bases: Parameter

parse(s)[source]

Parse an individual value from the input.

The default implementation is the identity function, but subclasses should override this method for specialized parsing.

Parameters:

s (str) – the value to parse.

Returns:

the parsed value.

serialize(x)[source]

Opposite of parse().

Converts the value x to a string.

Parameters:

x – the value to serialize.

class gokart.parameter.TaskInstanceParameter(expected_type=None, *args, **kwargs)[source]

Bases: Parameter

parse(s)[source]

Parse an individual value from the input.

The default implementation is the identity function, but subclasses should override this method for specialized parsing.

Parameters:

s (str) – the value to parse.

Returns:

the parsed value.

serialize(x)[source]

Opposite of parse().

Converts the value x to a string.

Parameters:

x – the value to serialize.
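
The parse()/serialize() pair described above is meant to be mutually inverse. A minimal sketch of that contract, assuming a JSON string form: `KeyValueParameterSketch` is a hypothetical class that belongs to neither luigi nor gokart, and the real TaskInstanceParameter may encode task instances differently.

```python
import json


class KeyValueParameterSketch:
    """Hypothetical parameter type used only to show the parse/serialize contract."""

    def serialize(self, x: dict) -> str:
        # sort_keys keeps the string stable for equal dictionaries.
        return json.dumps(x, sort_keys=True)

    def parse(self, s: str) -> dict:
        # Inverse of serialize(): rebuild the value from its string form.
        return json.loads(s)


param = KeyValueParameterSketch()
text = param.serialize({'task': 'Train', 'epochs': 3})
value = param.parse(text)
```

Serializing the parsed value again yields the same string, which is the property a parameter needs to survive a trip through the command line or a config file.
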

gokart.run module

gokart.run.run(cmdline_args=None, set_retcode=True)[source]

gokart.s3_config module

class gokart.s3_config.S3Config(*args, **kwargs)[source]

Bases: Config

aws_access_key_id_name = <luigi.parameter.Parameter object>
aws_secret_access_key_name = <luigi.parameter.Parameter object>
get_s3_client() S3Client[source]

gokart.target module

class gokart.target.LargeDataFrameProcessor(max_byte: int)[source]

Bases: object

static load(file_path: str) DataFrame[source]
save(df: DataFrame, file_path: str)[source]
class gokart.target.ModelTarget(file_path: str, temporary_directory: str, load_function, save_function, task_lock_params: TaskLockParams)[source]

Bases: TargetOnKart

class gokart.target.SingleFileTarget(target: FileSystemTarget, processor: FileProcessor, task_lock_params: TaskLockParams)[source]

Bases: TargetOnKart

class gokart.target.TargetOnKart[source]

Bases: Target

dump(obj, lock_at_dump: bool = True) None[source]
exists() bool[source]

Returns True if the Target exists and False otherwise.

last_modification_time() datetime[source]
load() Any[source]
path() str[source]
remove() None[source]
gokart.target.make_model_target(file_path: str, temporary_directory: str, save_function, load_function, unique_id: str | None = None, task_lock_params: TaskLockParams | None = None) TargetOnKart[source]
gokart.target.make_target(file_path: str, unique_id: str | None = None, processor: FileProcessor | None = None, task_lock_params: TaskLockParams | None = None, store_index_in_feather: bool = True) TargetOnKart[source]

gokart.task module

class gokart.task.TaskOnKart(*args, **kwargs)[source]

Bases: Task, Generic[T]

This is a wrapper class of luigi.Task.

The key methods of a TaskOnKart are:

  • make_target() - this makes an output target with a relative file path.

  • make_model_target() - this makes an output target for models that are saved as multiple files.

  • load() - this loads the input files of this task.

  • dump() - this saves an object as the output of this task.

FIX_RANDOM_SEED_VALUE_NONE_MAGIC_NUMBER = -42497368
cache_unique_id: bool = <gokart.parameter.ExplicitBoolParameter object>
clone(cls=None, **kwargs)[source]

Creates a new instance from an existing instance where some of the args have changed.

There are at least two scenarios where this is useful (see test/clone_test.py):

  • remove a lot of boilerplate when you have recursive dependencies and lots of args

  • there is task inheritance and some logic is on the base class

Parameters:
  • cls

  • kwargs

Returns:

complete() bool[source]

If the task has any outputs, return True if all outputs exist. Otherwise, return False.

However, you may freely override this method with custom logic.
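
The default rule stated above can be sketched with plain file paths. This is only an illustration under stated assumptions: `all_outputs_exist` is a hypothetical helper, paths stand in for targets, and TaskOnKart may apply further checks that this sketch ignores.

```python
import os
import tempfile
from typing import List


def all_outputs_exist(paths: List[str]) -> bool:
    """True only if there is at least one output and every output exists."""
    return bool(paths) and all(os.path.exists(p) for p in paths)


with tempfile.TemporaryDirectory() as workdir:
    done = os.path.join(workdir, 'done.txt')
    missing = os.path.join(workdir, 'missing.txt')
    with open(done, 'w') as f:
        f.write('ok')

    only_done = all_outputs_exist([done])               # every output exists
    with_missing = all_outputs_exist([done, missing])   # one output is absent
    no_outputs = all_outputs_exist([])                  # no outputs at all
```
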

complete_check_at_run: bool = <gokart.parameter.ExplicitBoolParameter object>
delete_unnecessary_output_files: bool = <luigi.parameter.BoolParameter object>
dump(obj: T, target: None = None) None[source]
dump(obj: Any, target: str | TargetOnKart) None
fail_on_empty_dump: bool = <gokart.parameter.ExplicitBoolParameter object>
fix_random_seed_methods: tuple[str] = <luigi.parameter.ListParameter object>
fix_random_seed_value: int = <luigi.parameter.IntParameter object>
static get_code(target_class) Set[str][source]
get_info(only_significant=False)[source]
get_own_code()[source]
get_processing_time() str[source]
get_task_log() Dict[source]
get_task_params() Dict[source]
input() TargetOnKart | Iterable[T | Iterable[FlattenableItems[T]] | dict[str, FlattenableItems[T]]] | dict[str, T | Iterable[FlattenableItems[T]] | dict[str, FlattenableItems[T]]][source]

Returns the outputs of the Tasks returned by requires()

See Task.input

Returns:

a list of Target objects which are specified as outputs of all required Tasks.

static is_task_on_kart(value)[source]
load(target: None | str | TargetOnKart = None) Any[source]
load(target: TaskOnKart[K]) K
load(target: List[TaskOnKart[K]]) List[K]
load_data_frame(target: None | str | TargetOnKart = None, required_columns: Set[str] | None = None, drop_columns: bool = False) DataFrame[source]
load_generator(target: None | str | TargetOnKart = None) Generator[Any, None, None][source]
load_generator(target: List[TaskOnKart[K]]) Generator[K, None, None]
local_temporary_directory: str = <luigi.parameter.Parameter object>
make_large_data_frame_target(relative_file_path: str | None = None, use_unique_id: bool = True, max_byte=67108864) TargetOnKart[source]
make_model_target(relative_file_path: str, save_function: Callable[[Any, str], None], load_function: Callable[[str], Any], use_unique_id: bool = True)[source]

Make a target for models that generate multiple files when saved, e.g. gensim.Word2Vec or TensorFlow models.

Parameters:
  • relative_file_path – A file path to save.

  • save_function – A function to save a model. This takes a model object and a file path.

  • load_function – A function to load a model. This takes a file path and returns a model object.

  • use_unique_id – If true, add a unique ID to the file base name.
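
The `save_function` and `load_function` contracts described above can be sketched as a pair of plain functions. This is a minimal example under stated assumptions: `save_model` and `load_model` are hypothetical names, the "model" is a toy dictionary, and writing a sidecar `.meta` file next to the main file is just one way a model might produce several files.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_model(model: Dict[str, Any], file_path: str) -> None:
    """Takes a model object and a file path; writes two files."""
    with open(file_path, 'w') as f:
        json.dump(model['weights'], f)
    # Second file written next to the main one.
    with open(file_path + '.meta', 'w') as f:
        json.dump(model['meta'], f)


def load_model(file_path: str) -> Dict[str, Any]:
    """Takes a file path and returns the model object."""
    with open(file_path) as f:
        weights = json.load(f)
    with open(file_path + '.meta') as f:
        meta = json.load(f)
    return {'weights': weights, 'meta': meta}


with tempfile.TemporaryDirectory() as workdir:
    model_path = os.path.join(workdir, 'model.json')
    original = {'weights': [0.1, 0.2], 'meta': {'version': 1}}
    save_model(original, model_path)
    saved_files = sorted(os.listdir(workdir))
    restored = load_model(model_path)
```

Inside a task, functions with these signatures could be passed as the `save_function` and `load_function` arguments of make_model_target().
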

make_target(relative_file_path: str | None = None, use_unique_id: bool = True, processor: FileProcessor | None = None) TargetOnKart[source]
make_task_instance_dictionary() Dict[str, TaskOnKart][source]
make_unique_id() str[source]
modification_time_check: bool = <luigi.parameter.BoolParameter object>
output() TargetOnKart | Iterable[T | Iterable[FlattenableItems[T]] | dict[str, FlattenableItems[T]]] | dict[str, T | Iterable[FlattenableItems[T]] | dict[str, FlattenableItems[T]]][source]
property priority

redis_host: str | None = <luigi.parameter.OptionalParameter object>
redis_port: int | None = <luigi.parameter.OptionalIntParameter object>
redis_timeout: int = <luigi.parameter.IntParameter object>
requires() TaskOnKart | Iterable[T | Iterable[FlattenableItems[T]] | dict[str, FlattenableItems[T]]] | dict[str, T | Iterable[FlattenableItems[T]] | dict[str, FlattenableItems[T]]][source]
rerun: bool = <luigi.parameter.BoolParameter object>
classmethod restore(unique_id)[source]
serialized_task_definition_check: bool = <luigi.parameter.BoolParameter object>
should_dump_supplementary_log_files: bool = <gokart.parameter.ExplicitBoolParameter object>
should_lock_run: bool = <gokart.parameter.ExplicitBoolParameter object>
significant: bool = <luigi.parameter.BoolParameter object>
store_index_in_feather: bool = <gokart.parameter.ExplicitBoolParameter object>
strict_check: bool = <luigi.parameter.BoolParameter object>
static try_set_seed(methods: List[str], random_seed: int) List[str][source]
workspace_directory: str = <luigi.parameter.Parameter object>

gokart.workspace_management module

gokart.workspace_management.delete_local_unnecessary_outputs(task: TaskOnKart)[source]

gokart.zip_client module

class gokart.zip_client.LocalZipClient(file_path: str, temporary_directory: str)[source]

Bases: ZipClient

exists() bool[source]
make_archive() None[source]
property path: str
remove() None[source]
unpack_archive() None[source]
class gokart.zip_client.ZipClient[source]

Bases: object

abstract exists() bool[source]
abstract make_archive() None[source]
abstract property path: str
abstract remove() None[source]
abstract unpack_archive() None[source]

Module contents