ablator.main package

Subpackages

Submodules

ablator.main.mp module

class ablator.main.mp.ParallelTrainer(wrapper: ModelWrapper, run_config: ParallelConfig)

Bases: ProtoTrainer

A trainer that uses Ray to run the training processes of models with different configurations in parallel.

Parameters:
wrapper : ModelWrapper

The model wrapper for the ParallelTrainer.

run_config : ParallelConfig

The runtime configuration for this trainer.

Examples

Below is a complete workflow on how to launch a parallel experiment with ParallelTrainer, from defining config, getting the model wrapper ready, to launching the experiment:

  • Define training config:

>>> my_optimizer_config = OptimizerConfig("sgd", {"lr": 0.5, "weight_decay": 0.5})
>>> my_scheduler_config = SchedulerConfig("step", arguments={"step_size": 1, "gamma": 0.99})
>>> train_config = TrainConfig(
...     dataset="[Dataset Name]",
...     batch_size=32,
...     epochs=10,
...     optimizer_config = my_optimizer_config,
...     scheduler_config = my_scheduler_config
... )
  • Define model config. We want to run HPO on the activation function and the model's hidden size:

>>> @configclass
... class CustomModelConfig(ModelConfig):
...     hidden_size: int
...     activation: str
>>> model_config = CustomModelConfig(hidden_size=100, activation="relu")
  • Define search space:

>>> search_space = {
...     "train_config.optimizer_config.arguments.lr": SearchSpace(
...         value_range = [0.001, 0.01],
...         value_type = 'float'
...         ),
...     "model_config.hidden_size": SearchSpace(value_range = [32, 64], value_type = 'int'),
...     "model_config.activation": SearchSpace(categorical_values = ["relu", "elu", "leakyRelu"]),
... }
  • Define run config (remember to redefine the parallel config to update the model config type to be CustomModelConfig):

>>> @configclass
... class CustomParallelConfig(ParallelConfig):
...     model_config: CustomModelConfig
>>>
>>> parallel_config = CustomParallelConfig(
...     train_config=train_config,
...     model_config=model_config,
...     metrics_n_batches = 800,
...     experiment_dir = "/tmp/experiments/",
...     device="cuda",
...     amp=True,
...     random_seed = 42,
...     total_trials = 20,
...     concurrent_trials = 3,
...     search_space = search_space,
...     optim_metrics = {"val_loss": "min"},
...     optim_metric_name = "val_loss",
...     gpu_mb_per_experiment = 1024
... )
  • Create model wrapper:

>>> class MyModelWrapper(ModelWrapper):
...     def __init__(self, *args, **kwargs):
...         super().__init__(*args, **kwargs)
...
...     def make_dataloader_train(self, run_config: CustomParallelConfig):
...         return torch.utils.data.DataLoader(<train_dataset>, batch_size=32, shuffle=True)
...
...     def make_dataloader_val(self, run_config: CustomParallelConfig):
...         return torch.utils.data.DataLoader(<val_dataset>, batch_size=32, shuffle=False)
  • After gathering all configurations and model wrapper, we can initialize and launch the parallel trainer:

>>> wrapper = MyModelWrapper(
...     model_class=<your_ModelModule_class>,
... )
>>> ablator = ParallelTrainer(
...     wrapper=wrapper,
...     run_config=parallel_config,
... )
>>> ablator.launch(working_directory = os.getcwd(), ray_head_address=None)
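To make the dotted keys of the search space more concrete, the sketch below samples one trial from a plain-dictionary stand-in and writes each sampled value into a nested configuration. It is an illustration only: the dictionaries stand in for SearchSpace, uniform sampling is an assumption, and sample_trial and set_by_path are hypothetical helpers that are not part of ablator.

```python
import random

# Plain-dict stand-in for the search space above (not ablator's SearchSpace class).
search_space = {
    "train_config.optimizer_config.arguments.lr": {
        "value_range": [0.001, 0.01],
        "value_type": "float",
    },
    "model_config.hidden_size": {"value_range": [32, 64], "value_type": "int"},
    "model_config.activation": {"categorical_values": ["relu", "elu", "leakyRelu"]},
}


def sample_trial(space, rng):
    """Draw one value per dotted key, uniformly at random (assumed strategy)."""
    trial = {}
    for key, spec in space.items():
        if "categorical_values" in spec:
            trial[key] = rng.choice(spec["categorical_values"])
            continue
        low, high = spec["value_range"]
        if spec["value_type"] == "int":
            trial[key] = rng.randint(low, high)  # both bounds inclusive
        else:
            trial[key] = rng.uniform(low, high)
    return trial


def set_by_path(config, dotted_key, value):
    """Store value in a nested dict by following a dotted path."""
    *parents, leaf = dotted_key.split(".")
    node = config
    for name in parents:
        node = node.setdefault(name, {})
    node[leaf] = value


rng = random.Random(42)
trial = sample_trial(search_space, rng)

config = {}
for dotted_key, value in trial.items():
    set_by_path(config, dotted_key, value)
```

Each key names a path into the run configuration, so a trial differs from the base configuration only at the fields listed in the search space.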
Attributes:
run_config : ParallelConfig

Running configuration for parallel training.

logger : RemoteFileLogger

A centralized logger that writes messages to a file and prints them to the console.

experiment_state : ExperimentState

Manages the Optuna trials.

gpu_manager : ty.Optional[GPUManager]

A GPU manager that manages GPU resources in the cluster.

available_resources : dict[str, Resource]

A dictionary of the available resources on each node.

node_manager : NodeManager

A node manager that manages nodes and their resources.

ray_address : str

The address of the Ray cluster.

total_trials : int

Total number of trials to run.

gpu_mem_bottleneck : int

The minimum memory capacity across all available GPUs.

cpu : float

The number of CPUs used per trial.

gpu : float

The number of GPUs used per trial.

running_futures : dict[str, list]

A dictionary that maps each node IP to the list of Ray remote tasks (futures) executing on that node.

cluster_manager : ClusterManager

The cluster manager responsible for scheduling tasks and managing resources.
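As a rough illustration of the shapes described by gpu_mem_bottleneck and running_futures, the sketch below uses made-up node IPs and memory values. The variable gpu_memory_mb is hypothetical, the MiB unit is an assumption, and strings stand in for Ray futures; this is not how ablator gathers these values.

```python
# Hypothetical snapshot of GPU memory (assumed MiB) per node IP; values are made up.
gpu_memory_mb = {
    "10.0.0.1": [16384, 12288],
    "10.0.0.2": [8192],
}

# The bottleneck is the smallest memory capacity among all GPUs in the cluster.
gpu_mem_bottleneck = min(mb for gpus in gpu_memory_mb.values() for mb in gpus)

# One list of in-flight tasks per node IP, empty before any trial is scheduled.
running_futures = {node_ip: [] for node_ip in gpu_memory_mb}
running_futures["10.0.0.1"].append("trial_0")  # placeholder for a Ray future

print(gpu_mem_bottleneck)  # 8192
```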

launch(working_directory: str, auxilary_modules: list[module] | None = None, ray_head_address: str | None = None, resume: bool = False, excluding_files: list[str] | None = None, debug: bool = False)

Set up and launch the parallel ablation experiment. This sets up a Ray cluster; trials with different configurations are initialized (or retrieved) and pushed to the cluster to run in parallel.

Parameters:
working_directory : str

The working directory that stores the code and modules that will be used by Ray.

auxilary_modules : list[tys.ModuleType] | None

A list of modules to be used as the Ray cluster’s working environment.

ray_head_address : str | None

The Ray cluster address.

resume : bool

Whether to resume training the model from existing checkpoints and existing experiment state, by default False.

excluding_files : list[str] | None

A list of file patterns in .gitignore format that are excluded from upload to the Ray cluster. If unspecified, the .git/** folder is ignored.

debug : bool, optional

Whether to train the model in debug mode, by default False.

Raises:
RuntimeError

If an experiment is being resumed but config.experiment_id is unspecified, or if the experiment directory is not empty while a remote storage configuration is used.
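The keyword arguments of launch can be collected in a dictionary before the call, which keeps the exclusion patterns in one visible place. The sketch below is a minimal example under assumptions: only ".git/**" comes from the parameter description, and the other patterns are hypothetical. The final call is left as a comment because it needs the ParallelTrainer instance built in the class example.

```python
import os

# Keyword arguments named after the launch() signature documented above.
launch_kwargs = {
    "working_directory": os.getcwd(),
    "ray_head_address": None,  # same value as in the class example
    "resume": False,
    # .gitignore-style patterns; everything after ".git/**" is a hypothetical example.
    "excluding_files": [".git/**", "*.ckpt", "data/"],
    "debug": False,
}

# With the ParallelTrainer instance from the class example:
# ablator.launch(**launch_kwargs)
```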

pre_train_setup()

Used to prepare resources to avoid stalling during training or when resources are shared between trainers.

stop()
property total_trials: int

ablator.main.proto module

class ablator.main.proto.ProtoTrainer(wrapper: ModelWrapper, run_config: RunConfig)

Bases: object

Manages resources for prototyping. This trainer runs an experiment with a single prototype model (so it performs neither an ablation study nor HPO).

Parameters:
wrapper : ModelWrapper

The main model wrapper.

run_config : RunConfig

Running configuration for the model.

Raises:
RuntimeError

If the experiment directory is not defined in the running configuration.

Examples

Below is a complete workflow on how to launch a prototype experiment with ProtoTrainer, from defining the config to launching the experiment:

  • Define training config:

>>> my_optimizer_config = OptimizerConfig("sgd", {"lr": 0.5, "weight_decay": 0.5})
>>> my_scheduler_config = SchedulerConfig("step", arguments={"step_size": 1, "gamma": 0.99})
>>> train_config = TrainConfig(
...     dataset="[Dataset Name]",
...     batch_size=32,
...     epochs=10,
...     optimizer_config = my_optimizer_config,
...     scheduler_config = my_scheduler_config
... )
  • Define model config. Here we use the default one with no custom hyperparameters (to run an ablation study or HPO on the model’s hyperparameters in a parallel experiment, customize it and use ParallelTrainer and ParallelConfig instead of ProtoTrainer and RunConfig):

>>> model_config = ModelConfig()
  • Define run config:

>>> run_config = RunConfig(
...     train_config=train_config,
...     model_config=model_config,
...     metrics_n_batches = 800,
...     experiment_dir = "/tmp/experiments",
...     device="cpu",
...     amp=False,
...     random_seed = 42
... )
  • Create model wrapper:

>>> class MyModelWrapper(ModelWrapper):
...     def __init__(self, *args, **kwargs):
...         super().__init__(*args, **kwargs)
...
...     def make_dataloader_train(self, run_config: RunConfig):
...         return torch.utils.data.DataLoader(<train_dataset>, batch_size=32, shuffle=True)
...
...     def make_dataloader_val(self, run_config: RunConfig):
...         return torch.utils.data.DataLoader(<val_dataset>, batch_size=32, shuffle=False)
  • After gathering all configurations and the model wrapper, we can initialize and launch the prototype trainer. When launching the experiment, we must provide a working directory that points to a git repository, which is used to keep track of code differences:

>>> wrapper = MyModelWrapper(
...     model_class=<your_ModelModule_class>,
... )
>>> ablator = ProtoTrainer(
...     wrapper=wrapper,
...     run_config=run_config,
... )
>>> metrics = ablator.launch(working_directory=os.getcwd())  # suppose current directory is tracked by git
Attributes:
wrapper : ModelWrapper

The main model wrapper.

run_config : RunConfig

Running configuration for the model.

experiment_dir : Path

The path object to the experiment directory.

evaluate() -> dict[str, dict[str, Any]]

Run model evaluation on the training results and sync the evaluation results to external logging services (e.g. Google Cloud Storage or other remote servers).

Returns:
dict[str, dict[str, ty.Any]]

Metrics returned after evaluation.

launch(working_directory: str, resume: bool = False, debug: bool = False) -> dict[str, float]

Launch the prototype experiment (train, evaluate the single prototype model) and return metrics.

Parameters:
working_directory : str

The working directory, which points to a git repository that is used to keep track of code differences.

resume : bool

Whether to resume training the model from existing checkpoints and existing experiment state, by default False.

debug : bool, optional

Whether to train the model in debug mode, by default False.

Returns:
dict[str, float]

Metrics returned after training.

Raises:
RuntimeError

If an experiment is being resumed but config.experiment_id is unspecified, or if the experiment directory is not empty while a remote storage configuration is used.

pre_train_setup()

Used to prepare resources to avoid stalling during training or when resources are shared between trainers.

smoke_test(config: RunConfig | None = None) -> bool

Run a smoke test training process on the model.

Parameters:
config : RunConfig | None

Running configuration for the model.

Returns:
bool

Whether the smoke test was successful.

Examples

>>> try:
...     ablator.smoke_test(run_config)
... except Exception as err:
...     raise err
stop()

Module contents