Curriculum 工具#
本指南介绍了常用的 curriculum 辅助函数和 term,可用于在 Isaac Lab 中为强化学习(RL)环境创建灵活的 curriculum。这些工具可以传递给 CurriculumTermCfg
对象,以在训练过程中动态修改奖励权重和环境参数。
备注
本指南涵盖了三个工具:
- modify_reward_weight:用于修改奖励权重的简单函数
- modify_env_param:用于修改任意环境参数的 term
- modify_term_cfg:用于修改 term 配置(term_cfg)中参数的 term
curriculum工具的完整源代码
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
"""Common functions that can be used to create curriculum for the learning environment.
The functions can be passed to the :class:`isaaclab.managers.CurriculumTermCfg` object to enable
the curriculum introduced by the function.
"""
from __future__ import annotations
import re
from collections.abc import Sequence
from typing import TYPE_CHECKING
from isaaclab.managers import ManagerTermBase
if TYPE_CHECKING:
from isaaclab.envs import ManagerBasedRLEnv
def modify_reward_weight(env: ManagerBasedRLEnv, env_ids: Sequence[int], term_name: str, weight: float, num_steps: int):
    """Curriculum that overrides a reward term's weight after a given number of steps.

    Nothing happens while ``env.common_step_counter`` is less than or equal to
    ``num_steps``. On every call after that, the term's weight is set to ``weight``.

    Args:
        env: The learning environment.
        env_ids: Not used since all environments are affected.
        term_name: The name of the reward term.
        weight: The weight to assign to the reward term.
        num_steps: The number of steps after which the change should be applied.
    """
    # Guard clause: the step threshold has not been exceeded yet.
    if not (env.common_step_counter > num_steps):
        return

    reward_manager = env.reward_manager
    # Fetch the term's settings, overwrite the weight and hand them back to the manager.
    updated_cfg = reward_manager.get_term_cfg(term_name)
    updated_cfg.weight = weight
    reward_manager.set_term_cfg(term_name, updated_cfg)
class modify_env_param(ManagerTermBase):
    """Curriculum term for dynamically modifying a single environment parameter at runtime.

    The parameter is named by ``cfg.params["address"]``, a dotted path resolved from the
    environment. Getter/setter accessors for it are compiled on the first call and cached;
    every call then reads the current value, passes it to the user-provided ``modify_fn``
    and writes the returned value back. Because ``None`` can be a legitimate value to write,
    ``modify_fn`` returns the :attr:`NO_CHANGE` token to signal that nothing should be written.

    Usage:
        .. code-block:: python

            def resample_bucket_range(
                env, env_ids, data, static_friction_range, dynamic_friction_range, restitution_range, num_steps
            ):
                if env.common_step_counter > num_steps:
                    range_list = [static_friction_range, dynamic_friction_range, restitution_range]
                    ranges = torch.tensor(range_list, device="cpu")
                    new_buckets = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (len(data), 3), device="cpu")
                    return new_buckets
                return mdp.modify_env_param.NO_CHANGE

            object_physics_material_curriculum = CurrTerm(
                func=mdp.modify_env_param,
                params={
                    "address": "event_manager.cfg.object_physics_material.func.material_buckets",
                    "modify_fn": resample_bucket_range,
                    "modify_params": {
                        "static_friction_range": [.5, 1.],
                        "dynamic_friction_range": [.3, 1.],
                        "restitution_range": [0.0, 0.5],
                        "num_steps": 120000
                    }
                }
            )
    """

    # Token returned by ``modify_fn`` to skip the write; it is compared by identity.
    NO_CHANGE = object()
    # Matches a path segment of the form ``name[idx]`` with a non-negative integer index.
    # Compiled once for the class; still reachable as ``self._INDEX_RE``.
    _INDEX_RE = re.compile(r"^(\w+)\[(\d+)\]$")

    def __init__(self, cfg, env):
        """Initialize the term.

        Args:
            cfg: A CurriculumTermCfg whose ``params`` dict must contain:
                - "address" (str): dotted path into the env where the parameter lives.
            env: The ManagerBasedRLEnv instance this term will act upon.
        """
        super().__init__(cfg, env)
        # Accessors are compiled lazily on the first call (see ``__call__``).
        self.get_fn: callable = None
        self.set_fn: callable = None
        self.address: str = self.cfg.params.get("address")

    def __call__(
        self,
        env: ManagerBasedRLEnv,
        env_ids: Sequence[int],
        address: str,
        modify_fn: callable,
        modify_params: dict | None = None,
    ):
        """Apply one curriculum step to the target parameter.

        On the first call, compiles and caches the getter and setter accessors for
        ``self.address``. Then retrieves the current value, passes it through
        ``modify_fn`` and writes the result back unless it is :attr:`NO_CHANGE`.

        Args:
            env: The learning environment. The term operates on ``self._env``.
            env_ids: Sub-environment indices, forwarded to ``modify_fn``.
            address: Dotted path of the value retrieved from env. The path stored in
                ``self.address`` at construction is the one actually used.
            modify_fn: Function with signature
                ``fn(env, env_ids, old_value, **modify_params) -> new_value``.
            modify_params: Extra keyword arguments for ``modify_fn``. Defaults to None,
                which is treated as no extra arguments.
        """
        if self.get_fn is None or self.set_fn is None:
            self.get_fn, self.set_fn = self._compile_accessors(self._env, self.address)
        # Use a fresh dict per call instead of a shared mutable default argument.
        if modify_params is None:
            modify_params = {}

        data = self.get_fn()
        new_val = modify_fn(self._env, env_ids, data, **modify_params)
        # NO_CHANGE means "skip the write"; any other value (including None) is written.
        if new_val is not self.NO_CHANGE:
            self.set_fn(new_val)

    @staticmethod
    def _read_key(owner, key):
        """Read ``key`` from ``owner``: item access for int keys and dicts, attribute access otherwise."""
        if isinstance(key, int) or isinstance(owner, dict):
            return owner[key]
        return getattr(owner, key)

    @staticmethod
    def _write_key(owner, key, value):
        """Store ``value`` under ``key`` of ``owner``, mirroring :meth:`_read_key`."""
        if isinstance(key, int) or isinstance(owner, dict):
            owner[key] = value
        else:
            setattr(owner, key, value)

    def _compile_accessors(self, root, path: str):
        """Build and return (getter, setter) functions for a dotted attribute path.

        Supports nested attributes, dict keys, and sequence indexing via "name[idx]",
        both in the middle and at the end of the path.

        Args:
            root: Base object (usually ``self._env``) from which to resolve ``path``.
            path: Dotted path string, e.g. "foo.bar[2].baz" or "foo.bar[2]".

        Returns:
            tuple:
                - getter: () -> current value
                - setter: (new_value) -> None (writes new_value back into the object)
        """
        # Turn "a.b[2].c" into ["a", ("b", 2), "c"].
        parts = []
        for part in path.split("."):
            m = self._INDEX_RE.match(part)
            parts.append((m.group(1), int(m.group(2))) if m else part)

        # Walk to the object that directly holds the target. ``owner``/``owner_key`` record
        # where that object itself is stored, so an immutable holder can be replaced.
        owner, owner_key = None, None
        cur = root
        for p in parts[:-1]:
            if isinstance(p, tuple):
                name, idx = p
                owner, owner_key = self._read_key(cur, name), idx
                cur = owner[idx]
            else:
                owner, owner_key = cur, p
                cur = self._read_key(cur, p)

        last = parts[-1]
        if isinstance(last, tuple):
            # The path ends in "name[idx]": the target is element ``idx`` of the sequence
            # stored under ``name``, so that sequence becomes the container.
            name, idx = last
            owner, owner_key = cur, name
            cur = self._read_key(cur, name)
            last = idx

        self.container = cur
        self.last = last

        # build the getter and setter
        if isinstance(self.container, tuple):

            def getter():
                return self.container[self.last]

            def setter(val):
                # Tuples are immutable: rebuild with the new element, then store the new
                # tuple where the old one lived so the environment sees the change.
                items = list(self.container)
                items[self.last] = val
                self.container = tuple(items)
                if owner is not None:
                    self._write_key(owner, owner_key, self.container)

        elif isinstance(self.container, (list, dict)):

            def getter():
                return self.container[self.last]

            def setter(val):
                self.container[self.last] = val

        else:
            # Any other object: the final segment is an attribute name.
            def getter():
                return getattr(self.container, self.last)

            def setter(val):
                setattr(self.container, self.last, val)

        return getter, setter
class modify_term_cfg(modify_env_param):
    """Subclass of :class:`modify_env_param` that accepts a simplified address.

    The first segment of ``cfg.params["address"]`` names a manager in plural form, and is
    expanded to the full manager path: a trailing "s" on the first segment is replaced
    by "_manager.cfg". This is a more natural style for writing configurations. After the
    expansion the term behaves identically to :class:`modify_env_param`.

    For example: commands.object_pose.ranges.pos_x -> command_manager.cfg.object_pose.ranges.pos_x

    Only the first segment is rewritten. An address whose first segment does not end in
    "s" is used unchanged, so later segments such as "params" or "ranges" are never altered.

    Usage:
        .. code-block:: python

            def override_value(env, env_ids, data, value, num_steps):
                if env.common_step_counter > num_steps:
                    return value
                return mdp.modify_term_cfg.NO_CHANGE

            command_object_pose_xrange_adr = CurrTerm(
                func=mdp.modify_term_cfg,
                params={
                    "address": "commands.object_pose.ranges.pos_x",  # note that `_manager.cfg` is omitted
                    "modify_fn": override_value,
                    "modify_params": {"value": (-.75, -.25), "num_steps": 12000}
                }
            )
    """

    def __init__(self, cfg, env):
        """Initialize the term and expand the simplified address.

        Args:
            cfg: A CurriculumTermCfg whose ``params["address"]`` is a simplified path,
                e.g. "observations" is written instead of "observation_manager.cfg".
            env: The ManagerBasedRLEnv instance this term will act upon.
        """
        super().__init__(cfg, env)
        input_address: str = self.cfg.params.get("address")
        self.address = self._expand_address(input_address)

    @staticmethod
    def _expand_address(address: str) -> str:
        """Expand a leading "<name>s." segment to "<name>_manager.cfg."; return other addresses unchanged."""
        head, sep, tail = address.partition(".")
        # Look at the first segment only. Replacing the first "s." found anywhere would
        # corrupt a later segment whenever the first segment does not end in "s".
        if sep and head.endswith("s"):
            return f"{head[:-1]}_manager.cfg.{tail}"
        return address
修改奖励权重#
函数 modify_reward_weight()
在指定的模拟步数后更新奖励项的权重。这可以直接作为 CurriculumTermCfg
中的 func
传递。
def modify_reward_weight(env: ManagerBasedRLEnv, env_ids: Sequence[int], term_name: str, weight: float, num_steps: int):
    """Curriculum that overrides a reward term's weight after a given number of steps.

    Nothing happens while ``env.common_step_counter`` is less than or equal to
    ``num_steps``. On every call after that, the term's weight is set to ``weight``.

    Args:
        env: The learning environment.
        env_ids: Not used since all environments are affected.
        term_name: The name of the reward term.
        weight: The weight to assign to the reward term.
        num_steps: The number of steps after which the change should be applied.
    """
    # Guard clause: the step threshold has not been exceeded yet.
    if not (env.common_step_counter > num_steps):
        return

    reward_manager = env.reward_manager
    # Fetch the term's settings, overwrite the weight and hand them back to the manager.
    updated_cfg = reward_manager.get_term_cfg(term_name)
    updated_cfg.weight = weight
    reward_manager.set_term_cfg(term_name, updated_cfg)
使用示例:
import isaaclab.envs.mdp as mdp  # the curriculum terms are provided by ``isaaclab.envs.mdp``
from isaaclab.managers import CurriculumTermCfg

# After 100k steps, set the "sparse_reward" term weight to 0.5
sparse_reward_schedule = CurriculumTermCfg(
    func=mdp.modify_reward_weight,
    params={
        "term_name": "sparse_reward",
        "weight": 0.5,
        "num_steps": 100_000,
    },
)
动态修改环境参数#
modify_env_param
是 ManagerTermBase
的子类,允许您通过以点号分隔的属性路径指向环境中的任意参数,并应用用户提供的函数在运行时计算新值。它支持嵌套属性、字典键以及列表或元组索引;当用户函数返回 NO_CHANGE
标记时,表示不需要更新,参数保持不变。
class modify_env_param(ManagerTermBase):
    """Curriculum term for dynamically modifying a single environment parameter at runtime.

    The parameter is named by ``cfg.params["address"]``, a dotted path resolved from the
    environment. Getter/setter accessors for it are compiled on the first call and cached;
    every call then reads the current value, passes it to the user-provided ``modify_fn``
    and writes the returned value back. Because ``None`` can be a legitimate value to write,
    ``modify_fn`` returns the :attr:`NO_CHANGE` token to signal that nothing should be written.

    Usage:
        .. code-block:: python

            def resample_bucket_range(
                env, env_ids, data, static_friction_range, dynamic_friction_range, restitution_range, num_steps
            ):
                if env.common_step_counter > num_steps:
                    range_list = [static_friction_range, dynamic_friction_range, restitution_range]
                    ranges = torch.tensor(range_list, device="cpu")
                    new_buckets = math_utils.sample_uniform(ranges[:, 0], ranges[:, 1], (len(data), 3), device="cpu")
                    return new_buckets
                return mdp.modify_env_param.NO_CHANGE

            object_physics_material_curriculum = CurrTerm(
                func=mdp.modify_env_param,
                params={
                    "address": "event_manager.cfg.object_physics_material.func.material_buckets",
                    "modify_fn": resample_bucket_range,
                    "modify_params": {
                        "static_friction_range": [.5, 1.],
                        "dynamic_friction_range": [.3, 1.],
                        "restitution_range": [0.0, 0.5],
                        "num_steps": 120000
                    }
                }
            )
    """

    # Token returned by ``modify_fn`` to skip the write; it is compared by identity.
    NO_CHANGE = object()
    # Matches a path segment of the form ``name[idx]`` with a non-negative integer index.
    # Compiled once for the class; still reachable as ``self._INDEX_RE``.
    _INDEX_RE = re.compile(r"^(\w+)\[(\d+)\]$")

    def __init__(self, cfg, env):
        """Initialize the term.

        Args:
            cfg: A CurriculumTermCfg whose ``params`` dict must contain:
                - "address" (str): dotted path into the env where the parameter lives.
            env: The ManagerBasedRLEnv instance this term will act upon.
        """
        super().__init__(cfg, env)
        # Accessors are compiled lazily on the first call (see ``__call__``).
        self.get_fn: callable = None
        self.set_fn: callable = None
        self.address: str = self.cfg.params.get("address")

    def __call__(
        self,
        env: ManagerBasedRLEnv,
        env_ids: Sequence[int],
        address: str,
        modify_fn: callable,
        modify_params: dict | None = None,
    ):
        """Apply one curriculum step to the target parameter.

        On the first call, compiles and caches the getter and setter accessors for
        ``self.address``. Then retrieves the current value, passes it through
        ``modify_fn`` and writes the result back unless it is :attr:`NO_CHANGE`.

        Args:
            env: The learning environment. The term operates on ``self._env``.
            env_ids: Sub-environment indices, forwarded to ``modify_fn``.
            address: Dotted path of the value retrieved from env. The path stored in
                ``self.address`` at construction is the one actually used.
            modify_fn: Function with signature
                ``fn(env, env_ids, old_value, **modify_params) -> new_value``.
            modify_params: Extra keyword arguments for ``modify_fn``. Defaults to None,
                which is treated as no extra arguments.
        """
        if self.get_fn is None or self.set_fn is None:
            self.get_fn, self.set_fn = self._compile_accessors(self._env, self.address)
        # Use a fresh dict per call instead of a shared mutable default argument.
        if modify_params is None:
            modify_params = {}

        data = self.get_fn()
        new_val = modify_fn(self._env, env_ids, data, **modify_params)
        # NO_CHANGE means "skip the write"; any other value (including None) is written.
        if new_val is not self.NO_CHANGE:
            self.set_fn(new_val)

    @staticmethod
    def _read_key(owner, key):
        """Read ``key`` from ``owner``: item access for int keys and dicts, attribute access otherwise."""
        if isinstance(key, int) or isinstance(owner, dict):
            return owner[key]
        return getattr(owner, key)

    @staticmethod
    def _write_key(owner, key, value):
        """Store ``value`` under ``key`` of ``owner``, mirroring :meth:`_read_key`."""
        if isinstance(key, int) or isinstance(owner, dict):
            owner[key] = value
        else:
            setattr(owner, key, value)

    def _compile_accessors(self, root, path: str):
        """Build and return (getter, setter) functions for a dotted attribute path.

        Supports nested attributes, dict keys, and sequence indexing via "name[idx]",
        both in the middle and at the end of the path.

        Args:
            root: Base object (usually ``self._env``) from which to resolve ``path``.
            path: Dotted path string, e.g. "foo.bar[2].baz" or "foo.bar[2]".

        Returns:
            tuple:
                - getter: () -> current value
                - setter: (new_value) -> None (writes new_value back into the object)
        """
        # Turn "a.b[2].c" into ["a", ("b", 2), "c"].
        parts = []
        for part in path.split("."):
            m = self._INDEX_RE.match(part)
            parts.append((m.group(1), int(m.group(2))) if m else part)

        # Walk to the object that directly holds the target. ``owner``/``owner_key`` record
        # where that object itself is stored, so an immutable holder can be replaced.
        owner, owner_key = None, None
        cur = root
        for p in parts[:-1]:
            if isinstance(p, tuple):
                name, idx = p
                owner, owner_key = self._read_key(cur, name), idx
                cur = owner[idx]
            else:
                owner, owner_key = cur, p
                cur = self._read_key(cur, p)

        last = parts[-1]
        if isinstance(last, tuple):
            # The path ends in "name[idx]": the target is element ``idx`` of the sequence
            # stored under ``name``, so that sequence becomes the container.
            name, idx = last
            owner, owner_key = cur, name
            cur = self._read_key(cur, name)
            last = idx

        self.container = cur
        self.last = last

        # build the getter and setter
        if isinstance(self.container, tuple):

            def getter():
                return self.container[self.last]

            def setter(val):
                # Tuples are immutable: rebuild with the new element, then store the new
                # tuple where the old one lived so the environment sees the change.
                items = list(self.container)
                items[self.last] = val
                self.container = tuple(items)
                if owner is not None:
                    self._write_key(owner, owner_key, self.container)

        elif isinstance(self.container, (list, dict)):

            def getter():
                return self.container[self.last]

            def setter(val):
                self.container[self.last] = val

        else:
            # Any other object: the final segment is an attribute name.
            def getter():
                return getattr(self.container, self.last)

            def setter(val):
                setattr(self.container, self.last, val)

        return getter, setter
使用示例:
import torch

import isaaclab.envs.mdp as mdp  # the curriculum terms are provided by ``isaaclab.envs.mdp``
from isaaclab.managers import CurriculumTermCfg


def resample_friction(env, env_ids, old_value, low, high, num_steps):
    """Resample the material buckets uniformly in ``[low, high]`` after ``num_steps`` steps.

    Args:
        env: The learning environment.
        env_ids: Sub-environment indices (unused).
        old_value: The current value stored at the target address.
        low: Lower bound of the uniform distribution.
        high: Upper bound of the uniform distribution.
        num_steps: The number of steps after which resampling starts.

    Returns:
        The newly sampled buckets, or ``mdp.modify_env_param.NO_CHANGE`` while the step
        threshold has not been exceeded.
    """
    # After num_steps, sample new material buckets uniformly
    if env.common_step_counter > num_steps:
        # Keep one row per existing bucket so the replacement matches the old layout.
        # NOTE(review): assumes the buckets are laid out as (num_buckets, 3) -- confirm.
        return torch.empty((len(old_value), 3), device="cpu").uniform_(low, high)
    return mdp.modify_env_param.NO_CHANGE


friction_curriculum = CurriculumTermCfg(
    func=mdp.modify_env_param,
    params={
        "address": "event_manager.cfg.object_physics_material.func.material_buckets",
        "modify_fn": resample_friction,
        "modify_params": {
            "low": 0.3,
            "high": 1.0,
            "num_steps": 120_000,
        },
    },
)
修改Term配置#
子类 modify_term_cfg
提供了更简洁的地址写法,与 hydra 配置语法保持一致。除此之外,其行为与 modify_env_param
完全相同。
class modify_term_cfg(modify_env_param):
    """Subclass of :class:`modify_env_param` that accepts a simplified address.

    The first segment of ``cfg.params["address"]`` names a manager in plural form, and is
    expanded to the full manager path: a trailing "s" on the first segment is replaced
    by "_manager.cfg". This is a more natural style for writing configurations. After the
    expansion the term behaves identically to :class:`modify_env_param`.

    For example: commands.object_pose.ranges.pos_x -> command_manager.cfg.object_pose.ranges.pos_x

    Only the first segment is rewritten. An address whose first segment does not end in
    "s" is used unchanged, so later segments such as "params" or "ranges" are never altered.

    Usage:
        .. code-block:: python

            def override_value(env, env_ids, data, value, num_steps):
                if env.common_step_counter > num_steps:
                    return value
                return mdp.modify_term_cfg.NO_CHANGE

            command_object_pose_xrange_adr = CurrTerm(
                func=mdp.modify_term_cfg,
                params={
                    "address": "commands.object_pose.ranges.pos_x",  # note that `_manager.cfg` is omitted
                    "modify_fn": override_value,
                    "modify_params": {"value": (-.75, -.25), "num_steps": 12000}
                }
            )
    """

    def __init__(self, cfg, env):
        """Initialize the term and expand the simplified address.

        Args:
            cfg: A CurriculumTermCfg whose ``params["address"]`` is a simplified path,
                e.g. "observations" is written instead of "observation_manager.cfg".
            env: The ManagerBasedRLEnv instance this term will act upon.
        """
        super().__init__(cfg, env)
        input_address: str = self.cfg.params.get("address")
        self.address = self._expand_address(input_address)

    @staticmethod
    def _expand_address(address: str) -> str:
        """Expand a leading "<name>s." segment to "<name>_manager.cfg."; return other addresses unchanged."""
        head, sep, tail = address.partition(".")
        # Look at the first segment only. Replacing the first "s." found anywhere would
        # corrupt a later segment whenever the first segment does not end in "s".
        if sep and head.endswith("s"):
            return f"{head[:-1]}_manager.cfg.{tail}"
        return address
使用示例:
def override_command_range(env, env_ids, old_value, value, num_steps):
    """Return ``value`` once more than ``num_steps`` steps have elapsed, otherwise signal no change."""
    # Until the step threshold is exceeded, tell the term to skip the write.
    if not (env.common_step_counter > num_steps):
        return mdp.modify_term_cfg.NO_CHANGE
    # Override after num_steps
    return value


range_override = CurriculumTermCfg(
    func=mdp.modify_term_cfg,
    params={
        "address": "commands.object_pose.ranges.pos_x",
        "modify_fn": override_command_range,
        "modify_params": {"value": (-0.75, -0.25), "num_steps": 12_000},
    },
)