Ray Job Dispatch and Tuning#

Isaac Lab supports Ray for streamlining the dispatch of multiple training jobs (in parallel and in series) and for hyperparameter tuning, on both local and remote configurations.

This independent, community-contributed walkthrough video demonstrates some of the core functionality of the Ray integration covered in this overview. Although there may be some differences in the codebase since the video was created (such as file names being simplified), the overall workflow is the same.

Note

This feature is experimental and has only been tested on Linux.

Overview#

The Ray integration is useful for the following:

  • Dispatching several training jobs in parallel or sequentially with minimal interaction.

  • Tuning hyperparameters, in parallel or sequentially, with support for multiple GPUs and/or multiple GPU nodes.

  • Using the same training setup everywhere (cloud and local) with minimal overhead.

  • Resource isolation for training jobs (resource-wrapped jobs).

The core functionality of the Ray workflow consists of two main scripts that enable the orchestration of resource-wrapped and tuning aggregate jobs. In a resource-wrapped aggregate job, each sub-job and its resource requirements are defined manually, enabling resource isolation. For tuning aggregate jobs, individual jobs are generated automatically from a hyperparameter sweep configuration.

Both resource-wrapped and tuning aggregate jobs dispatch individual jobs to a designated Ray cluster, which leverages the cluster's resources (e.g., a single workstation node or multiple nodes) to execute these jobs in parallel and/or sequentially.

By default, jobs use all available resources on each available GPU-enabled node for each sub-job worker. This can be changed by specifying the --num_workers argument for resource-wrapped jobs, or --num_workers_per_node for tuning jobs, which is especially critical for parallel aggregate job processing on local/virtual multi-GPU machines. Tuning jobs assume a homogeneous node resource composition for nodes with GPUs.

The following two files contain the core functionality of the Ray integration.

scripts/reinforcement_learning/ray/wrap_resources.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse

import ray
import util
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

"""
This script dispatches sub-job(s) (individual jobs, use :file:`tuner.py` for tuning jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of an resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.
If the desired resources for each sub-job is specified,
the maximum number of workers possible with the desired resources are created for each node
with GPU(s) in the cluster. It is also possible to split available node resources for each node
into the desired number of workers with the ``--num_workers`` flag, to be able to easily
parallelize sub-jobs on multi-GPU nodes. Due to Isaac Lab requiring a GPU,
this ignores all CPU only nodes such as loggers.

Sub-jobs are matched with node(s) in a cluster via the following relation:
sorted_nodes = Node sorted by descending GPUs, then descending CPUs, then descending RAM, then node ID
node_submitted_to = sorted_nodes[job_index % total_node_count]

To check the ordering of sorted nodes, supply the ``--test`` argument and run the script.

Sub-jobs are separated by the + delimiter. The ``--sub_jobs`` argument must be the last
argument supplied to the script.

If there is more than one available worker, and more than one sub-job,
sub-jobs will be executed in parallel. If there are more sub-jobs than workers, sub-jobs will
be dispatched to workers as they become available. There is no limit on the number
of sub-jobs that can be near-simultaneously submitted.

This script is meant to be executed on a Ray cluster head node as an aggregate cluster job.
To submit aggregate cluster jobs such as this script to one or more remote clusters,
see :file:`../submit_isaac_ray_job.py`.

KubeRay clusters on Google GKE can be created with :file:`../launch.py`

Usage:

.. code-block:: bash
    # **Ensure that sub-jobs are separated by the ``+`` delimiter.**
    # Generic Templates-----------------------------------
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h
    # No resource isolation; no parallelization:
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py
    --sub_jobs <JOB0>+<JOB1>+<JOB2>
    # Automatic Resource Isolation; Example A: needed for parallelization
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py \
    --num_workers <NUM_TO_DIVIDE_TOTAL_RESOURCES_BY> \
    --sub_jobs <JOB0>+<JOB1>
    # Manual Resource Isolation; Example B:  needed for parallelization
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
    --gpu_per_worker <GPU> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
    # Manual Resource Isolation; Example C: Needed for parallelization, for heterogeneous workloads
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
    --gpu_per_worker <GPU1> <GPU2> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
    # to see all arguments
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h
"""


def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
    """
    Provided a list of jobs, dispatch jobs to one worker per available node,
    unless otherwise specified by resource constraints.

    Args:
        jobs: bash commands to execute on a Ray cluster
        args: The arguments for resource allocation

    """
    if not ray.is_initialized():
        ray.init(address=args.ray_address, log_to_driver=True)
    job_results = []
    gpu_node_resources = util.get_gpu_node_resources(include_id=True, include_gb_ram=True)

    if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers:
        raise ValueError("Either specify only num_workers or only granular resources(GPU,CPU,RAM_GB).")

    num_nodes = len(gpu_node_resources)
    # Populate arguments
    formatted_node_resources = {
        "gpu_per_worker": [gpu_node_resources[i]["GPU"] for i in range(num_nodes)],
        "cpu_per_worker": [gpu_node_resources[i]["CPU"] for i in range(num_nodes)],
        "ram_gb_per_worker": [gpu_node_resources[i]["ram_gb"] for i in range(num_nodes)],
        "num_workers": args.num_workers,  # By default, 1 worker por node
    }
    args = util.fill_in_missing_resources(args, resources=formatted_node_resources, policy=min)
    print(f"[INFO]: Number of GPU nodes found: {num_nodes}")
    if args.test:
        jobs = ["nvidia-smi"] * num_nodes
    for i, job in enumerate(jobs):
        gpu_node = gpu_node_resources[i % num_nodes]
        print(f"[INFO]: Submitting job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
        print(
            f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}"
            f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}"
        )
        print(f"[INFO] For the node parameters, creating {args.num_workers[i]} workers")
        num_gpus = args.gpu_per_worker[i] / args.num_workers[i]
        num_cpus = args.cpu_per_worker[i] / args.num_workers[i]
        memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i]
        print(f"[INFO]: Requesting {num_gpus=} {num_cpus=} {memory=} id={gpu_node['id']}")
        job = util.remote_execute_job.options(
            num_gpus=num_gpus,
            num_cpus=num_cpus,
            memory=memory,
            scheduling_strategy=NodeAffinitySchedulingStrategy(gpu_node["id"], soft=False),
        ).remote(job, f"Job {i}", args.test)
        job_results.append(job)

    results = ray.get(job_results)
    for i, result in enumerate(results):
        print(f"[INFO]: Job {i} result: {result}")
    print("[INFO]: All jobs completed.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit multiple jobs with optional GPU testing.")
    parser = util.add_resource_arguments(arg_parser=parser)
    parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
    parser.add_argument(
        "--test",
        action="store_true",
        help=(
            "Run nvidia-smi test instead of the arbitrary job,"
            "can use as a sanity check prior to any jobs to check "
            "that GPU resources are correctly isolated."
        ),
    )
    parser.add_argument(
        "--sub_jobs",
        type=str,
        nargs=argparse.REMAINDER,
        help="This should be last wrapper argument. Jobs separated by the + delimiter to run on a cluster.",
    )
    args = parser.parse_args()
    if args.sub_jobs is not None:
        jobs = " ".join(args.sub_jobs)
        formatted_jobs = jobs.split("+")
    else:
        formatted_jobs = []
    print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
    wrap_resources_to_jobs(jobs=formatted_jobs, args=args)
scripts/reinforcement_learning/ray/tuner.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import argparse
import importlib.util
import os
import sys
from time import sleep

import ray
import util
from ray import air, tune
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.repeater import Repeater

"""
This script breaks down an aggregate tuning job, as defined by a hyperparameter sweep configuration,
into individual jobs (shell commands) to run on the GPU-enabled nodes of the cluster.
By default, one worker is created for each GPU-enabled node in the cluster for each individual job.
To use more than one worker per node (likely the case for multi-GPU machines), supply the
num_workers_per_node argument.

Each hyperparameter sweep configuration should include the workflow,
runner arguments, and hydra arguments to vary.

This assumes that all workers in a cluster are homogeneous. For heterogeneous workloads,
create several heterogeneous clusters (with homogeneous nodes in each cluster),
then submit several overall-cluster jobs with :file:`../submit_job.py`.
KubeRay clusters on Google GKE can be created with :file:`../launch.py`

To report tune metrics on clusters, a running MLFlow server with a known URI that the cluster has
access to is required. For KubeRay clusters configured with :file:`../launch.py`, this is included
automatically, and can be easily found with with :file:`grok_cluster_with_kubectl.py`

Usage:

.. code-block:: bash

    ./isaaclab.sh -p scripts/reinforcement_learning/ray/tuner.py -h

    # Examples
    # Local
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/tuner.py --run_mode local \
    --cfg_file scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py \
    --cfg_class CartpoleTheiaJobCfg
    # Remote (run grok cluster or create config file mentioned in :file:`submit_job.py`)
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/submit_job.py \
    --aggregate_jobs tuner.py \
    --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
    --cfg_class CartpoleTheiaJobCfg --mlflow_uri <MLFLOW_URI_FROM_GROK_OR_MANUAL>

"""

DOCKER_PREFIX = "/workspace/isaaclab/"
BASE_DIR = os.path.expanduser("~")
PYTHON_EXEC = "./isaaclab.sh -p"
WORKFLOW = "scripts/reinforcement_learning/rl_games/train.py"
NUM_WORKERS_PER_NODE = 1  # needed for local parallelism


class IsaacLabTuneTrainable(tune.Trainable):
    """The Isaac Lab Ray Tune Trainable.
    This class uses the standalone workflows to start jobs, along with the hydra integration.
    This class achieves Ray-based logging through reading the tensorboard logs from
    the standalone workflows. This depends on a config generated in the format of
    :class:`JobCfg`
    """

    def setup(self, config: dict) -> None:
        """Get the invocation command, return quick for easy scheduling."""
        self.data = None
        self.invoke_cmd = util.get_invocation_command_from_cfg(cfg=config, python_cmd=PYTHON_EXEC, workflow=WORKFLOW)
        print(f"[INFO]: Recovered invocation with {self.invoke_cmd}")
        self.experiment = None

    def reset_config(self, new_config: dict):
        """Allow environments to be re-used by fetching a new invocation command"""
        self.setup(new_config)
        return True

    def step(self) -> dict:
        if self.experiment is None:  # start experiment
            # When including this as first step instead of setup, experiments get scheduled faster
            # Don't want to block the scheduler while the experiment spins up
            print(f"[INFO]: Invoking experiment as first step with {self.invoke_cmd}...")
            experiment = util.execute_job(
                self.invoke_cmd,
                identifier_string="",
                extract_experiment=True,
                persistent_dir=BASE_DIR,
            )
            self.experiment = experiment
            print(f"[INFO]: Tuner recovered experiment info {experiment}")
            self.proc = experiment["proc"]
            self.experiment_name = experiment["experiment_name"]
            self.isaac_logdir = experiment["logdir"]
            self.tensorboard_logdir = self.isaac_logdir + "/" + self.experiment_name
            self.done = False

        if self.proc is None:
            raise ValueError("Could not start trial.")
        proc_status = self.proc.poll()
        if proc_status is not None:  # process finished, signal finish
            self.data["done"] = True
            print(f"[INFO]: Process finished with {proc_status}, returning...")
        else:  # wait until the logs are ready or fresh
            data = util.load_tensorboard_logs(self.tensorboard_logdir)

            while data is None:
                data = util.load_tensorboard_logs(self.tensorboard_logdir)
                sleep(2)  # Lazy report metrics to avoid performance overhead

            if self.data is not None:
                while util._dicts_equal(data, self.data):
                    data = util.load_tensorboard_logs(self.tensorboard_logdir)
                    sleep(2)  # Lazy report metrics to avoid performance overhead

            self.data = data
            self.data["done"] = False
        return self.data

    def default_resource_request(self):
        """How many resources each trainable uses. Assumes homogeneous resources across gpu nodes,
        and that each trainable is meant for one node, where it uses all available resources."""
        resources = util.get_gpu_node_resources(one_node_only=True)
        if NUM_WORKERS_PER_NODE != 1:
            print("[WARNING]: Splitting node into more than one worker")
        return tune.PlacementGroupFactory(
            [{"CPU": resources["CPU"] / NUM_WORKERS_PER_NODE, "GPU": resources["GPU"] / NUM_WORKERS_PER_NODE}],
            strategy="STRICT_PACK",
        )


def invoke_tuning_run(cfg: dict, args: argparse.Namespace) -> None:
    """Invoke an Isaac-Ray tuning run.

    Log either to a local directory or to MLFlow.
    Args:
        cfg: Configuration dictionary extracted from job setup
        args: Command-line arguments related to tuning.
    """
    # Allow for early exit
    os.environ["TUNE_DISABLE_STRICT_METRIC_CHECKING"] = "1"

    print("[WARNING]: Not saving checkpoints, just running experiment...")
    print("[INFO]: Model parameters and metrics will be preserved.")
    print("[WARNING]: For homogeneous cluster resources only...")
    # Get available resources
    resources = util.get_gpu_node_resources()
    print(f"[INFO]: Available resources {resources}")

    if not ray.is_initialized():
        ray.init(
            address=args.ray_address,
            log_to_driver=True,
            num_gpus=len(resources),
        )

    print(f"[INFO]: Using config {cfg}")

    # Configure the search algorithm and the repeater
    searcher = OptunaSearch(
        metric=args.metric,
        mode=args.mode,
    )
    repeat_search = Repeater(searcher, repeat=args.repeat_run_count)

    if args.run_mode == "local":  # Standard config, to file
        run_config = air.RunConfig(
            storage_path="/tmp/ray",
            name=f"IsaacRay-{args.cfg_class}-tune",
            verbose=1,
            checkpoint_config=air.CheckpointConfig(
                checkpoint_frequency=0,  # Disable periodic checkpointing
                checkpoint_at_end=False,  # Disable final checkpoint
            ),
        )

    elif args.run_mode == "remote":  # MLFlow, to MLFlow server
        mlflow_callback = MLflowLoggerCallback(
            tracking_uri=args.mlflow_uri,
            experiment_name=f"IsaacRay-{args.cfg_class}-tune",
            save_artifact=False,
            tags={"run_mode": "remote", "cfg_class": args.cfg_class},
        )

        run_config = ray.train.RunConfig(
            name="mlflow",
            storage_path="/tmp/ray",
            callbacks=[mlflow_callback],
            checkpoint_config=ray.train.CheckpointConfig(checkpoint_frequency=0, checkpoint_at_end=False),
        )
    else:
        raise ValueError("Unrecognized run mode.")

    # Configure the tuning job
    tuner = tune.Tuner(
        IsaacLabTuneTrainable,
        param_space=cfg,
        tune_config=tune.TuneConfig(
            search_alg=repeat_search,
            num_samples=args.num_samples,
            reuse_actors=True,
        ),
        run_config=run_config,
    )

    # Execute the tuning
    tuner.fit()

    # Save results to mounted volume
    if args.run_mode == "local":
        print("[DONE!]: Check results with tensorboard dashboard")
    else:
        print("[DONE!]: Check results with MLFlow dashboard")


class JobCfg:
    """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
    at a minimum, the tune job should inherit from this class."""

    def __init__(self, cfg: dict):
        """
        Runner args include command line arguments passed to the task.
        For example:
        cfg["runner_args"]["headless_singleton"] = "--headless"
        cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
        """
        assert "runner_args" in cfg, "No runner arguments specified."
        """
        Task is the desired task to train on. For example:
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
        """
        assert "--task" in cfg["runner_args"], "No task specified."
        """
        Hydra args define the hyperparameters varied within the sweep. For example:
        cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"])
        """
        assert "hydra_args" in cfg, "No hyperparameters specified."
        self.cfg = cfg


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Tune Isaac Lab hyperparameters.")
    parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
    parser.add_argument(
        "--cfg_file",
        type=str,
        default="hyperparameter_tuning/vision_cartpole_cfg.py",
        required=False,
        help="The relative filepath where a hyperparameter sweep is defined",
    )
    parser.add_argument(
        "--cfg_class",
        type=str,
        default="CartpoleRGBNoTuneJobCfg",
        required=False,
        help="Name of the hyperparameter sweep class to use",
    )
    parser.add_argument(
        "--run_mode",
        choices=["local", "remote"],
        default="remote",
        help=(
            "Set to local to use ./isaaclab.sh -p python, set to "
            "remote to use /workspace/isaaclab/isaaclab.sh -p python"
        ),
    )
    parser.add_argument(
        "--workflow",
        default=None,  # populated with RL Games
        help="The absolute path of the workflow to use for the experiment. By default, RL Games is used.",
    )
    parser.add_argument(
        "--mlflow_uri",
        type=str,
        default=None,
        required=False,
        help="The MLFlow Uri.",
    )
    parser.add_argument(
        "--num_workers_per_node",
        type=int,
        default=1,
        help="Number of workers to run on each GPU node. Only supply for parallelism on multi-gpu nodes",
    )

    parser.add_argument("--metric", type=str, default="rewards/time", help="What metric to tune for.")

    parser.add_argument(
        "--mode",
        choices=["max", "min"],
        default="max",
        help="What to optimize the metric to while tuning",
    )
    parser.add_argument(
        "--num_samples",
        type=int,
        default=100,
        help="How many hyperparameter runs to try total.",
    )
    parser.add_argument(
        "--repeat_run_count",
        type=int,
        default=3,
        help="How many times to repeat each hyperparameter config.",
    )

    args = parser.parse_args()
    NUM_WORKERS_PER_NODE = args.num_workers_per_node
    print(f"[INFO]: Using {NUM_WORKERS_PER_NODE} workers per node.")
    if args.run_mode == "remote":
        BASE_DIR = DOCKER_PREFIX  # ensure logs are dumped to persistent location
        PYTHON_EXEC = DOCKER_PREFIX + PYTHON_EXEC[2:]
        if args.workflow is None:
            WORKFLOW = DOCKER_PREFIX + WORKFLOW
        else:
            WORKFLOW = args.workflow
        print(f"[INFO]: Using remote mode {PYTHON_EXEC=} {WORKFLOW=}")

        if args.mlflow_uri is not None:
            import mlflow

            mlflow.set_tracking_uri(args.mlflow_uri)
            from ray.air.integrations.mlflow import MLflowLoggerCallback
        else:
            raise ValueError("Please provide a result MLFLow URI server.")
    else:  # local
        PYTHON_EXEC = os.getcwd() + "/" + PYTHON_EXEC[2:]
        if args.workflow is None:
            WORKFLOW = os.getcwd() + "/" + WORKFLOW
        else:
            WORKFLOW = args.workflow
        BASE_DIR = os.getcwd()
        print(f"[INFO]: Using local mode {PYTHON_EXEC=} {WORKFLOW=}")
    file_path = args.cfg_file
    class_name = args.cfg_class
    print(f"[INFO]: Attempting to use sweep config from {file_path=} {class_name=}")
    module_name = os.path.splitext(os.path.basename(file_path))[0]

    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    print(f"[INFO]: Successfully imported {module_name} from {file_path}")
    if hasattr(module, class_name):
        ClassToInstantiate = getattr(module, class_name)
        print(f"[INFO]: Found correct class {ClassToInstantiate}")
        instance = ClassToInstantiate()
        print(f"[INFO]: Successfully instantiated class '{class_name}' from {file_path}")
        cfg = instance.cfg
        print(f"[INFO]: Grabbed the following hyperparameter sweep config: \n {cfg}")
        invoke_tuning_run(cfg, args)

    else:
        raise AttributeError(f"[ERROR]:Class '{class_name}' not found in {file_path}")

The following script can be used to submit aggregate job(s) to one or more Ray cluster(s), which can be used to run jobs on a remote cluster, or to run simultaneous jobs with heterogeneous resource requirements.

scripts/reinforcement_learning/ray/submit_job.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import os
import time
from concurrent.futures import ThreadPoolExecutor

from ray import job_submission

"""
This script submits aggregate job(s) to cluster(s) described in a
config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster. For KubeRay clusters, this file
can be automatically created with :file:`grok_cluster_with_kubectl.py`

Aggregate job(s) are matched with cluster(s) via the following relation:
cluster_line_index_submitted_to = job_index % total_cluster_count

Aggregate jobs are separated by the * delimiter. The ``--aggregate_jobs`` argument must be
the last argument supplied to the script.

An aggregate job could be a :file:`../tuner.py` tuning job, which automatically
creates several individual jobs when started on a cluster. Alternatively, an aggregate job
could be a :file:'../wrap_resources.py` resource-wrapped job,
which may contain several individual sub-jobs separated by
the + delimiter.

If there are more aggregate jobs than cluster(s), aggregate jobs will be submitted
as clusters become available via the defined relation above. If there are less aggregate job(s)
than clusters, some clusters will not receive aggregate job(s). The maximum number of
aggregate jobs that can be run simultaneously is equal to the number of workers created by
default by a ThreadPoolExecutor on the machine submitting jobs due to fetching the log output after
jobs finish, which is unlikely to constrain overall-job submission.

Usage:

.. code-block:: bash

    # Example; submitting a tuning job
    python3 scripts/reinforcement_learning/ray/submit_job.py \
    --aggregate_jobs /workspace/isaaclab/scripts/reinforcement_learning/ray/tuner.py \
        --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
        --cfg_class CartpoleTheiaJobCfg --mlflow_uri <ML_FLOW_URI>

    # Example: Submitting resource wrapped job
    python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs wrap_resources.py --test

    # For all command line arguments
    python3 scripts/reinforcement_learning/ray/submit_job.py -h
"""
script_directory = os.path.dirname(os.path.abspath(__file__))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}


def read_cluster_spec(fn: str | None = None) -> list[dict]:
    if fn is None:
        cluster_spec_path = os.path.expanduser("~/.cluster_config")
    else:
        cluster_spec_path = os.path.expanduser(fn)

    if not os.path.exists(cluster_spec_path):
        raise FileNotFoundError(f"Cluster spec file not found at {cluster_spec_path}")

    clusters = []
    with open(cluster_spec_path) as f:
        for line in f:
            parts = line.strip().split(" ")
            http_address = parts[3]
            cluster_info = {"name": parts[1], "address": http_address}
            print(f"[INFO] Setting {cluster_info['name']}")  # with {cluster_info['num_gpu']} GPUs.")
            clusters.append(cluster_info)

    return clusters


def submit_job(cluster: dict, job_command: str) -> None:
    """
    Submits a job to a single cluster, prints the final result and Ray dashboard URL at the end.
    """
    address = cluster["address"]
    cluster_name = cluster["name"]
    print(f"[INFO]: Submitting job to cluster '{cluster_name}' at {address}")  # with {num_gpus} GPUs.")
    client = job_submission.JobSubmissionClient(address)
    runtime_env = {"working_dir": CONFIG["working_dir"], "executable": CONFIG["executable"]}
    print(f"[INFO]: Checking contents of the directory: {CONFIG['working_dir']}")
    try:
        dir_contents = os.listdir(CONFIG["working_dir"])
        print(f"[INFO]: Directory contents: {dir_contents}")
    except Exception as e:
        print(f"[INFO]: Failed to list directory contents: {str(e)}")
    entrypoint = f"{CONFIG['executable']} {job_command}"
    print(f"[INFO]: Attempting entrypoint {entrypoint=} in cluster {cluster}")
    job_id = client.submit_job(entrypoint=entrypoint, runtime_env=runtime_env)
    status = client.get_job_status(job_id)
    while status in [job_submission.JobStatus.PENDING, job_submission.JobStatus.RUNNING]:
        time.sleep(5)
        status = client.get_job_status(job_id)

    final_logs = client.get_job_logs(job_id)
    print("----------------------------------------------------")
    print(f"[INFO]: Cluster {cluster_name} Logs: \n")
    print(final_logs)
    print("----------------------------------------------------")


def submit_jobs_to_clusters(jobs: list[str], clusters: list[dict]) -> None:
    """
    Submit all jobs to their respective clusters, cycling through clusters if there are more jobs than clusters.
    """
    if not clusters:
        raise ValueError("No clusters available for job submission.")

    if len(jobs) < len(clusters):
        print("[INFO]: Less jobs than clusters, some clusters will not receive jobs")
    elif len(jobs) == len(clusters):
        print("[INFO]: Exactly one job per cluster")
    else:
        print("[INFO]: More jobs than clusters, jobs submitted as clusters become available.")
    with ThreadPoolExecutor() as executor:
        for idx, job_command in enumerate(jobs):
            # Cycle through clusters using modulus to wrap around if there are more jobs than clusters
            cluster = clusters[idx % len(clusters)]
            executor.submit(submit_job, cluster, job_command)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit multiple GPU jobs to multiple Ray clusters.")
    parser.add_argument("--config_file", default="~/.cluster_config", help="The cluster config path.")
    parser.add_argument(
        "--aggregate_jobs",
        type=str,
        nargs=argparse.REMAINDER,
        help="This should be last argument. The aggregate jobs to submit separated by the * delimiter.",
    )
    args = parser.parse_args()
    if args.aggregate_jobs is not None:
        jobs = " ".join(args.aggregate_jobs)
        formatted_jobs = jobs.split("*")
        if len(formatted_jobs) > 1:
            print("Warning; Split jobs by cluster with the * delimiter")
    else:
        formatted_jobs = []
    print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
    clusters = read_cluster_spec(args.config_file)
    submit_jobs_to_clusters(formatted_jobs, clusters)

The following script can be used to extract KubeRay cluster information for aggregate job submission.
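
For reference, the cluster spec file written by this script (by default ~/.cluster_config, which is consumed by submit_job.py) contains one line per cluster in the form name: <NAME> address: http://<IP>:<PORT>. A hypothetical two-cluster file might look like the following (illustrative names and addresses only):

name: isaacray-0 address: http://10.0.0.4:8265
name: isaacray-1 address: http://10.0.0.7:8265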

scripts/reinforcement_learning/ray/grok_cluster_with_kubectl.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import os
import re
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

"""
This script requires that kubectl is installed and KubeRay was used to create the cluster.

Creates a config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster, and also fetches the MLFlow URI.

Usage:

.. code-block:: bash

    python3 scripts/reinforcement_learning/ray/grok_cluster_with_kubectl.py
    # For options, supply -h arg
"""


def get_namespace() -> str:
    """Get the current Kubernetes namespace from the context, fallback to default if not set"""
    try:
        namespace = (
            subprocess.check_output(["kubectl", "config", "view", "--minify", "--output", "jsonpath={..namespace}"])
            .decode()
            .strip()
        )
        if not namespace:
            namespace = "default"
    except subprocess.CalledProcessError:
        namespace = "default"
    return namespace


def get_pods(namespace: str = "default") -> list[tuple]:
    """Get a list of all of the pods in the namespace"""
    cmd = ["kubectl", "get", "pods", "-n", namespace, "--no-headers"]
    output = subprocess.check_output(cmd).decode()
    pods = []
    for line in output.strip().split("\n"):
        fields = line.split()
        pod_name = fields[0]
        status = fields[2]
        pods.append((pod_name, status))
    return pods


def get_clusters(pods: list, cluster_name_prefix: str) -> set:
    """
    Get unique cluster name(s). Works for one or more clusters, based off of the number of head nodes.
    Excludes MLflow deployments.
    """
    clusters = set()
    for pod_name, _ in pods:
        # Skip MLflow pods
        if "-mlflow" in pod_name:
            continue

        match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
        if match:
            # Get base name without head/worker suffix (skip workers)
            if "head" in pod_name:
                base_name = match.group(1).split("-head")[0]
                clusters.add(base_name)
    return sorted(clusters)


def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") -> str:
    """
    Get MLflow service information if it exists in the namespace with the given prefix.
    Only works for a single cluster instance.
    Args:
        namespace: Kubernetes namespace
        cluster_prefix: Base cluster name (without -head/-worker suffixes)
    Returns:
        MLflow service URL
    """
    # Strip any -head or -worker suffixes to get base name
    if namespace is None:
        namespace = get_namespace()
    pods = get_pods(namespace=namespace)
    clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix)
    if len(clusters) > 1:
        raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.")
    mlflow_name = f"{cluster_prefix}-mlflow"

    cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
    try:
        output = subprocess.check_output(cmd).decode()
        fields = output.strip().split()

        # Get cluster IP
        cluster_ip = fields[2]
        port = "5000"  # Default MLflow port
        # This needs to be http to be resolved. HTTPS can't be resolved
        # This should be fine as it is on a subnet on the cluster regardless
        return f"http://{cluster_ip}:{port}"
    except subprocess.CalledProcessError as e:
        raise ValueError(f"Could not grok MLflow: {e}")  # Fixed f-string


def check_clusters_running(pods: list, clusters: set) -> bool:
    """
    Check that all of the pods in all provided clusters are running.

    Args:
        pods (list): A list of tuples where each tuple contains the pod name and its status.
        clusters (set): A set of cluster names to check.

    Returns:
        bool: True if all pods in any of the clusters are running, False otherwise.
    """
    clusters_running = False
    for cluster in clusters:
        cluster_pods = [p for p in pods if p[0].startswith(cluster)]
        total_pods = len(cluster_pods)
        running_pods = len([p for p in cluster_pods if p[1] == "Running"])
        if running_pods == total_pods and running_pods > 0:
            clusters_running = True
            break
    return clusters_running


def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str:
    """
    Given a cluster head pod, check its logs, which should include the ray address which can accept job requests.

    Args:
        head_pod (str): The name of the head pod.
        namespace (str, optional): The Kubernetes namespace. Defaults to "default".
        ray_head_name (str, optional): The name of the ray head container. Defaults to "head".

    Returns:
        str: The ray address if found, None otherwise.

    Raises:
        ValueError: If the logs cannot be retrieved or the ray address is not found.
    """
    cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace]
    try:
        output = subprocess.check_output(cmd).decode()
    except subprocess.CalledProcessError as e:
        raise ValueError(
            f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name."
        )
    match = re.search(r"RAY_ADDRESS='([^']+)'", output)
    if match:
        return match.group(1)
    else:
        return None


def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str:
    """
    For each cluster, check that it is running, and get the Ray head address that will accept jobs.

    Args:
        cluster_info (dict): A dictionary containing cluster information with keys 'cluster', 'pods', and 'namespace'.
        ray_head_name (str, optional): The name of the ray head container. Defaults to "head".

    Returns:
        str: A string containing the cluster name and its Ray head address, or an error message if the head pod or Ray address is not found.
    """
    cluster, pods, namespace = cluster_info
    head_pod = None
    for pod_name, status in pods:
        if pod_name.startswith(cluster + "-head"):
            head_pod = pod_name
            break
    if not head_pod:
        return f"Error: Could not find head pod for cluster {cluster}\n"

    # Get RAY_ADDRESS and status
    ray_address = get_ray_address(head_pod, namespace=namespace, ray_head_name=ray_head_name)
    if not ray_address:
        return f"Error: Could not find RAY_ADDRESS for cluster {cluster}\n"

    # Return only cluster and ray address
    return f"name: {cluster} address: {ray_address}\n"


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.")
    parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.")
    parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.")
    parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container")
    parser.add_argument(
        "--namespace", help="Kubernetes namespace to use. If not provided, will detect from current context."
    )
    args = parser.parse_args()

    # Get namespace from args or detect it
    current_namespace = args.namespace if args.namespace else get_namespace()
    print(f"Using namespace: {current_namespace}")

    cluster_name_prefix = args.prefix
    cluster_spec_file = os.path.expanduser(args.output)

    # Get all pods
    pods = get_pods(namespace=current_namespace)

    # Get clusters
    clusters = get_clusters(pods, cluster_name_prefix)
    if not clusters:
        print(f"No clusters found with prefix {cluster_name_prefix}")
        return

    # Wait for clusters to be running
    while True:
        pods = get_pods(namespace=current_namespace)
        if check_clusters_running(pods, clusters):
            break
        print("Waiting for all clusters to spin up...")
        time.sleep(5)

    print("Checking for MLflow:")
    # Check MLflow status for each cluster
    for cluster in clusters:
        try:
            mlflow_address = get_mlflow_info(current_namespace, cluster)
            print(f"MLflow address for {cluster}: {mlflow_address}")
        except ValueError as e:
            print(f"ML Flow not located: {e}")
    print()

    # Prepare cluster info for parallel processing
    cluster_infos = []
    for cluster in clusters:
        cluster_pods = [p for p in pods if p[0].startswith(cluster)]
        cluster_infos.append((cluster, cluster_pods, current_namespace))

    # Use ThreadPoolExecutor to process clusters in parallel
    results = []
    results_lock = threading.Lock()

    with ThreadPoolExecutor() as executor:
        future_to_cluster = {
            executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos
        }
        for future in as_completed(future_to_cluster):
            cluster_name = future_to_cluster[future]
            try:
                result = future.result()
                with results_lock:
                    results.append(result)
            except Exception as exc:
                print(f"{cluster_name} generated an exception: {exc}")

    # Sort results alphabetically by cluster name
    results.sort()

    # Write sorted results to the output file (Ray info only)
    with open(cluster_spec_file, "w") as f:
        for result in results:
            f.write(result)

    print(f"Cluster spec information saved to {cluster_spec_file}")
    # Display the contents of the config file
    with open(cluster_spec_file) as f:
        print(f.read())


if __name__ == "__main__":
    main()

The following script can be used to easily create clusters on Google GKE.

scripts/reinforcement_learning/ray/launch.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import pathlib
import subprocess
import yaml

import util
from jinja2 import Environment, FileSystemLoader
from kubernetes import config

"""This script helps create one or more KubeRay clusters.

Usage:

.. code-block:: bash
    # If the head node is stuck on container creating, make sure to create a secret
    python3 scripts/reinforcement_learning/ray/launch.py -h

    # Examples

    # The following creates 8 GPUx1 nvidia l4 workers
    python3 scripts/reinforcement_learning/ray/launch.py --cluster_host google_cloud \
    --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
    --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1

    # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
    # and 2 GPUx4 nvidia-tesla-t4 GPU workers
    python3 scripts/reinforcement_learning/ray/launch.py --cluster_host google_cloud \
    --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
    --num_workers 1 2 --num_clusters 1 \
    --worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4
"""
RAY_DIR = pathlib.Path(__file__).parent


def apply_manifest(args: argparse.Namespace) -> None:
    """Provided a Jinja templated ray.io/v1alpha1 file,
    populate the arguments and create the cluster. Additionally, create
    kubernetes containers for resources separated by '---' from the rest
    of the file.

    Args:
        args: Possible arguments concerning cluster parameters.
    """
    # Load Kubernetes configuration
    config.load_kube_config()

    # Set up Jinja2 environment for loading templates
    templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
    file_loader = FileSystemLoader(str(templates_dir))
    jinja_env = Environment(loader=file_loader, keep_trailing_newline=True, autoescape=True)

    # Define template filename
    template_file = "kuberay.yaml.jinja"

    # Convert args namespace to a dictionary
    template_params = vars(args)

    # Load and render the template
    template = jinja_env.get_template(template_file)
    file_contents = template.render(template_params)

    # Parse all YAML documents in the rendered template
    all_yamls = []
    for doc in yaml.safe_load_all(file_contents):
        all_yamls.append(doc)

    # Convert back to YAML string, preserving multiple documents
    cleaned_yaml_string = ""
    for i, doc in enumerate(all_yamls):
        if i > 0:
            cleaned_yaml_string += "\n---\n"
        cleaned_yaml_string += yaml.dump(doc)

    # Apply the Kubernetes manifest using kubectl
    try:
        print(cleaned_yaml_string)
        subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
    except subprocess.CalledProcessError as e:
        exit(f"An error occurred while running `kubectl`: {e}")


def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments for Kubernetes deployment script.

    Returns:
        argparse.Namespace: Parsed command-line arguments.
    """
    arg_parser = argparse.ArgumentParser(
        description="Script to apply manifests to create Kubernetes objects for Ray clusters.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    arg_parser.add_argument(
        "--cluster_host",
        type=str,
        default="google_cloud",
        choices=["google_cloud"],
        help=(
            "In the cluster_configs directory, the name of the folder where a tune.yaml.jinja"
            "file exists defining the KubeRay config. Currently only google_cloud is supported."
        ),
    )

    arg_parser.add_argument(
        "--name",
        type=str,
        required=False,
        default="isaacray",
        help="Name of the Kubernetes deployment.",
    )

    arg_parser.add_argument(
        "--namespace",
        type=str,
        required=False,
        default="default",
        help="Kubernetes namespace to deploy the Ray cluster.",
    )

    arg_parser.add_argument(
        "--service_acount_name", type=str, required=False, default="default", help="The service account name to use."
    )

    arg_parser.add_argument(
        "--image",
        type=str,
        required=True,
        help="Docker image for the Ray cluster pods.",
    )

    arg_parser.add_argument(
        "--worker_accelerator",
        nargs="+",
        type=str,
        default=["nvidia-l4"],
        help="GPU accelerator name. Supply more than one for heterogeneous resources.",
    )

    arg_parser = util.add_resource_arguments(arg_parser, cluster_create_defaults=True)

    arg_parser.add_argument(
        "--num_clusters",
        type=int,
        default=1,
        help="How many Ray Clusters to create.",
    )
    arg_parser.add_argument(
        "--num_head_cpu",
        type=float,  # to be able to schedule partial CPU heads
        default=8,
        help="The number of CPUs to give the Ray head.",
    )

    arg_parser.add_argument("--head_ram_gb", type=int, default=8, help="How many gigs of ram to give the Ray head")
    args = arg_parser.parse_args()
    return util.fill_in_missing_resources(args, cluster_creation_flag=True)


def main():
    args = parse_args()

    if "head" in args.name:
        raise ValueError("For compatibility with other scripts, do not include head in the name")
    if args.num_clusters == 1:
        apply_manifest(args)
    else:
        default_name = args.name
        for i in range(args.num_clusters):
            args.name = default_name + "-" + str(i)
            apply_manifest(args)


if __name__ == "__main__":
    main()

Docker-Based Local Quickstart#

First, follow the Docker Guide to set up the NVIDIA Container Toolkit and Docker Compose.

Then, run the following steps to start a tuning run.

# Build the base image, but we don't need to run it
python3 docker/container.py start && python3 docker/container.py stop
# Build the tuning image with extra deps
docker build -t isaacray -f scripts/reinforcement_learning/ray/cluster_configs/Dockerfile .
# Start the tuning image - symlink so that changes in the source folder show up in the container
docker run -v $(pwd)/source:/workspace/isaaclab/source -it --gpus all --net=host --entrypoint /bin/bash isaacray
# Start the Ray server within the tuning image
echo "import ray; ray.init(); import time; [time.sleep(10) for _ in iter(int, 1)]" | ./isaaclab.sh -p

In another terminal, run the following commands.

# In a new terminal (don't close the above) , enter the image with a new shell.
docker container ps
docker exec -it <ISAAC_RAY_IMAGE_ID_FROM_CONTAINER_PS> /bin/bash
# Start a tuning run, with one parallel worker per GPU
./isaaclab.sh -p scripts/reinforcement_learning/ray/tuner.py \
  --cfg_file scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py \
  --cfg_class CartpoleTheiaJobCfg \
  --run_mode local \
  --workflow scripts/reinforcement_learning/rl_games/train.py \
  --num_workers_per_node <NUMBER_OF_GPUS_IN_COMPUTER>

To view the training logs, run the following commands in another terminal, then visit localhost:6006 in a browser.

# In a new terminal (don't close the above) , enter the image with a new shell.
docker container ps
docker exec -it <ISAAC_RAY_IMAGE_ID_FROM_CONTAINER_PS> /bin/bash
# View the training logs with tensorboard
tensorboard --logdir=.

Submitting resource-wrapped individual jobs instead of automatic tuning runs is described in the following file.

scripts/reinforcement_learning/ray/wrap_resources.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse

import ray
import util
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

"""
This script dispatches sub-job(s) (individual jobs, use :file:`tuner.py` for tuning jobs)
to worker(s) on GPU-enabled node(s) of a specific cluster as part of an resource-wrapped aggregate
job. If no desired compute resources for each sub-job are specified,
this script creates one worker per available node for each node with GPU(s) in the cluster.
If the desired resources for each sub-job is specified,
the maximum number of workers possible with the desired resources are created for each node
with GPU(s) in the cluster. It is also possible to split available node resources for each node
into the desired number of workers with the ``--num_workers`` flag, to be able to easily
parallelize sub-jobs on multi-GPU nodes. Due to Isaac Lab requiring a GPU,
this ignores all CPU only nodes such as loggers.

Sub-jobs are matched with node(s) in a cluster via the following relation:
sorted_nodes = Node sorted by descending GPUs, then descending CPUs, then descending RAM, then node ID
node_submitted_to = sorted_nodes[job_index % total_node_count]

To check the ordering of sorted nodes, supply the ``--test`` argument and run the script.

Sub-jobs are separated by the + delimiter. The ``--sub_jobs`` argument must be the last
argument supplied to the script.

If there is more than one available worker, and more than one sub-job,
sub-jobs will be executed in parallel. If there are more sub-jobs than workers, sub-jobs will
be dispatched to workers as they become available. There is no limit on the number
of sub-jobs that can be near-simultaneously submitted.

This script is meant to be executed on a Ray cluster head node as an aggregate cluster job.
To submit aggregate cluster jobs such as this script to one or more remote clusters,
see :file:`../submit_isaac_ray_job.py`.

KubeRay clusters on Google GKE can be created with :file:`../launch.py`

Usage:

.. code-block:: bash
    # **Ensure that sub-jobs are separated by the ``+`` delimiter.**
    # Generic Templates-----------------------------------
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h
    # No resource isolation; no parallelization:
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py
    --sub_jobs <JOB0>+<JOB1>+<JOB2>
    # Automatic Resource Isolation; Example A: needed for parallelization
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py \
    --num_workers <NUM_TO_DIVIDE_TOTAL_RESOURCES_BY> \
    --sub_jobs <JOB0>+<JOB1>
    # Manual Resource Isolation; Example B:  needed for parallelization
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
    --gpu_per_worker <GPU> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
    # Manual Resource Isolation; Example C: Needed for parallelization, for heterogeneous workloads
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py --num_cpu_per_worker <CPU> \
    --gpu_per_worker <GPU1> <GPU2> --ram_gb_per_worker <RAM> --sub_jobs <JOB0>+<JOB1>
    # to see all arguments
    ./isaaclab.sh -p scripts/reinforcement_learning/ray/wrap_resources.py -h
"""


def wrap_resources_to_jobs(jobs: list[str], args: argparse.Namespace) -> None:
    """
    Provided a list of jobs, dispatch jobs to one worker per available node,
    unless otherwise specified by resource constraints.

    Args:
        jobs: bash commands to execute on a Ray cluster
        args: The arguments for resource allocation

    """
    if not ray.is_initialized():
        ray.init(address=args.ray_address, log_to_driver=True)
    job_results = []
    gpu_node_resources = util.get_gpu_node_resources(include_id=True, include_gb_ram=True)

    if any([args.gpu_per_worker, args.cpu_per_worker, args.ram_gb_per_worker]) and args.num_workers:
        raise ValueError("Either specify only num_workers or only granular resources(GPU,CPU,RAM_GB).")

    num_nodes = len(gpu_node_resources)
    # Populate arguments
    formatted_node_resources = {
        "gpu_per_worker": [gpu_node_resources[i]["GPU"] for i in range(num_nodes)],
        "cpu_per_worker": [gpu_node_resources[i]["CPU"] for i in range(num_nodes)],
        "ram_gb_per_worker": [gpu_node_resources[i]["ram_gb"] for i in range(num_nodes)],
        "num_workers": args.num_workers,  # By default, 1 worker por node
    }
    args = util.fill_in_missing_resources(args, resources=formatted_node_resources, policy=min)
    print(f"[INFO]: Number of GPU nodes found: {num_nodes}")
    if args.test:
        jobs = ["nvidia-smi"] * num_nodes
    for i, job in enumerate(jobs):
        gpu_node = gpu_node_resources[i % num_nodes]
        print(f"[INFO]: Submitting job {i + 1} of {len(jobs)} with job '{job}' to node {gpu_node}")
        print(
            f"[INFO]: Resource parameters: GPU: {args.gpu_per_worker[i]}"
            f" CPU: {args.cpu_per_worker[i]} RAM {args.ram_gb_per_worker[i]}"
        )
        print(f"[INFO] For the node parameters, creating {args.num_workers[i]} workers")
        num_gpus = args.gpu_per_worker[i] / args.num_workers[i]
        num_cpus = args.cpu_per_worker[i] / args.num_workers[i]
        memory = (args.ram_gb_per_worker[i] * 1024**3) / args.num_workers[i]
        print(f"[INFO]: Requesting {num_gpus=} {num_cpus=} {memory=} id={gpu_node['id']}")
        job = util.remote_execute_job.options(
            num_gpus=num_gpus,
            num_cpus=num_cpus,
            memory=memory,
            scheduling_strategy=NodeAffinitySchedulingStrategy(gpu_node["id"], soft=False),
        ).remote(job, f"Job {i}", args.test)
        job_results.append(job)

    results = ray.get(job_results)
    for i, result in enumerate(results):
        print(f"[INFO]: Job {i} result: {result}")
    print("[INFO]: All jobs completed.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit multiple jobs with optional GPU testing.")
    parser = util.add_resource_arguments(arg_parser=parser)
    parser.add_argument("--ray_address", type=str, default="auto", help="the Ray address.")
    parser.add_argument(
        "--test",
        action="store_true",
        help=(
            "Run nvidia-smi test instead of the arbitrary job,"
            "can use as a sanity check prior to any jobs to check "
            "that GPU resources are correctly isolated."
        ),
    )
    parser.add_argument(
        "--sub_jobs",
        type=str,
        nargs=argparse.REMAINDER,
        help="This should be last wrapper argument. Jobs separated by the + delimiter to run on a cluster.",
    )
    args = parser.parse_args()
    if args.sub_jobs is not None:
        jobs = " ".join(args.sub_jobs)
        formatted_jobs = jobs.split("+")
    else:
        formatted_jobs = []
    print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
    wrap_resources_to_jobs(jobs=formatted_jobs, args=args)

Files can be transferred from the running container as follows.

docker container ps
docker cp <ISAAC_RAY_IMAGE_ID_FROM_CONTAINER_PS>:</path/in/container/file>  </path/on/host/>

For tuning jobs, specify the tuning job / hyperparameter sweep as a subclass of JobCfg. Due to differences in environment entrypoints and hydra arguments, the included JobCfg only supports the rl_games workflow, although other workflows will work if a compatible JobCfg is provided.

scripts/reinforcement_learning/ray/tuner.py (JobCfg definition)
class JobCfg:
    """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
    at a minimum, the tune job should inherit from this class."""

    def __init__(self, cfg: dict):
        """
        Runner args include command line arguments passed to the task.
        For example:
        cfg["runner_args"]["headless_singleton"] = "--headless"
        cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
        """
        assert "runner_args" in cfg, "No runner arguments specified."
        """
        Task is the desired task to train on. For example:
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
        """
        assert "--task" in cfg["runner_args"], "No task specified."
        """
        Hydra args define the hyperparameters varied within the sweep. For example:
        cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"])
        """
        assert "hydra_args" in cfg, "No hyperparameters specified."
        self.cfg = cfg

For example, see the Cartpole example configurations below; a minimal custom sketch follows them.

scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import pathlib
import sys

# Allow for import of items from the ray workflow.
CUR_DIR = pathlib.Path(__file__).parent
UTIL_DIR = CUR_DIR.parent
sys.path.extend([str(UTIL_DIR), str(CUR_DIR)])
import util
import vision_cfg
from ray import tune


class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=False, vary_cnn=False, vary_mlp=False)


class CartpoleRGBCNNOnlyJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=False, vary_cnn=True, vary_mlp=False)


class CartpoleRGBJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=True, vary_cnn=True, vary_mlp=True)


class CartpoleResNetJobCfg(vision_cfg.ResNetCameraJob):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-ResNet18-v0"])
        super().__init__(cfg)


class CartpoleTheiaJobCfg(vision_cfg.TheiaCameraJob):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
        super().__init__(cfg)
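
As a minimal sketch of a custom sweep, the configuration below subclasses JobCfg directly and varies a single hyperparameter. The class name, the hydra parameter path agent.params.config.learning_rate, and the search space are illustrative assumptions rather than a tested configuration, and the file is assumed to be placed in scripts/reinforcement_learning/ray/hyperparameter_tuning/ so that the path handling mirrors the Cartpole example above.

import pathlib
import sys

# Allow for import of items from the ray workflow, mirroring the Cartpole example.
CUR_DIR = pathlib.Path(__file__).parent
UTIL_DIR = CUR_DIR.parent
sys.path.extend([str(UTIL_DIR), str(CUR_DIR)])
import util
from ray import tune
from tuner import JobCfg


class CartpoleLearningRateJobCfg(JobCfg):
    """Hypothetical sweep: vary only the learning rate for the Cartpole RGB task."""

    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        # Defensive: ensure both required sections exist before filling them in.
        cfg.setdefault("runner_args", {})
        cfg.setdefault("hydra_args", {})
        # Runner arguments are passed to the standalone training workflow.
        cfg["runner_args"]["headless_singleton"] = "--headless"
        cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        # Hydra arguments define the hyperparameters varied within the sweep
        # (assumed parameter path; adjust it to your agent configuration).
        cfg["hydra_args"]["agent.params.config.learning_rate"] = tune.loguniform(1e-5, 1e-2)
        super().__init__(cfg)

Such a class can then be passed to tuner.py through --cfg_file and --cfg_class in the same way as the Cartpole configurations above.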

Remote Clusters#

Select one of the following methods to create a Ray cluster that will accept and execute dispatched jobs.

KubeRay Setup#

If using KubeRay clusters on Google GKE with the batteries-included cluster launch file, the following dependencies are also needed.

python3 -m pip install kubernetes Jinja2

For Kubernetes clusters with KubeRay, such as Google Kubernetes Engine or Amazon Elastic Kubernetes Service, kubectl is required, and can be installed via the Kubernetes website.

Google Cloud is currently the only tested platform, although any cloud provider should work as long as the following are configured.

注意

ray 命令应该修改为使用 Isaac python,这可以通过类似于 sed -i "1i $(echo "#!/workspace/isaaclab/_isaac_sim/python.sh")" \ /isaac-sim/kit/python/bin/ray && ln -s /isaac-sim/kit/python/bin/ray /usr/local/bin/ray 的方式来实现。

  • 一个存放了支持 Ray 的 Isaac Lab 镜像的容器注册表(NGC、GCS artifact registry、AWS ECR 等)。查看 cluster_configs/Dockerfile 了解如何修改 isaac-lab-base 容器以兼容 Ray。Ray 应使用 Isaac Sim 的 python shebang,并且 nvidia-smi 应能在容器内正常工作(可用本列表之后的 Python 小片段快速确认 Ray 能否看到 GPU)。此处的设置需要小心:路径必须配置正确才能正常工作。只要基础镜像已按照容器指南构建完成,示例 Dockerfile 通常可以开箱即用并推送到注册表。

  • 一个 Kubernetes 环境,包含带有可用 NVIDIA RTX GPU(例如 l4、l40、tesla-t4 或 a10)直通的节点池资源,能够访问您的容器注册表/存储桶,并已启用 Ray 操作符且具有正确的 IAM 权限。通过 Google GKE 或 AWS EKS 等服务可以轻松实现,前提是您的账户或组织已获得 GPU 配额。为了进行更具成本效益的实验,建议使用手动管理的 Kubernetes 集群而非 Autopilot 之类的托管模式,因为前者可以在不使用时将集群完全关闭,尽管这可能需要安装 Nvidia GPU Operator。

  • 一个您的集群可以访问的 MLFlow server(Google Cloud 的配置中已包含,可参考其格式和 MLFlow 集成方式)。

  • 一个 kuberay.yaml.jinja 文件,描述了如何分配资源(Google Cloud 的配置中已包含,可参考其格式)。
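
作为可选的快速检查(假设性示例,并非官方步骤),可以在构建好的容器内运行下面的 Python 片段,确认 Ray 能够检测到 GPU;如果输出的资源中没有 "GPU" 键,说明镜像或驱动配置有问题。

# 假设性的快速自检:确认容器内的 Ray 能检测到 GPU
import ray

ray.init()  # 在容器内启动一个本地 Ray 实例
resources = ray.cluster_resources()
print(resources)  # 期望输出中包含类似 {"GPU": 1.0, ...} 的条目
assert resources.get("GPU", 0) > 0, "Ray 未检测到 GPU,请检查镜像与驱动配置"
ray.shutdown()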

Ray 集群(不使用 Kubernetes)安装#

注意

修改 Ray 命令以像在 KubeRay 集群中一样使用 Isaac Python,并按照相同的步骤创建镜像/集群权限。

请参阅 Ray Clusters OverviewAnyscale 以获取更多信息。

此外,创建一个您的本地主机和集群都可以访问的 MLFlow server 。

KubeRay 和纯 Ray 之间的共享步骤 第 I 部分#

1.) 在本地机器上安装 Ray。

python3 -m pip install ray[default]==2.31.0

2.) 构建 Isaac Ray 镜像,并将其上传到你选择的容器注册表。

# Login with NGC (nvcr.io) registry first, see docker steps in repo.
python3 docker/container.py start
# Build the special Isaac Lab Ray Image
docker build -t <REGISTRY/IMAGE_NAME> -f scripts/reinforcement_learning/ray/cluster_configs/Dockerfile .
# Push the image to your registry of choice.
docker push <REGISTRY/IMAGE_NAME>

仅限 KubeRay 集群#

k9s 是一个很好的工具,用于监控您的集群,可以通过 snap install k9s --devmode 简单安装。

1.) 验证集群访问权限,并确保正确的操作符已安装。

# Verify cluster access
kubectl cluster-info
# If using a manually managed cluster (not Autopilot or the like)
# verify that there are node pools
kubectl get nodes
# Check that the ray operator is installed on the cluster
# should list rayclusters.ray.io , rayjobs.ray.io , and rayservices.ray.io
kubectl get crds | grep ray
# Check that the NVIDIA Driver Operator is installed on the cluster
# should list clusterpolicies.nvidia.com
kubectl get crds | grep nvidia

2.) 创建 KubeRay 集群,以及一个您的集群可以访问、用于接收日志的 MLFlow 服务器。在 Google GKE 上,这可以自动完成,相关说明已包含在以下创建文件中。

scripts/reinforcement_learning/ray/launch.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import pathlib
import subprocess
import yaml

import util
from jinja2 import Environment, FileSystemLoader
from kubernetes import config

"""This script helps create one or more KubeRay clusters.

Usage:

.. code-block:: bash
    # If the head node is stuck on container creating, make sure to create a secret
    python3 scripts/reinforcement_learning/ray/launch.py -h

    # Examples

    # The following creates 8 GPUx1 nvidia l4 workers
    python3 scripts/reinforcement_learning/ray/launch.py --cluster_host google_cloud \
    --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
    --num_workers 8 --num_clusters 1 --worker_accelerator nvidia-l4 --gpu_per_worker 1

    # The following creates 1 GPUx1 nvidia l4 worker, 2 GPUx2 nvidia-tesla-t4 workers,
    # and 2 GPUx4 nvidia-tesla-t4 GPU workers
    python3 scripts/reinforcement_learning/ray/launch.py --cluster_host google_cloud \
    --namespace <NAMESPACE> --image <YOUR_ISAAC_RAY_IMAGE> \
    --num_workers 1 2 --num_clusters 1 \
    --worker_accelerator nvidia-l4 nvidia-tesla-t4 --gpu_per_worker 1 2 4
"""
RAY_DIR = pathlib.Path(__file__).parent


def apply_manifest(args: argparse.Namespace) -> None:
    """Provided a Jinja templated ray.io/v1alpha1 file,
    populate the arguments and create the cluster. Additionally, create
    kubernetes containers for resources separated by '---' from the rest
    of the file.

    Args:
        args: Possible arguments concerning cluster parameters.
    """
    # Load Kubernetes configuration
    config.load_kube_config()

    # Set up Jinja2 environment for loading templates
    templates_dir = RAY_DIR / "cluster_configs" / args.cluster_host
    file_loader = FileSystemLoader(str(templates_dir))
    jinja_env = Environment(loader=file_loader, keep_trailing_newline=True, autoescape=True)

    # Define template filename
    template_file = "kuberay.yaml.jinja"

    # Convert args namespace to a dictionary
    template_params = vars(args)

    # Load and render the template
    template = jinja_env.get_template(template_file)
    file_contents = template.render(template_params)

    # Parse all YAML documents in the rendered template
    all_yamls = []
    for doc in yaml.safe_load_all(file_contents):
        all_yamls.append(doc)

    # Convert back to YAML string, preserving multiple documents
    cleaned_yaml_string = ""
    for i, doc in enumerate(all_yamls):
        if i > 0:
            cleaned_yaml_string += "\n---\n"
        cleaned_yaml_string += yaml.dump(doc)

    # Apply the Kubernetes manifest using kubectl
    try:
        print(cleaned_yaml_string)
        subprocess.run(["kubectl", "apply", "-f", "-"], input=cleaned_yaml_string, text=True, check=True)
    except subprocess.CalledProcessError as e:
        exit(f"An error occurred while running `kubectl`: {e}")


def parse_args() -> argparse.Namespace:
    """
    Parse command-line arguments for Kubernetes deployment script.

    Returns:
        argparse.Namespace: Parsed command-line arguments.
    """
    arg_parser = argparse.ArgumentParser(
        description="Script to apply manifests to create Kubernetes objects for Ray clusters.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    arg_parser.add_argument(
        "--cluster_host",
        type=str,
        default="google_cloud",
        choices=["google_cloud"],
        help=(
            "In the cluster_configs directory, the name of the folder where a tune.yaml.jinja"
            "file exists defining the KubeRay config. Currently only google_cloud is supported."
        ),
    )

    arg_parser.add_argument(
        "--name",
        type=str,
        required=False,
        default="isaacray",
        help="Name of the Kubernetes deployment.",
    )

    arg_parser.add_argument(
        "--namespace",
        type=str,
        required=False,
        default="default",
        help="Kubernetes namespace to deploy the Ray cluster.",
    )

    arg_parser.add_argument(
        "--service_acount_name", type=str, required=False, default="default", help="The service account name to use."
    )

    arg_parser.add_argument(
        "--image",
        type=str,
        required=True,
        help="Docker image for the Ray cluster pods.",
    )

    arg_parser.add_argument(
        "--worker_accelerator",
        nargs="+",
        type=str,
        default=["nvidia-l4"],
        help="GPU accelerator name. Supply more than one for heterogeneous resources.",
    )

    arg_parser = util.add_resource_arguments(arg_parser, cluster_create_defaults=True)

    arg_parser.add_argument(
        "--num_clusters",
        type=int,
        default=1,
        help="How many Ray Clusters to create.",
    )
    arg_parser.add_argument(
        "--num_head_cpu",
        type=float,  # to be able to schedule partial CPU heads
        default=8,
        help="The number of CPUs to give the Ray head.",
    )

    arg_parser.add_argument("--head_ram_gb", type=int, default=8, help="How many gigs of ram to give the Ray head")
    args = arg_parser.parse_args()
    return util.fill_in_missing_resources(args, cluster_creation_flag=True)


def main():
    args = parse_args()

    if "head" in args.name:
        raise ValueError("For compatibility with other scripts, do not include head in the name")
    if args.num_clusters == 1:
        apply_manifest(args)
    else:
        default_name = args.name
        for i in range(args.num_clusters):
            args.name = default_name + "-" + str(i)
            apply_manifest(args)


if __name__ == "__main__":
    main()

对于其他云服务商,其 kuberay.yaml.jinja 文件将与 Google Cloud 的类似。

scripts/reinforcement_learning/ray/cluster_configs/google_cloud/kuberay.yaml.jinja
# Jinja is used for templating here as full helm setup is excessive for application
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  name: {{ name }}
  namespace: {{ namespace }}
spec:
  rayVersion: "2.8.0"
  enableInTreeAutoscaling: true
  autoscalerOptions:
    upscalingMode: Default
    idleTimeoutSeconds: 120
    imagePullPolicy: Always
    securityContext: {}
    envFrom: []

  headGroupSpec:
    rayStartParams:
      block: "true"
      dashboard-host: 0.0.0.0
      dashboard-port: "8265"
      port: "6379"
      include-dashboard: "true"
      ray-debugger-external: "true"
      object-manager-port: "8076"
      num-gpus: "0"
      num-cpus: "0" # prevent scheduling jobs to the head node - workers only
    headService:
      apiVersion: v1
      kind: Service
      metadata:
        name: {{ name }}-head
      spec:
        type: LoadBalancer
    template:
      metadata:
        labels:
          app.kubernetes.io/instance: tuner
          app.kubernetes.io/name: kuberay
          cloud.google.com/gke-ray-node-type: head
      spec:
        serviceAccountName: {{ service_account_name }}
        affinity: {}
        securityContext:
          fsGroup: 100
        containers:
          - env:
            image: {{ image }}
            imagePullPolicy: Always
            name: head
            resources:
              limits:
                cpu: "{{ num_head_cpu }}"
                memory: {{ head_ram_gb }}G
                nvidia.com/gpu: "0"
              requests:
                cpu: "{{ num_head_cpu }}"
                memory: {{ head_ram_gb }}G
                nvidia.com/gpu: "0"
            securityContext: {}
            volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
            command: ["/bin/bash", "-c", "ray start --head --port=6379 --object-manager-port=8076 --dashboard-host=0.0.0.0 --dashboard-port=8265 --include-dashboard=true && tail -f /dev/null"]
          - image: fluent/fluent-bit:1.9.6
            name: fluentbit
            resources:
              limits:
                cpu: 100m
                memory: 128Mi
              requests:
                cpu: 100m
                memory: 128Mi
            volumeMounts:
              - mountPath: /tmp/ray
                name: ray-logs
        imagePullSecrets: []
        nodeSelector:
          iam.gke.io/gke-metadata-server-enabled: "true"
        volumes:
          - configMap:
              name: fluentbit-config
            name: fluentbit-config
          - name: ray-logs
            emptyDir: {}

  workerGroupSpecs:
    {% for it in range(gpu_per_worker|length) %}
    - groupName: "{{ worker_accelerator[it] }}x{{ gpu_per_worker[it] }}-cpu-{{ cpu_per_worker[it] }}-ram-gb-{{ ram_gb_per_worker[it] }}"
      replicas: {{ num_workers[it] }}
      maxReplicas: {{ num_workers[it] }}
      minReplicas: {{ num_workers[it] }}
      rayStartParams:
        block: "true"
        ray-debugger-external: "true"
        replicas: "{{num_workers[it]}}"
      template:
        metadata:
          annotations: {}
          labels:
            app.kubernetes.io/instance: tuner
            app.kubernetes.io/name: kuberay
            cloud.google.com/gke-ray-node-type: worker
        spec:
          serviceAccountName: {{ service_account_name }}
          affinity: {}
          securityContext:
            fsGroup: 100
          containers:
            - env:
              - name: NVIDIA_VISIBLE_DEVICES
                value: "all"
              - name: NVIDIA_DRIVER_CAPABILITIES
                value: "compute,utility"

              image: {{ image }}
              imagePullPolicy: Always
              name: ray-worker
              resources:
                limits:
                  cpu: "{{ cpu_per_worker[it] }}"
                  memory: {{ ram_gb_per_worker[it] }}G
                  nvidia.com/gpu: "{{ gpu_per_worker[it] }}"
                requests:
                  cpu: "{{ cpu_per_worker[it] }}"
                  memory: {{ ram_gb_per_worker[it] }}G
                  nvidia.com/gpu: "{{ gpu_per_worker[it] }}"
              securityContext: {}
              volumeMounts:
                - mountPath: /tmp/ray
                  name: ray-logs
              command: ["/bin/bash", "-c", "ray start --address={{name}}-head.{{ namespace }}.svc.cluster.local:6379 && tail -f /dev/null"]
            - image: fluent/fluent-bit:1.9.6
              name: fluentbit
              resources:
                limits:
                  cpu: 100m
                  memory: 128Mi
                requests:
                  cpu: 100m
                  memory: 128Mi
              volumeMounts:
                - mountPath: /tmp/ray
                  name: ray-logs

          imagePullSecrets: []
          nodeSelector:
            cloud.google.com/gke-accelerator: {{ worker_accelerator[it] }}
            iam.gke.io/gke-metadata-server-enabled: "true"
          tolerations:
            - key: "nvidia.com/gpu"
              operator: "Exists"
              effect: "NoSchedule"
          volumes:
            - configMap:
                name: fluentbit-config
              name: fluentbit-config
            - name: ray-logs
              emptyDir: {}
    {% endfor %}

---
# ML Flow Server - for fetching logs
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{name}}-mlflow
  namespace: {{ namespace }}
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mlflow
  template:
    metadata:
      labels:
        app: mlflow
    spec:
      containers:
      - name: mlflow
        image: ghcr.io/mlflow/mlflow:v2.9.2
        ports:
        - containerPort: 5000
        command: ["mlflow"]
        args:
        - server
        - --host=0.0.0.0
        - --port=5000
        - --backend-store-uri=sqlite:///mlflow.db
---
# ML Flow Service (for port forwarding, kubectl port-forward service/{name}-mlflow 5000:5000)
apiVersion: v1
kind: Service
metadata:
  name: {{name}}-mlflow
  namespace: {{ namespace }}
spec:
  selector:
    app: mlflow
  ports:
  - port: 5000
    targetPort: 5000
  type: ClusterIP

3.) 获取 KubeRay 集群的 IP 地址以及 MLFlow 服务器的 IP。对于 KubeRay 集群,可以使用以下脚本自动完成;KubeRay 集群的地址会被保存到文件中,而 MLFlow 服务器的 IP 会被打印出来。

scripts/reinforcement_learning/ray/grok_cluster_with_kubectl.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import os
import re
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

"""
This script requires that kubectl is installed and KubeRay was used to create the cluster.

Creates a config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster, and also fetches the MLFlow URI.

Usage:

.. code-block:: bash

    python3 scripts/reinforcement_learning/ray/grok_cluster_with_kubectl.py
    # For options, supply -h arg
"""


def get_namespace() -> str:
    """Get the current Kubernetes namespace from the context, fallback to default if not set"""
    try:
        namespace = (
            subprocess.check_output(["kubectl", "config", "view", "--minify", "--output", "jsonpath={..namespace}"])
            .decode()
            .strip()
        )
        if not namespace:
            namespace = "default"
    except subprocess.CalledProcessError:
        namespace = "default"
    return namespace


def get_pods(namespace: str = "default") -> list[tuple]:
    """Get a list of all of the pods in the namespace"""
    cmd = ["kubectl", "get", "pods", "-n", namespace, "--no-headers"]
    output = subprocess.check_output(cmd).decode()
    pods = []
    for line in output.strip().split("\n"):
        fields = line.split()
        pod_name = fields[0]
        status = fields[2]
        pods.append((pod_name, status))
    return pods


def get_clusters(pods: list, cluster_name_prefix: str) -> set:
    """
    Get unique cluster name(s). Works for one or more clusters, based off of the number of head nodes.
    Excludes MLflow deployments.
    """
    clusters = set()
    for pod_name, _ in pods:
        # Skip MLflow pods
        if "-mlflow" in pod_name:
            continue

        match = re.match(r"(" + re.escape(cluster_name_prefix) + r"[-\w]+)", pod_name)
        if match:
            # Get base name without head/worker suffix (skip workers)
            if "head" in pod_name:
                base_name = match.group(1).split("-head")[0]
                clusters.add(base_name)
    return sorted(clusters)


def get_mlflow_info(namespace: str = None, cluster_prefix: str = "isaacray") -> str:
    """
    Get MLflow service information if it exists in the namespace with the given prefix.
    Only works for a single cluster instance.
    Args:
        namespace: Kubernetes namespace
        cluster_prefix: Base cluster name (without -head/-worker suffixes)
    Returns:
        MLflow service URL
    """
    # Strip any -head or -worker suffixes to get base name
    if namespace is None:
        namespace = get_namespace()
    pods = get_pods(namespace=namespace)
    clusters = get_clusters(pods=pods, cluster_name_prefix=cluster_prefix)
    if len(clusters) > 1:
        raise ValueError("More than one cluster matches prefix, could not automatically determine mlflow info.")
    mlflow_name = f"{cluster_prefix}-mlflow"

    cmd = ["kubectl", "get", "svc", mlflow_name, "-n", namespace, "--no-headers"]
    try:
        output = subprocess.check_output(cmd).decode()
        fields = output.strip().split()

        # Get cluster IP
        cluster_ip = fields[2]
        port = "5000"  # Default MLflow port
        # This needs to be http to be resolved. HTTPS can't be resolved
        # This should be fine as it is on a subnet on the cluster regardless
        return f"http://{cluster_ip}:{port}"
    except subprocess.CalledProcessError as e:
        raise ValueError(f"Could not grok MLflow: {e}")  # Fixed f-string


def check_clusters_running(pods: list, clusters: set) -> bool:
    """
    Check that all of the pods in all provided clusters are running.

    Args:
        pods (list): A list of tuples where each tuple contains the pod name and its status.
        clusters (set): A set of cluster names to check.

    Returns:
        bool: True if all pods in any of the clusters are running, False otherwise.
    """
    clusters_running = False
    for cluster in clusters:
        cluster_pods = [p for p in pods if p[0].startswith(cluster)]
        total_pods = len(cluster_pods)
        running_pods = len([p for p in cluster_pods if p[1] == "Running"])
        if running_pods == total_pods and running_pods > 0:
            clusters_running = True
            break
    return clusters_running


def get_ray_address(head_pod: str, namespace: str = "default", ray_head_name: str = "head") -> str:
    """
    Given a cluster head pod, check its logs, which should include the ray address which can accept job requests.

    Args:
        head_pod (str): The name of the head pod.
        namespace (str, optional): The Kubernetes namespace. Defaults to "default".
        ray_head_name (str, optional): The name of the ray head container. Defaults to "head".

    Returns:
        str: The ray address if found, None otherwise.

    Raises:
        ValueError: If the logs cannot be retrieved or the ray address is not found.
    """
    cmd = ["kubectl", "logs", head_pod, "-c", ray_head_name, "-n", namespace]
    try:
        output = subprocess.check_output(cmd).decode()
    except subprocess.CalledProcessError as e:
        raise ValueError(
            f"Could not enter head container with cmd {cmd}: {e}Perhaps try a different namespace or ray head name."
        )
    match = re.search(r"RAY_ADDRESS='([^']+)'", output)
    if match:
        return match.group(1)
    else:
        return None


def process_cluster(cluster_info: dict, ray_head_name: str = "head") -> str:
    """
    For each cluster, check that it is running, and get the Ray head address that will accept jobs.

    Args:
        cluster_info (dict): A dictionary containing cluster information with keys 'cluster', 'pods', and 'namespace'.
        ray_head_name (str, optional): The name of the ray head container. Defaults to "head".

    Returns:
        str: A string containing the cluster name and its Ray head address, or an error message if the head pod or Ray address is not found.
    """
    cluster, pods, namespace = cluster_info
    head_pod = None
    for pod_name, status in pods:
        if pod_name.startswith(cluster + "-head"):
            head_pod = pod_name
            break
    if not head_pod:
        return f"Error: Could not find head pod for cluster {cluster}\n"

    # Get RAY_ADDRESS and status
    ray_address = get_ray_address(head_pod, namespace=namespace, ray_head_name=ray_head_name)
    if not ray_address:
        return f"Error: Could not find RAY_ADDRESS for cluster {cluster}\n"

    # Return only cluster and ray address
    return f"name: {cluster} address: {ray_address}\n"


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Process Ray clusters and save their specifications.")
    parser.add_argument("--prefix", default="isaacray", help="The prefix for the cluster names.")
    parser.add_argument("--output", default="~/.cluster_config", help="The file to save cluster specifications.")
    parser.add_argument("--ray_head_name", default="head", help="The metadata name for the ray head container")
    parser.add_argument(
        "--namespace", help="Kubernetes namespace to use. If not provided, will detect from current context."
    )
    args = parser.parse_args()

    # Get namespace from args or detect it
    current_namespace = args.namespace if args.namespace else get_namespace()
    print(f"Using namespace: {current_namespace}")

    cluster_name_prefix = args.prefix
    cluster_spec_file = os.path.expanduser(args.output)

    # Get all pods
    pods = get_pods(namespace=current_namespace)

    # Get clusters
    clusters = get_clusters(pods, cluster_name_prefix)
    if not clusters:
        print(f"No clusters found with prefix {cluster_name_prefix}")
        return

    # Wait for clusters to be running
    while True:
        pods = get_pods(namespace=current_namespace)
        if check_clusters_running(pods, clusters):
            break
        print("Waiting for all clusters to spin up...")
        time.sleep(5)

    print("Checking for MLflow:")
    # Check MLflow status for each cluster
    for cluster in clusters:
        try:
            mlflow_address = get_mlflow_info(current_namespace, cluster)
            print(f"MLflow address for {cluster}: {mlflow_address}")
        except ValueError as e:
            print(f"ML Flow not located: {e}")
    print()

    # Prepare cluster info for parallel processing
    cluster_infos = []
    for cluster in clusters:
        cluster_pods = [p for p in pods if p[0].startswith(cluster)]
        cluster_infos.append((cluster, cluster_pods, current_namespace))

    # Use ThreadPoolExecutor to process clusters in parallel
    results = []
    results_lock = threading.Lock()

    with ThreadPoolExecutor() as executor:
        future_to_cluster = {
            executor.submit(process_cluster, info, args.ray_head_name): info[0] for info in cluster_infos
        }
        for future in as_completed(future_to_cluster):
            cluster_name = future_to_cluster[future]
            try:
                result = future.result()
                with results_lock:
                    results.append(result)
            except Exception as exc:
                print(f"{cluster_name} generated an exception: {exc}")

    # Sort results alphabetically by cluster name
    results.sort()

    # Write sorted results to the output file (Ray info only)
    with open(cluster_spec_file, "w") as f:
        for result in results:
            f.write(result)

    print(f"Cluster spec information saved to {cluster_spec_file}")
    # Display the contents of the config file
    with open(cluster_spec_file) as f:
        print(f.read())


if __name__ == "__main__":
    main()

仅限 Ray 集群(不含 Kubernetes)#

1.) 验证集群访问权限。

2.) 创建 ~/.cluster_config 文件,每个唯一集群占一行,格式为 name: <NAME> address: http://<IP>:<PORT> 。如果只有一个集群,文件中应当只有一行(参见下面的示例草图)。
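
下面是一个假设性的小草图,演示如何用 Python 按 submit_job.py 所解析的格式生成该文件(其中的集群名称和地址仅为占位符,请替换为实际值):

# 假设性示例:按 "name: <NAME> address: http://<IP>:<PORT>" 的格式写入 ~/.cluster_config
from pathlib import Path

clusters = {
    "isaacray": "http://10.0.0.5:8265",  # 占位地址,请替换为 Ray head 的 dashboard 地址
}
lines = [f"name: {name} address: {address}" for name, address in clusters.items()]
Path("~/.cluster_config").expanduser().write_text("\n".join(lines) + "\n")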

3.) 启动一个 Ray 集群可以访问、用于接收日志的 MLFlow 服务器,并确定该服务器的 URI。
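
例如,可以参照上文 Kubernetes 部署中 MLFlow 容器所用的参数,在本地启动一个 MLFlow 服务器(假设性草图,需先安装 mlflow;监听地址与端口请按需调整,对应的 URI 即 http://<本机IP>:5000):

# 假设性草图:以编程方式启动 MLFlow 跟踪服务器
import subprocess

subprocess.run(
    [
        "mlflow", "server",
        "--host", "0.0.0.0",  # 允许集群通过本机 IP 访问
        "--port", "5000",
        "--backend-store-uri", "sqlite:///mlflow.db",
    ],
    check=True,
)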

KubeRay 和纯 Ray 之间的共享调度步骤 第 II 部分#

1.) 测试您的集群是否正常运行,方法如下。

# Test that NVIDIA GPUs are visible and that Ray is operational with the following command:
python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs wrap_resources.py --test

2.) 按照 submit_job.py 文件中的描述,提交调优和/或资源包装作业。

scripts/reinforcement_learning/ray/submit_job.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import argparse
import os
import time
from concurrent.futures import ThreadPoolExecutor

from ray import job_submission

"""
This script submits aggregate job(s) to cluster(s) described in a
config file containing ``name: <NAME> address: http://<IP>:<PORT>`` on
a new line for each cluster. For KubeRay clusters, this file
can be automatically created with :file:`grok_cluster_with_kubectl.py`

Aggregate job(s) are matched with cluster(s) via the following relation:
cluster_line_index_submitted_to = job_index % total_cluster_count

Aggregate jobs are separated by the * delimiter. The ``--aggregate_jobs`` argument must be
the last argument supplied to the script.

An aggregate job could be a :file:`../tuner.py` tuning job, which automatically
creates several individual jobs when started on a cluster. Alternatively, an aggregate job
could be a :file:`../wrap_resources.py` resource-wrapped job,
which may contain several individual sub-jobs separated by
the + delimiter.

If there are more aggregate jobs than cluster(s), aggregate jobs will be submitted
as clusters become available via the defined relation above. If there are less aggregate job(s)
than clusters, some clusters will not receive aggregate job(s). The maximum number of
aggregate jobs that can be run simultaneously is equal to the number of workers created by
default by a ThreadPoolExecutor on the machine submitting jobs due to fetching the log output after
jobs finish, which is unlikely to constrain overall-job submission.

Usage:

.. code-block:: bash

    # Example; submitting a tuning job
    python3 scripts/reinforcement_learning/ray/submit_job.py \
    --aggregate_jobs /workspace/isaaclab/scripts/reinforcement_learning/ray/tuner.py \
        --cfg_file hyperparameter_tuning/vision_cartpole_cfg.py \
        --cfg_class CartpoleTheiaJobCfg --mlflow_uri <ML_FLOW_URI>

    # Example: Submitting resource wrapped job
    python3 scripts/reinforcement_learning/ray/submit_job.py --aggregate_jobs wrap_resources.py --test

    # For all command line arguments
    python3 scripts/reinforcement_learning/ray/submit_job.py -h
"""
script_directory = os.path.dirname(os.path.abspath(__file__))
CONFIG = {"working_dir": script_directory, "executable": "/workspace/isaaclab/isaaclab.sh -p"}


def read_cluster_spec(fn: str | None = None) -> list[dict]:
    if fn is None:
        cluster_spec_path = os.path.expanduser("~/.cluster_config")
    else:
        cluster_spec_path = os.path.expanduser(fn)

    if not os.path.exists(cluster_spec_path):
        raise FileNotFoundError(f"Cluster spec file not found at {cluster_spec_path}")

    clusters = []
    with open(cluster_spec_path) as f:
        for line in f:
            parts = line.strip().split(" ")
            http_address = parts[3]
            cluster_info = {"name": parts[1], "address": http_address}
            print(f"[INFO] Setting {cluster_info['name']}")  # with {cluster_info['num_gpu']} GPUs.")
            clusters.append(cluster_info)

    return clusters


def submit_job(cluster: dict, job_command: str) -> None:
    """
    Submits a job to a single cluster, prints the final result and Ray dashboard URL at the end.
    """
    address = cluster["address"]
    cluster_name = cluster["name"]
    print(f"[INFO]: Submitting job to cluster '{cluster_name}' at {address}")  # with {num_gpus} GPUs.")
    client = job_submission.JobSubmissionClient(address)
    runtime_env = {"working_dir": CONFIG["working_dir"], "executable": CONFIG["executable"]}
    print(f"[INFO]: Checking contents of the directory: {CONFIG['working_dir']}")
    try:
        dir_contents = os.listdir(CONFIG["working_dir"])
        print(f"[INFO]: Directory contents: {dir_contents}")
    except Exception as e:
        print(f"[INFO]: Failed to list directory contents: {str(e)}")
    entrypoint = f"{CONFIG['executable']} {job_command}"
    print(f"[INFO]: Attempting entrypoint {entrypoint=} in cluster {cluster}")
    job_id = client.submit_job(entrypoint=entrypoint, runtime_env=runtime_env)
    status = client.get_job_status(job_id)
    while status in [job_submission.JobStatus.PENDING, job_submission.JobStatus.RUNNING]:
        time.sleep(5)
        status = client.get_job_status(job_id)

    final_logs = client.get_job_logs(job_id)
    print("----------------------------------------------------")
    print(f"[INFO]: Cluster {cluster_name} Logs: \n")
    print(final_logs)
    print("----------------------------------------------------")


def submit_jobs_to_clusters(jobs: list[str], clusters: list[dict]) -> None:
    """
    Submit all jobs to their respective clusters, cycling through clusters if there are more jobs than clusters.
    """
    if not clusters:
        raise ValueError("No clusters available for job submission.")

    if len(jobs) < len(clusters):
        print("[INFO]: Less jobs than clusters, some clusters will not receive jobs")
    elif len(jobs) == len(clusters):
        print("[INFO]: Exactly one job per cluster")
    else:
        print("[INFO]: More jobs than clusters, jobs submitted as clusters become available.")
    with ThreadPoolExecutor() as executor:
        for idx, job_command in enumerate(jobs):
            # Cycle through clusters using modulus to wrap around if there are more jobs than clusters
            cluster = clusters[idx % len(clusters)]
            executor.submit(submit_job, cluster, job_command)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Submit multiple GPU jobs to multiple Ray clusters.")
    parser.add_argument("--config_file", default="~/.cluster_config", help="The cluster config path.")
    parser.add_argument(
        "--aggregate_jobs",
        type=str,
        nargs=argparse.REMAINDER,
        help="This should be last argument. The aggregate jobs to submit separated by the * delimiter.",
    )
    args = parser.parse_args()
    if args.aggregate_jobs is not None:
        jobs = " ".join(args.aggregate_jobs)
        formatted_jobs = jobs.split("*")
        if len(formatted_jobs) > 1:
            print("Warning; Split jobs by cluster with the * delimiter")
    else:
        formatted_jobs = []
    print(f"[INFO]: Isaac Ray Wrapper received jobs {formatted_jobs=}")
    clusters = read_cluster_spec(args.config_file)
    submit_jobs_to_clusters(formatted_jobs, clusters)

3.) 对于调优任务,将调优任务 / 超参数搜索指定为一个 JobCfg 。由于环境入口点和 Hydra 参数的差异,所包含的 JobCfg 仅支持 rl_games 工作流;不过,只要提供兼容的 JobCfg ,其他工作流也可以正常运行。

scripts/reinforcement_learning/ray/tuner.py (JobCfg 定义)
class JobCfg:
    """To be compatible with :meth: invoke_tuning_run and :class:IsaacLabTuneTrainable,
    at a minimum, the tune job should inherit from this class."""

    def __init__(self, cfg: dict):
        """
        Runner args include command line arguments passed to the task.
        For example:
        cfg["runner_args"]["headless_singleton"] = "--headless"
        cfg["runner_args"]["enable_cameras_singleton"] = "--enable_cameras"
        """
        assert "runner_args" in cfg, "No runner arguments specified."
        """
        Task is the desired task to train on. For example:
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
        """
        assert "--task" in cfg["runner_args"], "No task specified."
        """
        Hydra args define the hyperparameters varied within the sweep. For example:
        cfg["hydra_args"]["agent.params.network.cnn.activation"] = tune.choice(["relu", "elu"])
        """
        assert "hydra_args" in cfg, "No hyperparameters specified."
        self.cfg = cfg

例如,请参阅 Cartpole 示例配置。

scripts/reinforcement_learning/ray/hyperparameter_tuning/vision_cartpole_cfg.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
import pathlib
import sys

# Allow for import of items from the ray workflow.
CUR_DIR = pathlib.Path(__file__).parent
UTIL_DIR = CUR_DIR.parent
sys.path.extend([str(UTIL_DIR), str(CUR_DIR)])
import util
import vision_cfg
from ray import tune


class CartpoleRGBNoTuneJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=False, vary_cnn=False, vary_mlp=False)


class CartpoleRGBCNNOnlyJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=False, vary_cnn=True, vary_mlp=False)


class CartpoleRGBJobCfg(vision_cfg.CameraJobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        super().__init__(cfg, vary_env_count=True, vary_cnn=True, vary_mlp=True)


class CartpoleResNetJobCfg(vision_cfg.ResNetCameraJob):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-ResNet18-v0"])
        super().__init__(cfg)


class CartpoleTheiaJobCfg(vision_cfg.TheiaCameraJob):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-TheiaTiny-v0"])
        super().__init__(cfg)
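
如果需要为自己的任务定义搜索空间,可以直接继承 JobCfg 。下面是一个假设性的最小草图:其中的任务名取自上例,而 Hydra 键 agent.params.config.learning_rate 仅作示例,请替换为您的工作流中实际存在的超参数;同时假设 tuner.py 和 util.py 可以像上例那样通过 sys.path 导入。

# 假设性的最小 JobCfg 草图:固定任务,仅对学习率做对数均匀采样
import pathlib
import sys

CUR_DIR = pathlib.Path(__file__).parent
sys.path.extend([str(CUR_DIR.parent), str(CUR_DIR)])  # 与上例相同,便于导入 ray 工作流中的模块
import util
from ray import tune
from tuner import JobCfg  # 上文所示的基类


class MyMinimalSweepCfg(JobCfg):
    def __init__(self, cfg: dict = {}):
        cfg = util.populate_isaac_ray_cfg_args(cfg)  # 与上例相同,填充所需的默认键
        cfg["runner_args"]["--task"] = tune.choice(["Isaac-Cartpole-RGB-v0"])
        # 示例超参数键,具体名称取决于所用 RL 库的 Hydra 配置
        cfg["hydra_args"]["agent.params.config.learning_rate"] = tune.loguniform(1e-4, 1e-2)
        super().__init__(cfg)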

要查看调优结果,请打开您所创建服务器的 MLFlow 仪表板。对于 KubeRay,可以通过端口转发 MLFlow 仪表板来实现,方法如下:

kubectl port-forward service/isaacray-mlflow 5000:5000

然后在浏览器中访问以下地址。

localhost:5000
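
除了网页仪表板之外,也可以用 MLflow 的 Python 客户端以编程方式检查调优结果(假设性草图,需先安装 mlflow,并已按上文转发端口;实验名称通常形如 IsaacRay-<CLASS_JOB_CFG>-tune,请按实际名称替换):

# 假设性草图:通过 MLflow Python API 列出某个调优实验的运行记录
import mlflow

mlflow.set_tracking_uri("http://localhost:5000")  # 已端口转发的 MLFlow 服务器
runs = mlflow.search_runs(experiment_names=["IsaacRay-CartpoleTheiaJobCfg-tune"])
# search_runs 默认返回 pandas DataFrame,包含 run_id、status、metrics.* 和 params.* 等列
print(runs[["run_id", "status"]].head() if not runs.empty else "未找到运行记录")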

如果已按上述方式转发了 MLFlow 端口,则可以使用以下命令将 MLFlow 中的结果转换为本地 TensorBoard 日志。

./isaaclab.sh -p scripts/reinforcement_learning/ray/mlflow_to_local_tensorboard.py \
--uri http://localhost:5000 --experiment-name IsaacRay-<CLASS_JOB_CFG>-tune --download-dir test

Kubernetes 集群清理#

为了节省资源,并在共享计算平台上为他人保留宝贵的 GPU 资源,请在使用后销毁 Ray 集群。集群可以很容易地重新创建!对于 KubeRay 集群,可以按以下方式进行。

kubectl get raycluster | egrep 'isaacray' | awk '{print $1}' | xargs kubectl delete raycluster &&
kubectl get deployments | egrep 'mlflow' | awk '{print $1}' | xargs kubectl delete deployment &&
kubectl get services | egrep 'mlflow' | awk '{print $1}' | xargs kubectl delete service &&
kubectl get services | egrep 'isaacray' | awk '{print $1}' | xargs kubectl delete service