ablator.mp package
Submodules
ablator.mp.node_manager module
ablator.mp.utils module
- class ablator.mp.utils.Resource(mem: float, cpu_usage: list[float], cpu_count: int, gpu_free_mem: list[int], running_tasks: list[str] = <factory>, is_active: bool = True)
Bases: object
A dataclass used to group a node's resources together.
- Attributes:
- mem : float
percentage of total memory utilization
- cpu_usage : list[float]
percentage of CPU usage per core
- cpu_count : int
number of cores available on the system.
- gpu_free_mem : list[int]
free GPU memory in MiB, listed by ascending device number
- running_tasks : list[str]
the list of Ray processes running on the system
- is_active : bool
whether the resource is currently active
- cpu_count: int
- cpu_usage: list[float]
- gpu_free_mem: list[int]
- is_active: bool = True
- mem: float
- running_tasks: list[str]
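The following is a minimal sketch of a Resource record for a hypothetical four-core node with two GPUs. To keep the snippet self-contained, it defines a local stand-in dataclass that mirrors the documented signature instead of importing ablator.mp.utils. The `<factory>` default in the signature is how dataclasses display a `default_factory`, which is modelled here with `field(default_factory=list)`. The numbers are illustrative only.

```python
from dataclasses import dataclass, field


# Local stand-in mirroring the documented signature of ablator.mp.utils.Resource.
@dataclass
class Resource:
    mem: float  # % of total memory in use
    cpu_usage: list[float]  # % usage per core
    cpu_count: int
    gpu_free_mem: list[int]  # free MiB per GPU, by ascending device number
    running_tasks: list[str] = field(default_factory=list)  # fresh list per instance
    is_active: bool = True


node = Resource(
    mem=42.5,
    cpu_usage=[10.0, 30.0, 20.0, 40.0],
    cpu_count=4,
    gpu_free_mem=[8000, 2048],
)
print(node.running_tasks, node.is_active)  # [] True
print(sum(node.cpu_usage) / node.cpu_count)  # 25.0
```

Using a factory rather than a shared `[]` default means each Resource gets its own running_tasks list.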
- ablator.mp.utils.get_node_id() → str
The Ray node ID of the executing process.
- Returns:
- str
the Ray node ID
- ablator.mp.utils.get_node_ip() → str
The Ray node IP of the executing process.
- Returns:
- str
the Ray node IP
- ablator.mp.utils.get_ray_address() → str
The address of the current Ray cluster.
- Returns:
- str
the address of the current Ray cluster
- ablator.mp.utils.get_ray_tasks(ray_address: str, node_id: str, timeout: int) → list[str]
List the running Ray tasks on the specified node.
- Parameters:
- ray_address : str
the address of the Ray cluster
- node_id : str
the ID of the node on which to discover running tasks
- timeout : int
the timeout to apply when waiting for the Ray cluster to respond.
- Returns:
- list[str]
a list of the names of the Ray tasks running on the node.
- ablator.mp.utils.get_username() → str
The system username of the executing process.
- Returns:
- str
the system username
- ablator.mp.utils.make_private_key(home_path: Path) → tuple[paramiko.pkey.PKey, str]
Creates a private key named ablator_id_rsa that can be used for ablator-specific functionality, such as SSH between cluster nodes.
- Parameters:
- home_path : Path
the home directory in which to store the private key. The key is written to the .ssh folder under home_path and is named ablator_id_rsa.
- Returns:
- tuple[PKey, str]
the private key and the public key that were added to home_path.
- ablator.mp.utils.private_key_to_str(key: PKey) → str
Convert a private key object to a string.
- Parameters:
- key : PKey
the private key object to convert to a string.
- Returns:
- str
the string representation of the private key.
- ablator.mp.utils.ray_init(**kwargs: Any) → RuntimeContext
Initialize Ray with some reasonable defaults.
- Parameters:
- **kwargs : ty.Any
the keyword arguments with which to initialize Ray. For full details, consult https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html
- Returns:
- RuntimeContext
the Ray runtime context
- ablator.mp.utils.register_public_key(public_key: str) → str
Registers a public key that can be used to access this machine as the current user via SSH, by adding public_key to authorized_keys.
- Parameters:
- public_key : str
the public key to add to the system.
- Returns:
- str
the username corresponding to the added public key.
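register_public_key is documented as adding the key to authorized_keys. The sketch below shows what such a step typically involves on a standard OpenSSH setup: it creates `.ssh` if needed, appends the key only once, and keeps the file private. `append_authorized_key` is a hypothetical helper, not ablator's implementation. It takes an explicit home directory so it can be tried safely in a temporary folder.

```python
import tempfile
from pathlib import Path


def append_authorized_key(home_path: Path, public_key: str) -> Path:
    """Hypothetical helper: add public_key to <home_path>/.ssh/authorized_keys once."""
    ssh_dir = home_path / ".ssh"
    ssh_dir.mkdir(mode=0o700, parents=True, exist_ok=True)
    auth_keys = ssh_dir / "authorized_keys"
    content = auth_keys.read_text() if auth_keys.exists() else ""
    key_line = public_key.strip()
    if key_line not in content.splitlines():
        # Start on a fresh line if the existing file lacks a trailing newline.
        prefix = "\n" if content and not content.endswith("\n") else ""
        with auth_keys.open("a") as f:
            f.write(prefix + key_line + "\n")
    # With StrictModes (the OpenSSH default), sshd rejects key files writable by others.
    auth_keys.chmod(0o600)
    return auth_keys


with tempfile.TemporaryDirectory() as tmp:
    path = append_authorized_key(Path(tmp), "ssh-ed25519 AAAAexample user@host")
    append_authorized_key(Path(tmp), "ssh-ed25519 AAAAexample user@host")  # already present
    print(path.read_text().count("AAAAexample"))  # 1
```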
- ablator.mp.utils.run_lambda(fn: Callable, node_ip: str | None = None, timeout: int | None = None, run_async: bool = False, cuda: bool | None = None, fn_kwargs: dict[str, Any] | None = None, name: str | None = None, max_calls: int | None = 1, **options: float | str | dict) → Any | ObjectRef
Run a Python lambda function on a node, scheduled using Ray. When running asynchronously, a reference to the Ray object is returned; otherwise, the function's output is returned.
- Parameters:
- fn : abc.Callable
the function to run. Must be picklable.
- node_ip : str | None, optional
the IP of the node on which to run the function, by default None
- timeout : int | None, optional
the timeout to apply when waiting for the results, by default None
- run_async : bool, optional
whether to schedule the function asynchronously instead of waiting for its output, by default False
- cuda : bool | None, optional
whether the function requires CUDA, by default False
- fn_kwargs : dict[str, ty.Any] | None, optional
the keyword arguments to pass to the function, by default None
- name : str | None, optional
the name of the scheduled function, by default None
- max_calls : int | None, optional
the maximum number of times a Ray worker may execute the remote function before it exits, which frees the worker's memory (not applicable to Ray actors), by default 1
- **options : float | str | dict
additional keyword options supplied as run-time configuration to the remote function.
- Returns:
- ty.Any | ray.ObjectRef
the result of the function or a reference to the ray object.
- ablator.mp.utils.run_ssh_cmd(node_ip: str, username: str, private_key: PKey, cmd: str, timeout: int, run_async: bool = False) → str | None
Executes a bash command via SSH and optionally returns its stdout. Any output on stderr is logged as an error.
- Parameters:
- node_ip : str
the IP of the node on which to execute the command.
- username : str
the username with which to execute the command.
- private_key : PKey
the private key used to connect to the node.
- cmd : str
the command to execute
- timeout : int
the timeout for establishing a connection with the SSH server
- run_async : bool, optional
whether to run the command asynchronously, returning None instead of waiting for stdout/stderr, by default False
- Returns:
- str | None
a string of the stdout from the command or None when the command is run asynchronously
- ablator.mp.utils.sort_resource_cpu_util(resources: list[ablator.mp.utils.Resource]) → list[int]
Helper function to sort a list of Resource objects based on their CPU utilization.
- Parameters:
- resources : list[Resource]
the resources to sort.
- Returns:
- list[int]
the indices of resources, sorted from the least to the most used Resource
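The following is a minimal sketch of a CPU-based ordering like the one sort_resource_cpu_util describes. It assumes that a node's CPU utilization is the mean of its per-core cpu_usage values, which may differ from the library's exact metric. `sort_by_cpu_util` and the local Resource stand-in are hypothetical.

```python
from dataclasses import dataclass, field
from statistics import mean


@dataclass
class Resource:  # local stand-in mirroring ablator.mp.utils.Resource
    mem: float
    cpu_usage: list[float]
    cpu_count: int
    gpu_free_mem: list[int]
    running_tasks: list[str] = field(default_factory=list)
    is_active: bool = True


def sort_by_cpu_util(resources: list[Resource]) -> list[int]:
    """Indices of resources, from the lowest to the highest mean CPU usage."""
    return sorted(range(len(resources)), key=lambda i: mean(resources[i].cpu_usage))


nodes = [
    Resource(mem=10.0, cpu_usage=[90.0, 70.0], cpu_count=2, gpu_free_mem=[]),
    Resource(mem=50.0, cpu_usage=[5.0, 15.0], cpu_count=2, gpu_free_mem=[]),
    Resource(mem=30.0, cpu_usage=[40.0, 60.0], cpu_count=2, gpu_free_mem=[]),
]
print(sort_by_cpu_util(nodes))  # [1, 2, 0]
```

Returning indices rather than a reordered list leaves the caller's list untouched and lets several orderings refer to the same elements.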
- ablator.mp.utils.sort_resource_gpu_util(resources: list[ablator.mp.utils.Resource]) → list[int]
Helper function to sort a list of Resource objects based on their free GPU memory.
- Parameters:
- resources : list[Resource]
the resources to sort.
- Returns:
- list[int]
the indices of resources, sorted from the least to the most used Resource
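For GPUs, the ordering runs the other way round: more free memory means a less used node. The sketch below assumes that a node is ranked by the largest free memory on any single device, and it treats nodes without GPUs as having 0 MiB free. Both choices, and the name `sort_by_gpu_free_mem`, are illustrative assumptions.

```python
from dataclasses import dataclass, field


@dataclass
class Resource:  # local stand-in mirroring ablator.mp.utils.Resource
    mem: float
    cpu_usage: list[float]
    cpu_count: int
    gpu_free_mem: list[int]
    running_tasks: list[str] = field(default_factory=list)
    is_active: bool = True


def sort_by_gpu_free_mem(resources: list[Resource]) -> list[int]:
    """Indices of resources, from the most to the least free GPU memory."""
    # reverse=True keeps the sort stable, so ties retain their input order.
    return sorted(
        range(len(resources)),
        key=lambda i: max(resources[i].gpu_free_mem, default=0),
        reverse=True,
    )


nodes = [
    Resource(mem=10.0, cpu_usage=[5.0], cpu_count=1, gpu_free_mem=[2048, 1024]),
    Resource(mem=10.0, cpu_usage=[5.0], cpu_count=1, gpu_free_mem=[]),  # CPU-only node
    Resource(mem=10.0, cpu_usage=[5.0], cpu_count=1, gpu_free_mem=[16000]),
    Resource(mem=10.0, cpu_usage=[5.0], cpu_count=1, gpu_free_mem=[8000, 12000]),
]
print(sort_by_gpu_free_mem(nodes))  # [2, 3, 0, 1]
```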
- ablator.mp.utils.sort_resource_mem_util(resources: list[ablator.mp.utils.Resource]) → list[int]
Helper function to sort a list of Resource objects based on their memory utilization.
- Parameters:
- resources : list[Resource]
the resources to sort.
- Returns:
- list[int]
the indices of resources, sorted from the least to the most used Resource
- ablator.mp.utils.sort_resource_task_util(resources: list[ablator.mp.utils.Resource]) → list[int]
Helper function to sort a list of Resource objects based on the number of tasks running.
- Parameters:
- resources : list[Resource]
the resources to sort.
- Returns:
- list[int]
the indices of resources, sorted from the least to the most used Resource
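The memory-based and task-based helpers (sort_resource_mem_util and sort_resource_task_util) can follow the same index-sorting pattern. The sketch below ranks nodes by the mem percentage and by len(running_tasks). The function names are hypothetical, and the local Resource stand-in mirrors the documented fields.

```python
from dataclasses import dataclass, field


@dataclass
class Resource:  # local stand-in mirroring ablator.mp.utils.Resource
    mem: float
    cpu_usage: list[float]
    cpu_count: int
    gpu_free_mem: list[int]
    running_tasks: list[str] = field(default_factory=list)
    is_active: bool = True


def sort_by_mem_util(resources: list[Resource]) -> list[int]:
    """Indices of resources, from the lowest to the highest memory utilization."""
    return sorted(range(len(resources)), key=lambda i: resources[i].mem)


def sort_by_task_count(resources: list[Resource]) -> list[int]:
    """Indices of resources, from the fewest to the most running tasks."""
    return sorted(range(len(resources)), key=lambda i: len(resources[i].running_tasks))


nodes = [
    Resource(mem=70.0, cpu_usage=[5.0], cpu_count=1, gpu_free_mem=[], running_tasks=["a"]),
    Resource(mem=20.0, cpu_usage=[5.0], cpu_count=1, gpu_free_mem=[], running_tasks=["a", "b", "c"]),
    Resource(mem=45.0, cpu_usage=[5.0], cpu_count=1, gpu_free_mem=[]),
]
print(sort_by_mem_util(nodes))  # [1, 2, 0]
print(sort_by_task_count(nodes))  # [2, 0, 1]
```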
- ablator.mp.utils.sort_resources(resources: dict[str, ablator.mp.utils.Resource], gpu_util_requirement: int | None = None, memory_perc_limit: int = 80, cpu_util_perc_limit: int = 80) → OrderedDict[str, Resource]
Sorts the nodes based on their available resources, from the least used to the most used node. A node that does not meet gpu_util_requirement, or that exceeds memory_perc_limit or cpu_util_perc_limit, is excluded from the result.
- Parameters:
- resources : dict[str, Resource]
a dictionary of the nodes with their available resources
- gpu_util_requirement : int | None
the GPU requirement for the task, by default None.
- memory_perc_limit : int
the upper limit on memory utilization, as a percentage, by default 80.
- cpu_util_perc_limit : int
the upper limit on CPU utilization, as a percentage, by default 80.
- Returns:
- OrderedDict[str, Resource]
the node IPs and their resources, ordered from the least to the most used.
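The following sketch covers only the exclusion step, under stated assumptions. A node is kept when mem is at or below memory_perc_limit and its mean cpu_usage is at or below cpu_util_perc_limit. When gpu_util_requirement is given, at least one device must also report that many MiB free. Reading gpu_util_requirement as free MiB, and using inclusive boundaries, are both guesses. `eligible_nodes` is a hypothetical name. The surviving nodes would then still need to be ordered by utilization.

```python
from dataclasses import dataclass, field
from statistics import mean
from typing import Optional


@dataclass
class Resource:  # local stand-in mirroring ablator.mp.utils.Resource
    mem: float
    cpu_usage: list[float]
    cpu_count: int
    gpu_free_mem: list[int]
    running_tasks: list[str] = field(default_factory=list)
    is_active: bool = True


def eligible_nodes(
    resources: dict[str, Resource],
    gpu_util_requirement: Optional[int] = None,
    memory_perc_limit: int = 80,
    cpu_util_perc_limit: int = 80,
) -> list[str]:
    """Hypothetical filter: node IPs that stay within every limit, in input order."""
    kept = []
    for ip, res in resources.items():
        if res.mem > memory_perc_limit:
            continue  # memory too busy
        if mean(res.cpu_usage) > cpu_util_perc_limit:
            continue  # CPU too busy
        if gpu_util_requirement is not None and max(res.gpu_free_mem, default=0) < gpu_util_requirement:
            continue  # no single GPU has enough free memory
        kept.append(ip)
    return kept


nodes = {
    "10.0.0.1": Resource(mem=30.0, cpu_usage=[20.0, 40.0], cpu_count=2, gpu_free_mem=[10000]),
    "10.0.0.2": Resource(mem=95.0, cpu_usage=[10.0, 10.0], cpu_count=2, gpu_free_mem=[10000]),
    "10.0.0.3": Resource(mem=30.0, cpu_usage=[99.0, 95.0], cpu_count=2, gpu_free_mem=[10000]),
    "10.0.0.4": Resource(mem=30.0, cpu_usage=[5.0, 5.0], cpu_count=2, gpu_free_mem=[1024]),
}
print(eligible_nodes(nodes))  # ['10.0.0.1', '10.0.0.4']
print(eligible_nodes(nodes, gpu_util_requirement=4096))  # ['10.0.0.1']
```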
- ablator.mp.utils.sort_resources_by_util(resources: dict[str, ablator.mp.utils.Resource], eval_gpu: bool) → OrderedDict[str, Resource]
Sorts resources by weighing CPU utilization, memory utilization, the number of running tasks and, if eval_gpu=True, GPU utilization equally.
- Parameters:
- resources : dict[str, Resource]
the resources to order
- eval_gpu : bool
whether to also evaluate each Resource by its GPU utilization
- Returns:
- OrderedDict[str, Resource]
A dictionary of the same resources sorted by utilization.
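One way to weigh several criteria equally is a rank sum. Each node is ranked on every criterion, the ranks are added up, and nodes are sorted by the total. The sketch below uses that scheme as an assumption; the library may combine the metrics differently. `sort_by_util` is hypothetical, and the GPU criterion treats more free memory as less used.

```python
from collections import OrderedDict
from dataclasses import dataclass, field
from statistics import mean
from typing import Callable


@dataclass
class Resource:  # local stand-in mirroring ablator.mp.utils.Resource
    mem: float
    cpu_usage: list[float]
    cpu_count: int
    gpu_free_mem: list[int]
    running_tasks: list[str] = field(default_factory=list)
    is_active: bool = True


def sort_by_util(resources: dict[str, Resource], eval_gpu: bool) -> OrderedDict[str, Resource]:
    """Hypothetical rank-sum ordering: lowest total rank (least used) first."""
    metrics: list[Callable[[Resource], float]] = [
        lambda r: mean(r.cpu_usage),
        lambda r: r.mem,
        lambda r: len(r.running_tasks),
    ]
    if eval_gpu:
        # More free GPU memory means less used, so negate it.
        metrics.append(lambda r: -max(r.gpu_free_mem, default=0))
    score = {ip: 0 for ip in resources}
    for metric in metrics:
        ranked = sorted(resources, key=lambda ip: metric(resources[ip]))
        for rank, ip in enumerate(ranked):
            score[ip] += rank
    # sorted() is stable, so equal scores keep the input order.
    return OrderedDict((ip, resources[ip]) for ip in sorted(resources, key=score.__getitem__))


nodes = {
    "a": Resource(mem=30.0, cpu_usage=[30.0], cpu_count=1, gpu_free_mem=[16000]),
    "b": Resource(mem=20.0, cpu_usage=[20.0], cpu_count=1, gpu_free_mem=[500], running_tasks=["t1"]),
    "c": Resource(mem=60.0, cpu_usage=[60.0], cpu_count=1, gpu_free_mem=[8000], running_tasks=["t1", "t2"]),
}
print(list(sort_by_util(nodes, eval_gpu=False)))  # ['b', 'a', 'c']
print(list(sort_by_util(nodes, eval_gpu=True)))  # ['a', 'b', 'c']
```

In this example, enabling eval_gpu moves node "a" to the front because it has by far the most free GPU memory.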
- ablator.mp.utils.utilization() → dict[str, dict[int, int] | float | list[float] | int]
The current system utilization.
- Returns:
- dict[str, dict[int, int] | float | list[float] | int]
- a dictionary with keys:
- gpu_free_mem : dict[int, int]
the free GPU memory on the system.
- mem : float
the percentage of available system memory
- cpu_usage : list[float]
the usage of each CPU core/thread on the system.
- cpu_count : int
the number of CPU cores on the system.
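utilization() reports gpu_free_mem as a dictionary, whereas Resource stores a list ordered by ascending device number. The hypothetical helper below sketches that conversion, assuming that the dictionary keys are device numbers. The sample input is made up. running_tasks is left empty because task names come from a separate query such as get_ray_tasks.

```python
from dataclasses import dataclass, field


@dataclass
class Resource:  # local stand-in mirroring ablator.mp.utils.Resource
    mem: float
    cpu_usage: list[float]
    cpu_count: int
    gpu_free_mem: list[int]
    running_tasks: list[str] = field(default_factory=list)
    is_active: bool = True


def resource_from_utilization(util: dict) -> Resource:
    """Hypothetical helper: build a Resource from a utilization()-style dict."""
    gpu = util["gpu_free_mem"]  # assumed {device number: free MiB}
    return Resource(
        mem=util["mem"],
        cpu_usage=list(util["cpu_usage"]),
        cpu_count=util["cpu_count"],
        gpu_free_mem=[gpu[dev] for dev in sorted(gpu)],  # ascending device number
    )


sample = {"gpu_free_mem": {1: 2048, 0: 11000}, "mem": 37.5, "cpu_usage": [12.0, 8.0], "cpu_count": 2}
node = resource_from_utilization(sample)
print(node.gpu_free_mem)  # [11000, 2048]
```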