Source code for ablator.mp.utils

import copy
import getpass
import io
import logging
import os
import socket
import typing as ty
from collections import OrderedDict, abc
from dataclasses import dataclass, field
from pathlib import Path

import numpy as np
import paramiko
import psutil
import ray
import torch
from paramiko import PKey
from ray.util.state import list_tasks
from ray.runtime_context import RuntimeContext
from ablator.utils._nvml import get_gpu_mem


@dataclass
class Resource:
    """
    A snapshot of the resources of a single node, grouped together.

    Attributes
    ----------
    mem : float
        percentage of total memory utilization
    cpu_usage : list[float]
        percentage of cpu usage per core
    cpu_count : int
        number of cores available on the system.
    gpu_free_mem : list[int]
        free GPU memory in MiB listed by ascending device number
    running_tasks : list[str]
        the list of ray processes running on the system
    is_active : bool
        whether the resource is currently active
    """

    mem: float
    cpu_usage: list[float]
    cpu_count: int
    gpu_free_mem: list[int]
    # each instance gets its own, initially empty, task list
    running_tasks: list[str] = field(default_factory=list)
    is_active: bool = True
def ray_init(**kwargs: ty.Any) -> RuntimeContext:
    """
    Initialize ray with some reasonable defaults.

    When CUDA is usable and the cluster is local (no ``address``, or an
    ``address`` of ``None`` / ``"local"``), ``num_gpus`` is forced to ``1``,
    overwriting any value supplied by the caller. Otherwise, whenever an
    ``address`` keyword was supplied, ``ignore_reinit_error`` is set to ``True``.

    Parameters
    ----------
    **kwargs : ty.Any
        the keyword arguments to provide to initialize ray with. For full
        details please consult
        https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html

    Returns
    -------
    RuntimeContext
        the ray run-time context
    """
    # an explicitly empty ``CUDA_VISIBLE_DEVICES`` hides every GPU;
    # an unset variable does not.
    gpu_exposed = os.environ.get("CUDA_VISIBLE_DEVICES") != ""
    has_cuda = torch.cuda.is_available()
    address = kwargs.get("address")
    local_cluster = address is None or address == "local"

    if gpu_exposed and has_cuda and local_cluster:
        # WSL and other systems work poorly with ray,
        # hence the GPU count is set explicitly.
        kwargs["num_gpus"] = 1
    elif "address" in kwargs:
        kwargs["ignore_reinit_error"] = True
    return ray.init(**kwargs)
def get_ray_address() -> str:
    """
    The address of the ray cluster the current process is connected to.

    Returns
    -------
    str
        the ``gcs_address`` reported by the ray run-time context
    """
    runtime_context = ray.get_runtime_context()
    return runtime_context.gcs_address
def get_username() -> str:
    """
    The system username of the executing process.

    Returns
    -------
    str
        the username as reported by ``getpass.getuser``
    """
    username = getpass.getuser()
    return username
def get_node_ip() -> str:
    """
    The ray node IP of the executing process.

    Returns
    -------
    str
        the IP returned by ``ray.util.get_node_ip_address``
    """
    node_ip = ray.util.get_node_ip_address()
    return node_ip
def get_node_id() -> str:
    """
    The ray node ID of the executing process.

    Returns
    -------
    str
        the node ID reported by the ray run-time context
    """
    runtime_context = ray.get_runtime_context()
    return runtime_context.get_node_id()
def sort_resource_gpu_util(resources: list[Resource]) -> list[int]:
    """
    Helper function to rank a list of ``Resource`` by free GPU memory.

    Each resource is scored by the largest free-memory value among its GPUs
    (``0`` when it lists no GPU) and resources with more free memory come first.

    Parameters
    ----------
    resources : list[Resource]
        the resources to sort.

    Returns
    -------
    list[int]
        the indices into ``resources`` ordered from the least to the most
        used ``Resource``. When every resource has the same score the ranking
        is inconclusive and a list of ``NaN`` of the same length is returned
        instead. An empty input yields an empty list.
    """
    if not resources:
        # nothing to rank; indexing the first score below would raise
        return []
    free_gpu = np.array(
        [
            np.max(resource.gpu_free_mem) if len(resource.gpu_free_mem) else 0
            for resource in resources
        ]
    )
    if (free_gpu == free_gpu[0]).all():
        # all scores tie: signal "no preference" with NaNs
        return list(np.full(len(free_gpu), np.nan))
    # descending order: the most free memory is the least used
    return list(np.argsort(free_gpu)[::-1])
def sort_resource_cpu_util(resources: list[Resource]) -> list[int]:
    """
    Helper function to rank a list of ``Resource`` by CPU utilization.

    Each resource is scored by the mean of its per-core ``cpu_usage``.

    Parameters
    ----------
    resources : list[Resource]
        the resources to sort.

    Returns
    -------
    list[int]
        the indices into ``resources`` ordered from the least to the most
        used ``Resource``. When every resource has the same score the ranking
        is inconclusive and a list of ``NaN`` of the same length is returned
        instead. An empty input yields an empty list.
    """
    if not resources:
        # nothing to rank; indexing the first score below would raise
        return []
    cpu_usage = np.array([np.mean(resource.cpu_usage) for resource in resources])
    if (cpu_usage == cpu_usage[0]).all():
        # all scores tie: signal "no preference" with NaNs
        return list(np.full(len(cpu_usage), np.nan))
    return list(np.argsort(cpu_usage))
def sort_resource_mem_util(resources: list[Resource]) -> list[int]:
    """
    Helper function to rank a list of ``Resource`` by memory utilization.

    Parameters
    ----------
    resources : list[Resource]
        the resources to sort.

    Returns
    -------
    list[int]
        the indices into ``resources`` ordered from the least to the most
        used ``Resource``. When every resource reports the same ``mem`` the
        ranking is inconclusive and a list of ``NaN`` of the same length is
        returned instead. An empty input yields an empty list.
    """
    if not resources:
        # nothing to rank; indexing the first score below would raise
        return []
    mem_arr = np.array([resource.mem for resource in resources])
    if (mem_arr == mem_arr[0]).all():
        # all scores tie: signal "no preference" with NaNs
        return list(np.full(len(mem_arr), np.nan))
    return list(np.argsort(mem_arr))
def sort_resource_task_util(resources: list[Resource]) -> list[int]:
    """
    Helper function to rank a list of ``Resource`` by the number of tasks
    running on each of them.

    Parameters
    ----------
    resources : list[Resource]
        the resources to sort.

    Returns
    -------
    list[int]
        the indices into ``resources`` ordered from the least to the most
        used ``Resource``. When every resource runs the same number of tasks
        the ranking is inconclusive and a list of ``NaN`` of the same length
        is returned instead. An empty input yields an empty list.
    """
    if not resources:
        # nothing to rank; indexing the first score below would raise
        return []
    n_running_tasks = np.array(
        [len(resource.running_tasks) for resource in resources]
    )
    if (n_running_tasks == n_running_tasks[0]).all():
        # all scores tie: signal "no preference" with NaNs
        return list(np.full(len(n_running_tasks), np.nan))
    return list(np.argsort(n_running_tasks))
def sort_resources_by_util(
    resources: dict[str, Resource], eval_gpu: bool
) -> OrderedDict[str, Resource]:
    """
    Sort resources equally weighing between cpu_util, mem_util, number of
    tasks running and gpu_util, if `eval_gpu=True`.

    The ordering is built one node at a time: every metric nominates its
    least-used node among the remaining ones, the node with the most
    nominations is picked next and removed from the pool.

    Parameters
    ----------
    resources : dict[str, Resource]
        the resources to order
    eval_gpu : bool
        whether to evaluate the Resource by their GPU utilization

    Returns
    -------
    OrderedDict[str, Resource]
        A dictionary of the same resources sorted by utilization.
    """
    pending_ips = list(resources)
    pending = list(resources.values())
    ranked = OrderedDict()
    while pending_ips:
        rankings = []
        if eval_gpu:
            rankings.append(sort_resource_gpu_util(pending))
        rankings.append(sort_resource_cpu_util(pending))
        rankings.append(sort_resource_mem_util(pending))
        rankings.append(sort_resource_task_util(pending))
        # metric x node grid of ranked indices
        grid = np.array(rankings)
        # drop the inconclusive (all-NaN) metrics
        grid = grid[~np.isnan(grid).all(axis=1)]
        pick = 0
        if len(grid) > 0:
            # first column holds the least-used node according to each metric
            candidates, votes = np.unique(grid[:, 0], return_counts=True)
            pick = int(candidates[np.argmax(votes)])
        chosen_ip = pending_ips.pop(pick)
        pending.pop(pick)
        ranked[chosen_ip] = resources[chosen_ip]
    return ranked
def sort_resources(
    resources: dict[str, Resource],
    gpu_util_requirement: int | None = None,
    memory_perc_limit: int = 80,
    cpu_util_perc_limit: int = 80,
) -> OrderedDict[str, Resource]:
    """
    Sorts the nodes based on their available resources from the least used to
    the most used node. If a node does not meet the `gpu_util_requirement` or
    `memory_perc_limit` and `cpu_util_perc_limit` it is excluded from the
    result.

    Parameters
    ----------
    resources : dict[str, Resource]
        a dictionary of the nodes with their available resources
    gpu_util_requirement : int | None
        the GPU requirement for the task, by default ``None``. When set, a node
        qualifies only if at least one of its ``gpu_free_mem`` entries is
        strictly greater than this value, and GPU usage takes part in the
        ordering.
    memory_perc_limit : int
        the percentage upper limit to memory utilization, by default ``80``.
    cpu_util_perc_limit : int
        the percentage upper limit to the mean CPU utilization, by default ``80``

    Returns
    -------
    OrderedDict[str, Resource]
        the qualifying nodes, keyed by node IP, arranged from the least to
        most used.
    """
    sorted_resources = sort_resources_by_util(
        resources, gpu_util_requirement is not None
    )

    def _should_sample(resource):
        gpu_ok = gpu_util_requirement is None or any(
            np.array(resource.gpu_free_mem) > gpu_util_requirement
        )
        cpu_ok = np.mean(resource.cpu_usage) < cpu_util_perc_limit
        mem_ok = resource.mem < memory_perc_limit
        return mem_ok and cpu_ok and gpu_ok

    # The sorted mapping is only read here, so it can be iterated directly;
    # no defensive copy of the resources is needed.
    return OrderedDict(
        (node_ip, resource)
        for node_ip, resource in sorted_resources.items()
        if _should_sample(resource)
    )
def make_private_key(home_path: Path) -> tuple[PKey, str]:
    """
    Load, or create when missing, the private key named `ablator_id_rsa` that
    can be used for ablator specific functionality, such as SSH between
    cluster-nodes.

    Parameters
    ----------
    home_path : Path
        the home directory of where to store the private key. The private key
        is added in the folder `.ssh` and is named `ablator_id_rsa`

    Returns
    -------
    tuple[PKey, str]
        the private key and the public key line
        (``<key type> <base64 key> ablator-<hostname>@<ip>``) for the key
        stored under `home_path`.
    """
    key_file = Path(home_path) / ".ssh" / "ablator_id_rsa"
    key_file.parent.mkdir(exist_ok=True)
    if key_file.exists():
        pkey = paramiko.RSAKey.from_private_key_file(key_file.as_posix())
    else:
        pkey = paramiko.RSAKey.generate(bits=2048)
        # create the file with owner-only permissions before writing to it
        key_fd = os.open(
            key_file.as_posix(),
            flags=(os.O_WRONLY | os.O_CREAT | os.O_TRUNC),
            mode=0o600,
        )
        with open(key_fd, "w", encoding="utf-8") as key_stream:
            pkey.write_private_key(key_stream)
    name = pkey.get_name()
    public_key = pkey.get_base64()
    hostname = socket.gethostname()
    node_ip = socket.gethostbyname(hostname)
    return pkey, f"{name} {public_key} ablator-{hostname}@{node_ip}"
def register_public_key(
    public_key: str,
) -> str:
    """
    Registers a public-key to be used to access the same machine and current
    user via SSH. It adds the `public_key` to `authorized_keys`, unless the
    file already contains it.

    Parameters
    ----------
    public_key : str
        the public key to add to the system.

    Returns
    -------
    str
        the username corresponding to the added public key.
    """
    username = get_username()
    ssh_dir = Path.home().joinpath(".ssh")
    ssh_dir.mkdir(exist_ok=True)
    authorized_keys = ssh_dir.joinpath("authorized_keys")

    existing = ""
    if authorized_keys.exists():
        existing = authorized_keys.read_text(encoding="utf-8")
        if public_key in existing:
            # already registered; nothing to do
            return username

    # authorized_keys holds one key per line. If the current content does not
    # end with a newline, start a fresh line so the new key is not fused onto
    # the previous entry.
    separator = "\n" if existing and not existing.endswith("\n") else ""
    with authorized_keys.open("a", encoding="utf-8") as f:
        f.write(f"{separator}{public_key}\n")
    return username
def private_key_to_str(key: PKey) -> str:
    """
    Serialize a private key object into a string.

    Parameters
    ----------
    key : PKey
        the private key object to convert to a string.

    Returns
    -------
    str
        whatever ``key.write_private_key`` writes, collected from an
        in-memory text buffer.
    """
    with io.StringIO() as buffer:
        key.write_private_key(buffer)
        return buffer.getvalue()
def get_ray_tasks(ray_address: str, node_id: str, timeout: int) -> list[str]:
    """
    List the running ray tasks on the specified node ID.

    Parameters
    ----------
    ray_address : str
        the cluster ray address
    node_id : str
        the node ID to discover the running tasks
    timeout : int
        the timeout to apply when waiting for the ray cluster to respond.

    Returns
    -------
    list[str]
        a list of the ray task names running on the ``Node``.
    """
    # only tasks in the RUNNING state that are placed on the requested node
    running_on_node = [
        ("state", "=", "RUNNING"),
        ("node_id", "=", node_id),
    ]
    tasks = list_tasks(
        address=ray_address,
        filters=running_on_node,
        timeout=timeout,
    )
    return [task.name for task in tasks]
def utilization() -> dict[str, dict[int, int] | float | list[float] | int]:
    """
    Collect the current system utilization.

    Returns
    -------
    dict[str, dict[int, int] | float | list[float] | int]
        a dictionary with keys:

        - gpu_free_mem : dict[int, int]
            corresponding to the free GPU memory on the system.
        - mem : float
            the percentage of system memory in use, as reported by
            ``psutil.virtual_memory().percent``.
        - cpu_usage : list[float]
            the usage of each CPU core/thread on the system, sampled over a
            2 second interval.
        - cpu_count : int
            the number of CPU cores on the system.
    """
    stats = {}
    stats["gpu_free_mem"] = get_gpu_mem("free")
    stats["mem"] = psutil.virtual_memory().percent
    stats["cpu_usage"] = psutil.cpu_percent(interval=2, percpu=True)
    stats["cpu_count"] = psutil.cpu_count()
    return stats
def run_lambda(
    fn: abc.Callable,
    node_ip: str | None = None,
    timeout: int | None = None,
    run_async: bool = False,
    cuda: bool | None = None,
    fn_kwargs: dict[str, ty.Any] | None = None,
    name: str | None = None,
    max_calls: int | None = 1,
    **options: float | str | dict,
) -> ty.Any | ray.ObjectRef:
    """
    Run a Python function on a Node, scheduled using ray. When running
    asynchronously a reference to the ray object is provided. Otherwise, it
    will return the function output.

    The ``num_cpus`` and ``num_gpus`` options are always set by this function
    and, like ``name``, ``max_calls`` and ``resources`` when their matching
    argument is provided, replace any value passed through ``**options``.

    Parameters
    ----------
    fn : abc.Callable
        the function to run. Must be pickable.
    node_ip : str | None, optional
        the IP of the node to run the ray function to, by default None
    timeout : int | None, optional
        the timeout to apply when waiting for the results, by default None
    run_async : bool, optional
        whether to wait for the lambda output or schedule asynchronously,
        by default False
    cuda : bool | None, optional
        whether the function requires CUDA. By default None, in which case
        ``torch.cuda.is_available()`` decides.
    fn_kwargs : dict[str, ty.Any] | None, optional
        the keyword arguments to pass to the function, by default None
    name : str | None, optional
        the name of the scheduled function, by default None
    max_calls : int | None, optional
        used to de-allocate memory for remotes (but is not applicable for
        ray actors), by default 1
    **options : float | str | dict
        Additional kwarg options to supply as run-time configurations to the
        remote function.

    Returns
    -------
    ty.Any | ray.ObjectRef
        the result of the function or a reference to the ray object.
    """
    call_kwargs = {} if fn_kwargs is None else fn_kwargs
    needs_cuda = torch.cuda.is_available() if cuda is None else cuda

    # request only a token share of each resource
    options["num_cpus"] = 0.001
    options["num_gpus"] = 0.001 if needs_cuda else 0
    if name is not None:
        options["name"] = name
    if max_calls is not None:
        options["max_calls"] = max_calls
    if node_ip is not None:
        # pin the task to the requested node through its ``node:<ip>`` resource
        options["resources"] = {f"node:{node_ip}": 0.001}

    object_ref = ray.remote(**options)(fn).remote(**call_kwargs)
    if run_async:
        return object_ref
    return ray.get(object_ref, timeout=timeout)
def run_ssh_cmd(
    node_ip: str,
    username: str,
    private_key: PKey,
    cmd: str,
    timeout: int,
    run_async: bool = False,
) -> str | None:
    """
    Executes a bash command via ssh and optionally returns the `stdout` of the
    command. When `stderr` is present in the command output it will log it as
    an error.

    The SSH client is closed once the output has been read, or when
    connecting / starting the command fails. It is left open when
    `run_async=True`.

    Parameters
    ----------
    node_ip : str
        the IP of where to execute the command.
    username : str
        the username of where to execute the command.
    private_key : PKey
        the private key of where to connect to execute the command
    cmd : str
        the command to execute
    timeout : int
        the timeout in establishing a connection with SSH server. The same
        value is used for the banner, authentication and channel timeouts.
    run_async : bool, optional
        whether to run the command asynchronously and not wait for
        stdout/stderr but instead return None, by default False

    Returns
    -------
    str | None
        a string of the `stdout` from the command or `None` when the command
        is run asynchronously
    """
    client = paramiko.SSHClient()
    # unknown hosts are accepted automatically
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    keep_open = False
    try:
        client.connect(
            node_ip,
            username=username,
            pkey=private_key,
            timeout=timeout,
            banner_timeout=timeout,
            auth_timeout=timeout,
            channel_timeout=timeout,
        )
        _, _stdout, _stderr = client.exec_command(cmd)
        if run_async:
            # NOTE(review): closing the client also closes its transport, which
            # presumably would cut off the still-running remote command, so
            # the connection is intentionally left open here -- confirm.
            keep_open = True
            return None
        # stderr is drained before stdout, as the command output is small in
        # the expected usage -- TODO confirm for commands with large stdout
        error_msg = _stderr.read().decode().strip("\n")
        if len(error_msg) > 0:
            logging.error("Error for %s: %s", node_ip, error_msg)
        return _stdout.read().decode().strip("\n")
    finally:
        if not keep_open:
            # release the socket and transport instead of leaking them
            client.close()