创建直接工作流RL环境#
除了 envs.ManagerBasedRLEnv
类之外,还可以使用配置类来为更模块化的环境提供支持, DirectRLEnv
类允许在环境脚本化中进行更直接的控制。
直接工作流任务实现完全奖励和观察功能的直接任务脚本。这允许在方法的实现中更多地控制,比如使用Pytorch jit功能,并提供一个更少抽象的框架,更容易找到各种代码片段。
在本教程中,我们将使用直接工作流实现来配置cartpole环境,以创建一个平衡杆直立的任务。我们将学习如何使用实现场景创建,动作,重置,奖励和观察的功能来指定任务。
代码#
对于本教程,我们使用 omni.isaac.lab_tasks.direct.cartpole
模块中定义的cartpole环境。
cartpole_env.py的代码
1# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
2# All rights reserved.
3#
4# SPDX-License-Identifier: BSD-3-Clause
5
6from __future__ import annotations
7
8import math
9import torch
10from collections.abc import Sequence
11
12from omni.isaac.lab_assets.cartpole import CARTPOLE_CFG
13
14import omni.isaac.lab.sim as sim_utils
15from omni.isaac.lab.assets import Articulation, ArticulationCfg
16from omni.isaac.lab.envs import DirectRLEnv, DirectRLEnvCfg
17from omni.isaac.lab.scene import InteractiveSceneCfg
18from omni.isaac.lab.sim import SimulationCfg
19from omni.isaac.lab.sim.spawners.from_files import GroundPlaneCfg, spawn_ground_plane
20from omni.isaac.lab.utils import configclass
21from omni.isaac.lab.utils.math import sample_uniform
22
23
24@configclass
25class CartpoleEnvCfg(DirectRLEnvCfg):
26 # env
27 decimation = 2
28 episode_length_s = 5.0
29 action_scale = 100.0 # [N]
30 action_space = 1
31 observation_space = 4
32 state_space = 0
33
34 # simulation
35 sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)
36
37 # robot
38 robot_cfg: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="/World/envs/env_.*/Robot")
39 cart_dof_name = "slider_to_cart"
40 pole_dof_name = "cart_to_pole"
41
42 # scene
43 scene: InteractiveSceneCfg = InteractiveSceneCfg(num_envs=4096, env_spacing=4.0, replicate_physics=True)
44
45 # reset
46 max_cart_pos = 3.0 # the cart is reset if it exceeds that position [m]
47 initial_pole_angle_range = [-0.25, 0.25] # the range in which the pole angle is sampled from on reset [rad]
48
49 # reward scales
50 rew_scale_alive = 1.0
51 rew_scale_terminated = -2.0
52 rew_scale_pole_pos = -1.0
53 rew_scale_cart_vel = -0.01
54 rew_scale_pole_vel = -0.005
55
56
57class CartpoleEnv(DirectRLEnv):
58 cfg: CartpoleEnvCfg
59
60 def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
61 super().__init__(cfg, render_mode, **kwargs)
62
63 self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
64 self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
65 self.action_scale = self.cfg.action_scale
66
67 self.joint_pos = self.cartpole.data.joint_pos
68 self.joint_vel = self.cartpole.data.joint_vel
69
70 def _setup_scene(self):
71 self.cartpole = Articulation(self.cfg.robot_cfg)
72 # add ground plane
73 spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
74 # clone, filter, and replicate
75 self.scene.clone_environments(copy_from_source=False)
76 self.scene.filter_collisions(global_prim_paths=[])
77 # add articulation to scene
78 self.scene.articulations["cartpole"] = self.cartpole
79 # add lights
80 light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
81 light_cfg.func("/World/Light", light_cfg)
82
83 def _pre_physics_step(self, actions: torch.Tensor) -> None:
84 self.actions = self.action_scale * actions.clone()
85
86 def _apply_action(self) -> None:
87 self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
88
89 def _get_observations(self) -> dict:
90 obs = torch.cat(
91 (
92 self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
93 self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
94 self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
95 self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
96 ),
97 dim=-1,
98 )
99 observations = {"policy": obs}
100 return observations
101
102 def _get_rewards(self) -> torch.Tensor:
103 total_reward = compute_rewards(
104 self.cfg.rew_scale_alive,
105 self.cfg.rew_scale_terminated,
106 self.cfg.rew_scale_pole_pos,
107 self.cfg.rew_scale_cart_vel,
108 self.cfg.rew_scale_pole_vel,
109 self.joint_pos[:, self._pole_dof_idx[0]],
110 self.joint_vel[:, self._pole_dof_idx[0]],
111 self.joint_pos[:, self._cart_dof_idx[0]],
112 self.joint_vel[:, self._cart_dof_idx[0]],
113 self.reset_terminated,
114 )
115 return total_reward
116
117 def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
118 self.joint_pos = self.cartpole.data.joint_pos
119 self.joint_vel = self.cartpole.data.joint_vel
120
121 time_out = self.episode_length_buf >= self.max_episode_length - 1
122 out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
123 out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
124 return out_of_bounds, time_out
125
126 def _reset_idx(self, env_ids: Sequence[int] | None):
127 if env_ids is None:
128 env_ids = self.cartpole._ALL_INDICES
129 super()._reset_idx(env_ids)
130
131 joint_pos = self.cartpole.data.default_joint_pos[env_ids]
132 joint_pos[:, self._pole_dof_idx] += sample_uniform(
133 self.cfg.initial_pole_angle_range[0] * math.pi,
134 self.cfg.initial_pole_angle_range[1] * math.pi,
135 joint_pos[:, self._pole_dof_idx].shape,
136 joint_pos.device,
137 )
138 joint_vel = self.cartpole.data.default_joint_vel[env_ids]
139
140 default_root_state = self.cartpole.data.default_root_state[env_ids]
141 default_root_state[:, :3] += self.scene.env_origins[env_ids]
142
143 self.joint_pos[env_ids] = joint_pos
144 self.joint_vel[env_ids] = joint_vel
145
146 self.cartpole.write_root_link_pose_to_sim(default_root_state[:, :7], env_ids)
147 self.cartpole.write_root_com_velocity_to_sim(default_root_state[:, 7:], env_ids)
148 self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
149
150
151@torch.jit.script
152def compute_rewards(
153 rew_scale_alive: float,
154 rew_scale_terminated: float,
155 rew_scale_pole_pos: float,
156 rew_scale_cart_vel: float,
157 rew_scale_pole_vel: float,
158 pole_pos: torch.Tensor,
159 pole_vel: torch.Tensor,
160 cart_pos: torch.Tensor,
161 cart_vel: torch.Tensor,
162 reset_terminated: torch.Tensor,
163):
164 rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
165 rew_termination = rew_scale_terminated * reset_terminated.float()
166 rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
167 rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
168 rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
169 total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
170 return total_reward
解释代码#
与基于管理器的环境类似,为任务定义一个配置类,以保存模拟参数、场景、演员和任务的设置。使用直接工作流实现, envs.DirectRLEnvCfg
类作为配置的基类使用。由于直接工作流实现不使用Action和Observation管理器,任务配置应定义环境的动作和观察数。
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
...
action_space = 1
observation_space = 4
state_space = 0
配置类也可以用来定义特定于任务的属性,例如奖励项的缩放和重置条件的阈值。
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
...
# reset
max_cart_pos = 3.0
initial_pole_angle_range = [-0.25, 0.25]
# reward scales
rew_scale_alive = 1.0
rew_scale_terminated = -2.0
rew_scale_pole_pos = -1.0
rew_scale_cart_vel = -0.01
rew_scale_pole_vel = -0.005
在创建新环境时,代码应该定义一个新的类,该类继承自 DirectRLEnv
。
class CartpoleEnv(DirectRLEnv):
cfg: CartpoleEnvCfg
def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
super().__init__(cfg, render_mode, **kwargs)
该类也可以包含所有类函数都可访问的类变量,包括应用动作、计算重置、奖励和观察的功能。
场景创建#
与基于管理器的环境相比,场景创建在直接工作流实现中提供了灵活性,使用户可以实现自己的场景创建功能。这包括将演员添加到场景上,克隆环境,过滤环境之间的碰撞,将演员添加到场景中,以及将其他附加属性添加到场景中,例如地面和灯光。这些操作应该在 _setup_scene(self)
方法中实现。
def _setup_scene(self):
self.cartpole = Articulation(self.cfg.robot_cfg)
# add ground plane
spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
# clone, filter, and replicate
self.scene.clone_environments(copy_from_source=False)
self.scene.filter_collisions(global_prim_paths=[])
# add articulation to scene
self.scene.articulations["cartpole"] = self.cartpole
# add lights
light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
light_cfg.func("/World/Light", light_cfg)
定义奖励#
奖励函数应该在 _get_rewards(self)
API中定义,它将奖励缓冲区作为返回值返回。在这个函数内部,任务可以自由实现奖励函数的逻辑。在这个示例中,我们实现了一个计算奖励函数各个组成部分的Pytorch JIT函数。
def _get_rewards(self) -> torch.Tensor:
total_reward = compute_rewards(
self.cfg.rew_scale_alive,
self.cfg.rew_scale_terminated,
self.cfg.rew_scale_pole_pos,
self.cfg.rew_scale_cart_vel,
self.cfg.rew_scale_pole_vel,
self.joint_pos[:, self._pole_dof_idx[0]],
self.joint_vel[:, self._pole_dof_idx[0]],
self.joint_pos[:, self._cart_dof_idx[0]],
self.joint_vel[:, self._cart_dof_idx[0]],
self.reset_terminated,
)
return total_reward
@torch.jit.script
def compute_rewards(
rew_scale_alive: float,
rew_scale_terminated: float,
rew_scale_pole_pos: float,
rew_scale_cart_vel: float,
rew_scale_pole_vel: float,
pole_pos: torch.Tensor,
pole_vel: torch.Tensor,
cart_pos: torch.Tensor,
cart_vel: torch.Tensor,
reset_terminated: torch.Tensor,
):
rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
rew_termination = rew_scale_terminated * reset_terminated.float()
rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos), dim=-1)
rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel), dim=-1)
rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel), dim=-1)
total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
return total_reward
定义观察#
观察缓冲区应该在 _get_observations(self)
函数中计算,它为环境构造了观察缓冲区。在这个API的结尾,应该返回一个包含 policy
为键和完整观察缓冲区为值的字典。对于不对称的策略,字典还应该包括 critic
键和状态缓冲区作为值。
def _get_observations(self) -> dict:
obs = torch.cat(
(
self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
),
dim=-1,
)
observations = {"policy": obs}
return observations
计算终止和执行重置#
填充 dones
缓冲区应该在 _get_dones(self)
方法中完成。这个方法可以自由实现逻辑,计算哪些环境需要重置,哪些环境已经达到了本集长度限制。这两个结果都应该作为 _get_dones(self)
函数的返回值,以布尔张量的形式返回。
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
self.joint_pos = self.cartpole.data.joint_pos
self.joint_vel = self.cartpole.data.joint_vel
time_out = self.episode_length_buf >= self.max_episode_length - 1
out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
return out_of_bounds, time_out
一旦计算出需要重置的环境的索引, _reset_idx(self, env_ids)
函数将在这些环境上执行重置操作。在这个函数内部,应该直接将需要重置的环境的新状态设置到模拟中。
def _reset_idx(self, env_ids: Sequence[int] | None):
if env_ids is None:
env_ids = self.cartpole._ALL_INDICES
super()._reset_idx(env_ids)
joint_pos = self.cartpole.data.default_joint_pos[env_ids]
joint_pos[:, self._pole_dof_idx] += sample_uniform(
self.cfg.initial_pole_angle_range[0] * math.pi,
self.cfg.initial_pole_angle_range[1] * math.pi,
joint_pos[:, self._pole_dof_idx].shape,
joint_pos.device,
)
joint_vel = self.cartpole.data.default_joint_vel[env_ids]
default_root_state = self.cartpole.data.default_root_state[env_ids]
default_root_state[:, :3] += self.scene.env_origins[env_ids]
self.joint_pos[env_ids] = joint_pos
self.joint_vel[env_ids] = joint_vel
self.cartpole.write_root_link_pose_to_sim(default_root_state[:, :7], env_ids)
self.cartpole.write_root_com_velocity_to_sim(default_root_state[:, 7:], env_ids)
self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
应用动作#
有两个设计用于处理动作的API。 _pre_physics_step(self, actions)
以来于策略的动作作为参数,每个RL步骤只调用一次,在采取任何物理步骤之前。这个函数可以用来处理来自策略的动作缓冲区,并将数据缓存到环境的类变量中。
def _pre_physics_step(self, actions: torch.Tensor) -> None:
self.actions = self.action_scale * actions.clone()
_apply_action(self)
API被称为 decimation
次数,每个RL步骤之前调用一次,在采取每个物理步骤之前。这为环境提供了更多的灵活性,可以对需要为每个物理步骤应用动作的环境。
def _apply_action(self) -> None:
self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
代码执行#
要运行直接工作流Cartpole环境的训练,我们可以使用以下命令:
./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task=Isaac-Cartpole-Direct-v0
所有直接工作流任务的名称都添加了后缀 -Direct
以区分实现风格。
域随机化#
在直接工作流中,域随机化配置使用 configclass
模块来指定包含 EventTermCfg
变量的配置类。
以下是域随机化的配置类示例:
@configclass
class EventCfg:
robot_physics_material = EventTerm(
func=mdp.randomize_rigid_body_material,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", body_names=".*"),
"static_friction_range": (0.7, 1.3),
"dynamic_friction_range": (1.0, 1.0),
"restitution_range": (1.0, 1.0),
"num_buckets": 250,
},
)
robot_joint_stiffness_and_damping = EventTerm(
func=mdp.randomize_actuator_gains,
mode="reset",
params={
"asset_cfg": SceneEntityCfg("robot", joint_names=".*"),
"stiffness_distribution_params": (0.75, 1.5),
"damping_distribution_params": (0.3, 3.0),
"operation": "scale",
"distribution": "log_uniform",
},
)
reset_gravity = EventTerm(
func=mdp.randomize_physics_scene_gravity,
mode="interval",
is_global_time=True,
interval_range_s=(36.0, 36.0), # time_s = num_steps * (decimation * dt)
params={
"gravity_distribution_params": ([0.0, 0.0, 0.0], [0.0, 0.0, 0.4]),
"operation": "add",
"distribution": "gaussian",
},
)
每个 EventTerm
对象都是 EventTermCfg
类的对象,并接受一个 func
参数,用于指定随机化时调用的函数,一个 mode
参数,可以是 startup
、reset
或 interval
。 params
字典应该提供在 func
参数中指定的函数所需的参数。作为 EventTerm
的 func
指定的函数可以在 events
模块中找到。
请注意,作为 "asset_cfg": SceneEntityCfg("robot", body_names=".*")
参数的一部分,提供了actor "robot"
的名称,以及以正则表达式指定的身体或关节名称,这些名称将是应用了随机化的演员和身体/关节。
一旦设置了随机化术语的 configclass
,必须将该类添加到任务的基本配置类中,并分配给变量 events
。
@configclass
class MyTaskConfig:
events: EventCfg = EventCfg()
动作和观察噪声#
也可以使用 configclass
模块添加动作和观察噪声配置。必须将动作和观察噪声配置添加到主任务配置中,使用 action_noise_model
和 observation_noise_model
变量:
@configclass
class MyTaskConfig:
# at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
action_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.05, operation="add"),
bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.015, operation="abs"),
)
# at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
observation_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.002, operation="add"),
bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.0001, operation="abs"),
)
NoiseModelWithAdditiveBiasCfg
可以用来对每个步骤进行无关噪声的采样,以及在重置时重新采样的相关噪声。
noise_cfg
术语指定了将在每一步中为所有环境进行采样的高斯分布。这个噪声将被添加到相应的动作和观察缓冲区中的每一步。
bias_noise_cfg
术语指定了在重置时为被重置的环境进行采样的相关噪声的高斯分布。对于剩余的环境,这个相同的噪声将在每一步应用,并在下一次重置时重新采样。
如果只希望每步有噪声,可以使用 GaussianNoiseCfg
来指定一个会将采样噪声添加到输入缓冲区中的混合高斯分布。
@configclass
class MyTaskConfig:
action_noise_model: GaussianNoiseCfg = GaussianNoiseCfg(mean=0.0, std=0.05, operation="add")
在本教程中,我们学习了如何为强化学习创建直接工作流任务环境。我们通过扩展基础环境来包括场景设置、动作、终止、重置、奖励和观察函数来实现这一点。
虽然可以手动创建所需任务的 DirectRLEnv
类的实例,但这种方法不具有可扩展性,因为它需要为每个任务编写专门的脚本。因此,在接下来的教程中,我们将利用 gymnasium.make()
函数来使用gym接口创建环境。