创建直接工作流RL环境#

除了 envs.ManagerBasedRLEnv 类之外,还可以使用配置类来为更模块化的环境提供支持, DirectRLEnv 类允许在环境脚本化中进行更直接的控制。

直接工作流任务实现完全奖励和观察功能的直接任务脚本。这允许在方法的实现中更多地控制,比如使用Pytorch jit功能,并提供一个更少抽象的框架,更容易找到各种代码片段。

在本教程中,我们将使用直接工作流实现来配置cartpole环境,以创建一个平衡杆直立的任务。我们将学习如何使用实现场景创建,动作,重置,奖励和观察的功能来指定任务。

代码#

对于本教程,我们使用 omni.isaac.lab_tasks.direct.cartpole 模块中定义的cartpole环境。

cartpole_env.py的代码
  1# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
  2# All rights reserved.
  3#
  4# SPDX-License-Identifier: BSD-3-Clause
  5
  6from __future__ import annotations
  7
  8import math
  9import torch
 10from collections.abc import Sequence
 11
 12from omni.isaac.lab_assets.cartpole import CARTPOLE_CFG
 13
 14import omni.isaac.lab.sim as sim_utils
 15from omni.isaac.lab.assets import Articulation, ArticulationCfg
 16from omni.isaac.lab.envs import DirectRLEnv, DirectRLEnvCfg
 17from omni.isaac.lab.scene import InteractiveSceneCfg
 18from omni.isaac.lab.sim import SimulationCfg
 19from omni.isaac.lab.sim.spawners.from_files import GroundPlaneCfg, spawn_ground_plane
 20from omni.isaac.lab.utils import configclass
 21from omni.isaac.lab.utils.math import sample_uniform
 22
 23
 24@configclass
 25class CartpoleEnvCfg(DirectRLEnvCfg):
 26    # env
 27    decimation = 2
 28    episode_length_s = 5.0
 29    action_scale = 100.0  # [N]
 30    action_space = 1
 31    observation_space = 4
 32    state_space = 0
 33
 34    # simulation
 35    sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)
 36
 37    # robot
 38    robot_cfg: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="/World/envs/env_.*/Robot")
 39    cart_dof_name = "slider_to_cart"
 40    pole_dof_name = "cart_to_pole"
 41
 42    # scene
 43    scene: InteractiveSceneCfg = InteractiveSceneCfg(num_envs=4096, env_spacing=4.0, replicate_physics=True)
 44
 45    # reset
 46    max_cart_pos = 3.0  # the cart is reset if it exceeds that position [m]
 47    initial_pole_angle_range = [-0.25, 0.25]  # the range in which the pole angle is sampled from on reset [rad]
 48
 49    # reward scales
 50    rew_scale_alive = 1.0
 51    rew_scale_terminated = -2.0
 52    rew_scale_pole_pos = -1.0
 53    rew_scale_cart_vel = -0.01
 54    rew_scale_pole_vel = -0.005
 55
 56
 57class CartpoleEnv(DirectRLEnv):
 58    cfg: CartpoleEnvCfg
 59
 60    def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
 61        super().__init__(cfg, render_mode, **kwargs)
 62
 63        self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
 64        self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
 65        self.action_scale = self.cfg.action_scale
 66
 67        self.joint_pos = self.cartpole.data.joint_pos
 68        self.joint_vel = self.cartpole.data.joint_vel
 69
 70    def _setup_scene(self):
 71        self.cartpole = Articulation(self.cfg.robot_cfg)
 72        # add ground plane
 73        spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
 74        # clone, filter, and replicate
 75        self.scene.clone_environments(copy_from_source=False)
 76        self.scene.filter_collisions(global_prim_paths=[])
 77        # add articulation to scene
 78        self.scene.articulations["cartpole"] = self.cartpole
 79        # add lights
 80        light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
 81        light_cfg.func("/World/Light", light_cfg)
 82
 83    def _pre_physics_step(self, actions: torch.Tensor) -> None:
 84        self.actions = self.action_scale * actions.clone()
 85
 86    def _apply_action(self) -> None:
 87        self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
 88
 89    def _get_observations(self) -> dict:
 90        obs = torch.cat(
 91            (
 92                self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
 93                self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
 94                self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
 95                self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
 96            ),
 97            dim=-1,
 98        )
 99        observations = {"policy": obs}
100        return observations
101
102    def _get_rewards(self) -> torch.Tensor:
103        total_reward = compute_rewards(
104            self.cfg.rew_scale_alive,
105            self.cfg.rew_scale_terminated,
106            self.cfg.rew_scale_pole_pos,
107            self.cfg.rew_scale_cart_vel,
108            self.cfg.rew_scale_pole_vel,
109            self.joint_pos[:, self._pole_dof_idx[0]],
110            self.joint_vel[:, self._pole_dof_idx[0]],
111            self.joint_pos[:, self._cart_dof_idx[0]],
112            self.joint_vel[:, self._cart_dof_idx[0]],
113            self.reset_terminated,
114        )
115        return total_reward
116
117    def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
118        self.joint_pos = self.cartpole.data.joint_pos
119        self.joint_vel = self.cartpole.data.joint_vel
120
121        time_out = self.episode_length_buf >= self.max_episode_length - 1
122        out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
123        out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
124        return out_of_bounds, time_out
125
126    def _reset_idx(self, env_ids: Sequence[int] | None):
127        if env_ids is None:
128            env_ids = self.cartpole._ALL_INDICES
129        super()._reset_idx(env_ids)
130
131        joint_pos = self.cartpole.data.default_joint_pos[env_ids]
132        joint_pos[:, self._pole_dof_idx] += sample_uniform(
133            self.cfg.initial_pole_angle_range[0] * math.pi,
134            self.cfg.initial_pole_angle_range[1] * math.pi,
135            joint_pos[:, self._pole_dof_idx].shape,
136            joint_pos.device,
137        )
138        joint_vel = self.cartpole.data.default_joint_vel[env_ids]
139
140        default_root_state = self.cartpole.data.default_root_state[env_ids]
141        default_root_state[:, :3] += self.scene.env_origins[env_ids]
142
143        self.joint_pos[env_ids] = joint_pos
144        self.joint_vel[env_ids] = joint_vel
145
146        self.cartpole.write_root_link_pose_to_sim(default_root_state[:, :7], env_ids)
147        self.cartpole.write_root_com_velocity_to_sim(default_root_state[:, 7:], env_ids)
148        self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
149
150
151@torch.jit.script
152def compute_rewards(
153    rew_scale_alive: float,
154    rew_scale_terminated: float,
155    rew_scale_pole_pos: float,
156    rew_scale_cart_vel: float,
157    rew_scale_pole_vel: float,
158    pole_pos: torch.Tensor,
159    pole_vel: torch.Tensor,
160    cart_pos: torch.Tensor,
161    cart_vel: torch.Tensor,
162    reset_terminated: torch.Tensor,
163):
164    rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
165    rew_termination = rew_scale_terminated * reset_terminated.float()
166    rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
167    rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
168    rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
169    total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
170    return total_reward

解释代码#

与基于管理器的环境类似,为任务定义一个配置类,以保存模拟参数、场景、演员和任务的设置。使用直接工作流实现, envs.DirectRLEnvCfg 类作为配置的基类使用。由于直接工作流实现不使用Action和Observation管理器,任务配置应定义环境的动作和观察数。

@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
   ...
   action_space = 1
   observation_space = 4
   state_space = 0

配置类也可以用来定义特定于任务的属性,例如奖励项的缩放和重置条件的阈值。

@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
   ...
   # reset
   max_cart_pos = 3.0
   initial_pole_angle_range = [-0.25, 0.25]

   # reward scales
   rew_scale_alive = 1.0
   rew_scale_terminated = -2.0
   rew_scale_pole_pos = -1.0
   rew_scale_cart_vel = -0.01
   rew_scale_pole_vel = -0.005

在创建新环境时,代码应该定义一个新的类,该类继承自 DirectRLEnv

class CartpoleEnv(DirectRLEnv):
   cfg: CartpoleEnvCfg

   def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
     super().__init__(cfg, render_mode, **kwargs)

该类也可以包含所有类函数都可访问的类变量,包括应用动作、计算重置、奖励和观察的功能。

场景创建#

与基于管理器的环境相比,场景创建在直接工作流实现中提供了灵活性,使用户可以实现自己的场景创建功能。这包括将演员添加到场景上,克隆环境,过滤环境之间的碰撞,将演员添加到场景中,以及将其他附加属性添加到场景中,例如地面和灯光。这些操作应该在 _setup_scene(self) 方法中实现。

    def _setup_scene(self):
        self.cartpole = Articulation(self.cfg.robot_cfg)
        # add ground plane
        spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
        # clone, filter, and replicate
        self.scene.clone_environments(copy_from_source=False)
        self.scene.filter_collisions(global_prim_paths=[])
        # add articulation to scene
        self.scene.articulations["cartpole"] = self.cartpole
        # add lights
        light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
        light_cfg.func("/World/Light", light_cfg)

定义奖励#

奖励函数应该在 _get_rewards(self) API中定义,它将奖励缓冲区作为返回值返回。在这个函数内部,任务可以自由实现奖励函数的逻辑。在这个示例中,我们实现了一个计算奖励函数各个组成部分的Pytorch JIT函数。

def _get_rewards(self) -> torch.Tensor:
     total_reward = compute_rewards(
         self.cfg.rew_scale_alive,
         self.cfg.rew_scale_terminated,
         self.cfg.rew_scale_pole_pos,
         self.cfg.rew_scale_cart_vel,
         self.cfg.rew_scale_pole_vel,
         self.joint_pos[:, self._pole_dof_idx[0]],
         self.joint_vel[:, self._pole_dof_idx[0]],
         self.joint_pos[:, self._cart_dof_idx[0]],
         self.joint_vel[:, self._cart_dof_idx[0]],
         self.reset_terminated,
     )
     return total_reward

@torch.jit.script
def compute_rewards(
    rew_scale_alive: float,
    rew_scale_terminated: float,
    rew_scale_pole_pos: float,
    rew_scale_cart_vel: float,
    rew_scale_pole_vel: float,
    pole_pos: torch.Tensor,
    pole_vel: torch.Tensor,
    cart_pos: torch.Tensor,
    cart_vel: torch.Tensor,
    reset_terminated: torch.Tensor,
):
    rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
    rew_termination = rew_scale_terminated * reset_terminated.float()
    rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos), dim=-1)
    rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel), dim=-1)
    rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel), dim=-1)
    total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
    return total_reward

定义观察#

观察缓冲区应该在 _get_observations(self) 函数中计算,它为环境构造了观察缓冲区。在这个API的结尾,应该返回一个包含 policy 为键和完整观察缓冲区为值的字典。对于不对称的策略,字典还应该包括 critic 键和状态缓冲区作为值。

    def _get_observations(self) -> dict:
        obs = torch.cat(
            (
                self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
                self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
                self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
                self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            ),
            dim=-1,
        )
        observations = {"policy": obs}
        return observations

计算终止和执行重置#

填充 dones 缓冲区应该在 _get_dones(self) 方法中完成。这个方法可以自由实现逻辑,计算哪些环境需要重置,哪些环境已经达到了本集长度限制。这两个结果都应该作为 _get_dones(self) 函数的返回值,以布尔张量的形式返回。

    def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
        self.joint_pos = self.cartpole.data.joint_pos
        self.joint_vel = self.cartpole.data.joint_vel

        time_out = self.episode_length_buf >= self.max_episode_length - 1
        out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
        out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
        return out_of_bounds, time_out

一旦计算出需要重置的环境的索引, _reset_idx(self, env_ids) 函数将在这些环境上执行重置操作。在这个函数内部,应该直接将需要重置的环境的新状态设置到模拟中。

    def _reset_idx(self, env_ids: Sequence[int] | None):
        if env_ids is None:
            env_ids = self.cartpole._ALL_INDICES
        super()._reset_idx(env_ids)

        joint_pos = self.cartpole.data.default_joint_pos[env_ids]
        joint_pos[:, self._pole_dof_idx] += sample_uniform(
            self.cfg.initial_pole_angle_range[0] * math.pi,
            self.cfg.initial_pole_angle_range[1] * math.pi,
            joint_pos[:, self._pole_dof_idx].shape,
            joint_pos.device,
        )
        joint_vel = self.cartpole.data.default_joint_vel[env_ids]

        default_root_state = self.cartpole.data.default_root_state[env_ids]
        default_root_state[:, :3] += self.scene.env_origins[env_ids]

        self.joint_pos[env_ids] = joint_pos
        self.joint_vel[env_ids] = joint_vel

        self.cartpole.write_root_link_pose_to_sim(default_root_state[:, :7], env_ids)
        self.cartpole.write_root_com_velocity_to_sim(default_root_state[:, 7:], env_ids)
        self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)

应用动作#

有两个设计用于处理动作的API。 _pre_physics_step(self, actions) 以来于策略的动作作为参数,每个RL步骤只调用一次,在采取任何物理步骤之前。这个函数可以用来处理来自策略的动作缓冲区,并将数据缓存到环境的类变量中。

    def _pre_physics_step(self, actions: torch.Tensor) -> None:
        self.actions = self.action_scale * actions.clone()

_apply_action(self) API被称为 decimation 次数,每个RL步骤之前调用一次,在采取每个物理步骤之前。这为环境提供了更多的灵活性,可以对需要为每个物理步骤应用动作的环境。

    def _apply_action(self) -> None:
        self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)

代码执行#

要运行直接工作流Cartpole环境的训练,我们可以使用以下命令:

./isaaclab.sh -p source/standalone/workflows/rl_games/train.py --task=Isaac-Cartpole-Direct-v0
训练.py的结果

所有直接工作流任务的名称都添加了后缀 -Direct 以区分实现风格。

域随机化#

在直接工作流中,域随机化配置使用 configclass 模块来指定包含 EventTermCfg 变量的配置类。

以下是域随机化的配置类示例:

@configclass
class EventCfg:
  robot_physics_material = EventTerm(
      func=mdp.randomize_rigid_body_material,
      mode="reset",
      params={
          "asset_cfg": SceneEntityCfg("robot", body_names=".*"),
          "static_friction_range": (0.7, 1.3),
          "dynamic_friction_range": (1.0, 1.0),
          "restitution_range": (1.0, 1.0),
          "num_buckets": 250,
      },
  )
  robot_joint_stiffness_and_damping = EventTerm(
      func=mdp.randomize_actuator_gains,
      mode="reset",
      params={
          "asset_cfg": SceneEntityCfg("robot", joint_names=".*"),
          "stiffness_distribution_params": (0.75, 1.5),
          "damping_distribution_params": (0.3, 3.0),
          "operation": "scale",
          "distribution": "log_uniform",
      },
  )
  reset_gravity = EventTerm(
      func=mdp.randomize_physics_scene_gravity,
      mode="interval",
      is_global_time=True,
      interval_range_s=(36.0, 36.0),  # time_s = num_steps * (decimation * dt)
      params={
          "gravity_distribution_params": ([0.0, 0.0, 0.0], [0.0, 0.0, 0.4]),
          "operation": "add",
          "distribution": "gaussian",
      },
  )

每个 EventTerm 对象都是 EventTermCfg 类的对象,并接受一个 func 参数,用于指定随机化时调用的函数,一个 mode 参数,可以是 startupresetintervalparams 字典应该提供在 func 参数中指定的函数所需的参数。作为 EventTermfunc 指定的函数可以在 events 模块中找到。

请注意,作为 "asset_cfg": SceneEntityCfg("robot", body_names=".*") 参数的一部分,提供了actor "robot" 的名称,以及以正则表达式指定的身体或关节名称,这些名称将是应用了随机化的演员和身体/关节。

一旦设置了随机化术语的 configclass ,必须将该类添加到任务的基本配置类中,并分配给变量 events

@configclass
class MyTaskConfig:
  events: EventCfg = EventCfg()

动作和观察噪声#

也可以使用 configclass 模块添加动作和观察噪声配置。必须将动作和观察噪声配置添加到主任务配置中,使用 action_noise_modelobservation_noise_model 变量:

@configclass
class MyTaskConfig:

    # at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
    action_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
      noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.05, operation="add"),
      bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.015, operation="abs"),
    )

    # at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
    observation_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
      noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.002, operation="add"),
      bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.0001, operation="abs"),
    )

NoiseModelWithAdditiveBiasCfg 可以用来对每个步骤进行无关噪声的采样,以及在重置时重新采样的相关噪声。

noise_cfg 术语指定了将在每一步中为所有环境进行采样的高斯分布。这个噪声将被添加到相应的动作和观察缓冲区中的每一步。

bias_noise_cfg 术语指定了在重置时为被重置的环境进行采样的相关噪声的高斯分布。对于剩余的环境,这个相同的噪声将在每一步应用,并在下一次重置时重新采样。

如果只希望每步有噪声,可以使用 GaussianNoiseCfg 来指定一个会将采样噪声添加到输入缓冲区中的混合高斯分布。

@configclass
class MyTaskConfig:
  action_noise_model: GaussianNoiseCfg = GaussianNoiseCfg(mean=0.0, std=0.05, operation="add")

在本教程中,我们学习了如何为强化学习创建直接工作流任务环境。我们通过扩展基础环境来包括场景设置、动作、终止、重置、奖励和观察函数来实现这一点。

虽然可以手动创建所需任务的 DirectRLEnv 类的实例,但这种方法不具有可扩展性,因为它需要为每个任务编写专门的脚本。因此,在接下来的教程中,我们将利用 gymnasium.make() 函数来使用gym接口创建环境。