ablator.mp package#

Submodules#

ablator.mp.node_manager module#

ablator.mp.utils module#

class ablator.mp.utils.Resource(mem: float, cpu_usage: list[float], cpu_count: int, gpu_free_mem: list[int], running_tasks: list[str] = <factory>, is_active: bool = True)[source]#

Bases: object

A Resource dataclass used to group resources together

Attributes:
memfloat

percentage of total memory utilization

cpu_usagelist[float]

percentage of cpu usage per core

cpu_countint

number of cores available on the system.

gpu_free_memlist[int]

free GPU memory in MiB listed by ascending device number

running_taskslist[str]

the list of ray processes running on the system

is_activebool

whether the resource is currently active

cpu_count: int#
cpu_usage: list[float]#
gpu_free_mem: list[int]#
is_active: bool = True#
mem: float#
running_tasks: list[str]#
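
As a rough illustration of the signature above, the following sketch builds a stand-in dataclass with the same documented fields. It is not the ablator class itself, and the sample readings are hypothetical. The `<factory>` default shown for running_tasks corresponds to a per-instance default list, which the sketch models with `field(default_factory=list)`.

```python
from dataclasses import dataclass, field


@dataclass
class Resource:
    """Stand-in mirroring the documented fields; not the ablator class itself."""

    mem: float  # percentage of total memory utilization
    cpu_usage: list[float]  # percentage of CPU usage per core
    cpu_count: int  # number of cores available on the system
    gpu_free_mem: list[int]  # free GPU memory in MiB, by ascending device number
    running_tasks: list[str] = field(default_factory=list)  # fresh list per instance
    is_active: bool = True


# Hypothetical readings for a 4-core node with two GPUs.
node = Resource(
    mem=42.5,
    cpu_usage=[10.0, 35.0, 5.0, 50.0],
    cpu_count=4,
    gpu_free_mem=[8192, 4096],
)

# A second instance gets its own task list, so appending here
# does not affect `node.running_tasks`.
other = Resource(mem=10.0, cpu_usage=[1.0], cpu_count=1, gpu_free_mem=[])
other.running_tasks.append("train_trial")
```
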
ablator.mp.utils.get_node_id() str[source]#

The ray node ID of the executing process.

Returns:
str

the ray node ID

ablator.mp.utils.get_node_ip() str[source]#

The ray node IP of the executing process.

Returns:
str

the ray node IP

ablator.mp.utils.get_ray_address() str[source]#

the current ray address

Returns:
str

the address of the current ray cluster

ablator.mp.utils.get_ray_tasks(ray_address: str, node_id: str, timeout: int) list[str][source]#

list of running ray tasks on the specified node ID.

Parameters:
ray_addressstr

the cluster ray address

node_idstr

the ID of the node whose running tasks are listed

timeoutint

the timeout to apply when waiting for the ray cluster to respond.

Returns:
list[str]

a list of the ray task names running on the Node.

ablator.mp.utils.get_username() str[source]#

The executing process system username.

Returns:
str

the system username

ablator.mp.utils.make_private_key(home_path: Path) tuple[paramiko.pkey.PKey, str][source]#

Creates a private key named ablator_id_rsa that can be used for ablator-specific functionality, such as SSH between cluster nodes.

Parameters:
home_pathPath

the home directory in which to store the private key. The private key is placed in the .ssh folder and is named ablator_id_rsa

Returns:
tuple[PKey, str]

the private and public key that was added to home_path.

ablator.mp.utils.private_key_to_str(key: PKey) str[source]#

Convert a private key object to a string.

Parameters:
keyPKey

the private key object to convert to a string.

Returns:
str

the string representation of the private key.

ablator.mp.utils.ray_init(**kwargs: Any) RuntimeContext[source]#

initialize ray with some reasonable defaults

Parameters:
**kwargsty.Any

the keyword arguments to provide to initialize ray with. For full details please consult https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html

Returns:
RuntimeContext

the ray run-time context

ablator.mp.utils.register_public_key(public_key: str) str[source]#

Registers a public key so that the current user on this machine can be accessed via SSH. It adds the public_key to authorized_keys.

Parameters:
public_keystr

the public key to add to the system.

Returns:
str

the username corresponding to the added public key.
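
The step of adding a key to authorized_keys can be sketched with the standard library alone. This is not ablator's implementation: the helper name `append_authorized_key`, the explicit `home` argument, the skip-if-present check and the permission bits (the usual OpenSSH convention) are all assumptions made for illustration, and the helper returns the file path rather than a username.

```python
import tempfile
from pathlib import Path


def append_authorized_key(home: Path, public_key: str) -> Path:
    """Hypothetical helper: append `public_key` to <home>/.ssh/authorized_keys."""
    ssh_dir = home / ".ssh"
    ssh_dir.mkdir(mode=0o700, exist_ok=True)
    auth_file = ssh_dir / "authorized_keys"

    text = auth_file.read_text() if auth_file.exists() else ""
    key = public_key.strip()
    if key not in text.splitlines():  # assumption: skip keys already present
        # Start on a fresh line if the existing file lacks a trailing newline.
        prefix = "" if not text or text.endswith("\n") else "\n"
        with auth_file.open("a") as handle:
            handle.write(prefix + key + "\n")
    auth_file.chmod(0o600)  # conventional permissions for authorized_keys
    return auth_file


# Exercise the helper against a throwaway directory instead of a real home.
with tempfile.TemporaryDirectory() as tmp:
    placeholder_key = "ssh-rsa AAAA-placeholder-not-a-real-key user@example"
    path = append_authorized_key(Path(tmp), placeholder_key)
    append_authorized_key(Path(tmp), placeholder_key)  # second call is a no-op
    lines = path.read_text().splitlines()
```
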

ablator.mp.utils.run_lambda(fn: Callable, node_ip: str | None = None, timeout: int | None = None, run_async: bool = False, cuda: bool | None = None, fn_kwargs: dict[str, Any] | None = None, name: str | None = None, max_calls: int | None = 1, **options: float | str | dict) Any | ObjectRef[source]#

Runs a Python function on a node, scheduled using ray. When running asynchronously, a reference to the ray object is returned; otherwise, the function output is returned.

Parameters:
fnabc.Callable

the function to run. Must be picklable.

node_ipstr | None, optional

the IP of the node on which to run the ray function, by default None

timeoutint | None, optional

the timeout to apply when waiting for the results, by default None

run_asyncbool, optional

whether to wait for the lambda output or schedule asynchronously, by default False

cudabool | None, optional

whether the function requires CUDA, by default None

fn_kwargsdict[str, ty.Any] | None, optional

the keyword arguments to pass to the function, by default None

namestr | None, optional

the name of the scheduled function, by default None

max_callsint | None, optional

used to de-allocate memory for remotes (but is not applicable for ray actors), by default 1

**optionsfloat | str | dict

Additional kwarg options to supply as run-time configurations to the remote function.

Returns:
ty.Any | ray.ObjectRef

the result of the function or a reference to the ray object.
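
The difference between the two return modes can be illustrated without a ray cluster. The sketch below swaps in `concurrent.futures` from the standard library as a local analogue: `run_local` is a hypothetical name, a `Future` plays the role of the ray object reference, and node placement, CUDA and the other options are not modelled.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Optional, Union

_POOL = ThreadPoolExecutor(max_workers=2)


def run_local(
    fn: Callable[..., Any],
    timeout: Optional[float] = None,
    run_async: bool = False,
    fn_kwargs: Optional[dict] = None,
) -> Union[Any, Future]:
    """Hypothetical local analogue of the sync/async return contract."""
    future = _POOL.submit(fn, **(fn_kwargs or {}))
    if run_async:
        return future  # caller resolves the handle later
    return future.result(timeout=timeout)  # block until the output is ready


def add(a: int, b: int) -> int:
    return a + b


# Synchronous: the function output comes back directly.
sync_result = run_local(add, fn_kwargs={"a": 1, "b": 2})

# Asynchronous: a handle comes back and is resolved explicitly.
handle = run_local(add, run_async=True, fn_kwargs={"a": 3, "b": 4})
async_result = handle.result(timeout=5)
```
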

ablator.mp.utils.run_ssh_cmd(node_ip: str, username: str, private_key: PKey, cmd: str, timeout: int, run_async: bool = False) str | None[source]#

executes a bash command via ssh and optionally returns the stdout of the command. When stderr is present in the command output it will log it as an error.

Parameters:
node_ipstr

the IP of the node on which to execute the command.

usernamestr

the username under which to execute the command.

private_keyPKey

the private key used to connect to the node and execute the command

cmdstr

the command to execute

timeoutint

the timeout for establishing a connection with the SSH server

run_asyncbool, optional

whether to run the command asynchronously and not wait for stdout/stderr but instead return None, by default False

Returns:
str | None

a string of the stdout from the command or None when the command is run asynchronously

ablator.mp.utils.sort_resource_cpu_util(resources: list[ablator.mp.utils.Resource]) list[int][source]#

helper function to sort a list of Resource based on the CPU utilization.

Parameters:
resourceslist[Resource]

the resources to sort.

Returns:
list[int]

the list of indices into resources, ordered from the least to the most used Resource
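
The return value is a list of positions rather than a list of Resource objects. A minimal sketch of that index-sorting pattern follows, assuming the mean of the per-core usage as the ranking metric; the metric ablator actually uses is not stated here, and `sort_by_cpu` and the stand-in class are hypothetical.

```python
from dataclasses import dataclass
from statistics import mean


@dataclass
class Resource:  # stand-in holding only the field used in this sketch
    cpu_usage: list[float]


def sort_by_cpu(resources: list[Resource]) -> list[int]:
    """Indices into `resources`, least CPU-loaded first (assumed metric: mean)."""
    return sorted(
        range(len(resources)),
        key=lambda i: mean(resources[i].cpu_usage),
    )


nodes = [
    Resource(cpu_usage=[90.0, 70.0]),  # mean 80
    Resource(cpu_usage=[5.0, 15.0]),  # mean 10
    Resource(cpu_usage=[40.0, 40.0]),  # mean 40
]
order = sort_by_cpu(nodes)  # [1, 2, 0]
least_used = nodes[order[0]]
```
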

ablator.mp.utils.sort_resource_gpu_util(resources: list[ablator.mp.utils.Resource]) list[int][source]#

helper function to sort a list of Resource based on the free GPU memory.

Parameters:
resourceslist[Resource]

the resources to sort.

Returns:
list[int]

the list of indices into resources, ordered from the least to the most used Resource
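
Because the input is free GPU memory, "least used" corresponds to the most free memory, so the ordering runs in the opposite direction to the raw values. The sketch below assumes the total free memory across devices as the metric, which is a guess rather than ablator's documented behaviour; `sort_by_gpu` and the stand-in class are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Resource:  # stand-in holding only the field used in this sketch
    gpu_free_mem: list[int]  # free MiB per device


def sort_by_gpu(resources: list[Resource]) -> list[int]:
    """Indices into `resources`, most free GPU memory (least used) first."""
    return sorted(
        range(len(resources)),
        key=lambda i: -sum(resources[i].gpu_free_mem),  # negate: more free = earlier
    )


nodes = [
    Resource(gpu_free_mem=[2048, 1024]),  # 3072 MiB free in total
    Resource(gpu_free_mem=[16384]),  # 16384 MiB free
    Resource(gpu_free_mem=[]),  # no GPU, 0 MiB free
]
order = sort_by_gpu(nodes)  # [1, 0, 2]
```
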

ablator.mp.utils.sort_resource_mem_util(resources: list[ablator.mp.utils.Resource]) list[int][source]#

helper function to sort a list of Resource based on the memory utilization.

Parameters:
resourceslist[Resource]

the resources to sort.

Returns:
list[int]

the list of indices into resources, ordered from the least to the most used Resource

ablator.mp.utils.sort_resource_task_util(resources: list[ablator.mp.utils.Resource]) list[int][source]#

helper function to sort a list of Resource based on the number of tasks running.

Parameters:
resourceslist[Resource]

the resources to sort.

Returns:
list[int]

the list of indices into resources, ordered from the least to the most used Resource

ablator.mp.utils.sort_resources(resources: dict[str, ablator.mp.utils.Resource], gpu_util_requirement: int | None = None, memory_perc_limit: int = 80, cpu_util_perc_limit: int = 80) OrderedDict[str, Resource][source]#

Sorts the nodes based on their available resources, from the least used to the most used node. If a node does not meet the gpu_util_requirement or memory_perc_limit and cpu_util_perc_limit, it is excluded from the result.

Parameters:
resourcesdict[str, Resource]

a dictionary of the nodes with their available resources

gpu_util_requirementint | None

the GPU requirement for the task, by default None.

memory_perc_limitint

the percentage upper limit to memory utilization, by default 80.

cpu_util_perc_limitint

the percentage upper limit to CPU utilization, by default 80

Returns:
OrderedDict[str, Resource]

an ordered dictionary keyed by Node IP, arranged from the least to the most used node.
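
The filter-then-sort behaviour described above might look roughly like the sketch below. Several details are assumptions: the GPU requirement is treated as MiB of free memory that at least one device must offer, CPU load is summarised by its mean, and the surviving nodes are ordered by memory and then CPU utilization. The names `eligible` and `filter_and_sort`, the stand-in class and the sample IPs are hypothetical.

```python
from collections import OrderedDict
from dataclasses import dataclass
from statistics import mean
from typing import Optional


@dataclass
class Resource:  # stand-in holding only the fields used in this sketch
    mem: float
    cpu_usage: list[float]
    gpu_free_mem: list[int]


def eligible(
    res: Resource,
    gpu_util_requirement: Optional[int] = None,
    memory_perc_limit: int = 80,
    cpu_util_perc_limit: int = 80,
) -> bool:
    """Assumed rule: reject nodes over either limit or lacking enough GPU memory."""
    if res.mem > memory_perc_limit or mean(res.cpu_usage) > cpu_util_perc_limit:
        return False
    if gpu_util_requirement is None:
        return True
    return any(free >= gpu_util_requirement for free in res.gpu_free_mem)


def filter_and_sort(resources: dict[str, Resource], **limits) -> "OrderedDict[str, Resource]":
    kept = {ip: res for ip, res in resources.items() if eligible(res, **limits)}
    ranked = sorted(kept, key=lambda ip: (kept[ip].mem, mean(kept[ip].cpu_usage)))
    return OrderedDict((ip, kept[ip]) for ip in ranked)


cluster = {
    "10.0.0.1": Resource(mem=95.0, cpu_usage=[20.0], gpu_free_mem=[8000]),  # over memory limit
    "10.0.0.2": Resource(mem=50.0, cpu_usage=[30.0], gpu_free_mem=[2000]),  # GPU too small
    "10.0.0.3": Resource(mem=60.0, cpu_usage=[10.0], gpu_free_mem=[6000]),
    "10.0.0.4": Resource(mem=20.0, cpu_usage=[40.0], gpu_free_mem=[9000]),
}
result = filter_and_sort(cluster, gpu_util_requirement=4000)
# list(result) == ["10.0.0.4", "10.0.0.3"]
```
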

ablator.mp.utils.sort_resources_by_util(resources: dict[str, ablator.mp.utils.Resource], eval_gpu: bool) OrderedDict[str, Resource][source]#

Sort resources equally weighing between cpu_util, mem_util, number of tasks running and gpu_util, if eval_gpu=True.

Parameters:
resourcesdict[str, Resource]

the resources to order

eval_gpubool

whether to evaluate the Resource by their GPU utilization

Returns:
OrderedDict[str, Resource]

A dictionary of the same resources sorted by utilization.
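
One way to weigh several metrics equally is to rank the nodes under each metric and sum the ranks. The sketch below uses that rank-sum approach as an assumption; the documentation does not say how ablator combines the metrics. The names `rank` and `sort_by_util`, the stand-in class and the sample values are hypothetical.

```python
from collections import OrderedDict
from dataclasses import dataclass, field
from statistics import mean


@dataclass
class Resource:  # stand-in with the fields used in this sketch
    mem: float
    cpu_usage: list[float]
    gpu_free_mem: list[int]
    running_tasks: list[str] = field(default_factory=list)


def rank(values: list[float]) -> list[int]:
    """Rank of each value within `values`; 0 marks the least used node."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0] * len(values)
    for position, index in enumerate(order):
        ranks[index] = position
    return ranks


def sort_by_util(resources: dict[str, Resource], eval_gpu: bool) -> "OrderedDict[str, Resource]":
    ips = list(resources)
    metrics = [
        [mean(resources[ip].cpu_usage) for ip in ips],
        [resources[ip].mem for ip in ips],
        [float(len(resources[ip].running_tasks)) for ip in ips],
    ]
    if eval_gpu:
        # More free memory means less used, so negate before ranking.
        metrics.append([-float(sum(resources[ip].gpu_free_mem)) for ip in ips])
    ranks = [rank(metric) for metric in metrics]
    score = {ip: sum(r[i] for r in ranks) for i, ip in enumerate(ips)}
    return OrderedDict((ip, resources[ip]) for ip in sorted(ips, key=score.get))


cluster = {
    "10.0.0.1": Resource(mem=40.0, cpu_usage=[10.0], gpu_free_mem=[100]),
    "10.0.0.2": Resource(mem=30.0, cpu_usage=[20.0], gpu_free_mem=[9000], running_tasks=["t1"]),
    "10.0.0.3": Resource(mem=50.0, cpu_usage=[30.0], gpu_free_mem=[5000], running_tasks=["t2", "t3"]),
}
without_gpu = list(sort_by_util(cluster, eval_gpu=False))
with_gpu = list(sort_by_util(cluster, eval_gpu=True))
```

With these sample values the first two nodes swap places once GPU memory is taken into account, because the first node has the least free GPU memory.
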

ablator.mp.utils.utilization() dict[str, dict[int, int] | float | list[float] | int][source]#

the system utilization

Returns:
dict[str, dict[int, int] | float | list[float] | int]
a dictionary with keys:
  • gpu_free_memdict[int, int]

    corresponding to the free GPU memory on the system.

  • memfloat

    the percentage of available system memory

  • cpu_usagelist[float]

    the usage of each CPU core/thread on the system.

  • cpu_countint

    the number of CPU cores on the system.
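
Note that gpu_free_mem is a dict keyed by device number here, while the Resource dataclass documents it as a list ordered by ascending device number. A small sketch of bridging the two shapes follows. The sample dictionary is hypothetical, `gpu_dict_to_list` is an illustrative helper, and whether ablator performs the conversion this way is not stated in this page.

```python
def gpu_dict_to_list(gpu_free_mem: dict[int, int]) -> list[int]:
    """Free memory per device, listed by ascending device number."""
    return [gpu_free_mem[device] for device in sorted(gpu_free_mem)]


# Hypothetical return value with the documented keys.
sample = {
    "gpu_free_mem": {1: 4096, 0: 8192},
    "mem": 37.5,
    "cpu_usage": [12.0, 48.0],
    "cpu_count": 2,
}

# Keyword arguments in the shape the Resource signature expects.
resource_fields = {
    **sample,
    "gpu_free_mem": gpu_dict_to_list(sample["gpu_free_mem"]),
}
```
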

Module contents#