Source code for isaaclab_rl.rsl_rl.vecenv_wrapper

# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

import gymnasium as gym
import torch
from tensordict import TensorDict

from rsl_rl.env import VecEnv

from isaaclab.envs import DirectRLEnv, ManagerBasedRLEnv


class RslRlVecEnvWrapper(VecEnv):
    """Wraps around Isaac Lab environment for the RSL-RL library.

    .. caution::

        This class must be the last wrapper in the wrapper chain. This is because the wrapper does not follow
        the :class:`gym.Wrapper` interface. Any subsequent wrappers will need to be modified to work with this
        wrapper.

    Reference:
        https://github.com/leggedrobotics/rsl_rl/blob/master/rsl_rl/env/vec_env.py
    """

    def __init__(self, env: ManagerBasedRLEnv | DirectRLEnv, clip_actions: float | None = None):
        """Initializes the wrapper.

        Note:
            The wrapper calls :meth:`reset` at the start since the RSL-RL runner does not call reset.

        Args:
            env: The environment to wrap around.
            clip_actions: The clipping value for actions. If ``None``, then no clipping is done.

        Raises:
            ValueError: When the environment is not an instance of :class:`ManagerBasedRLEnv` or
                :class:`DirectRLEnv`.
        """
        # check that input is valid
        if not isinstance(env.unwrapped, ManagerBasedRLEnv) and not isinstance(env.unwrapped, DirectRLEnv):
            raise ValueError(
                "The environment must be inherited from ManagerBasedRLEnv or DirectRLEnv. Environment type:"
                f" {type(env)}"
            )
        # initialize the wrapper
        self.env = env
        self.clip_actions = clip_actions

        # store information required by wrapper
        self.num_envs = self.unwrapped.num_envs
        self.device = self.unwrapped.device
        self.max_episode_length = self.unwrapped.max_episode_length

        # obtain dimensions of the environment
        if hasattr(self.unwrapped, "action_manager"):
            self.num_actions = self.unwrapped.action_manager.total_action_dim
        else:
            self.num_actions = gym.spaces.flatdim(self.unwrapped.single_action_space)

        # modify the action space to the clip range
        self._modify_action_space()

        # reset at the start since the RSL-RL runner does not call reset
        self.env.reset()

    def __str__(self):
        """Returns the wrapper name and the :attr:`env` representation string."""
        return f"<{type(self).__name__}{self.env}>"

    def __repr__(self):
        """Returns the string representation of the wrapper."""
        return str(self)

    """
    Properties -- Gym.Wrapper
    """

    @property
    def cfg(self) -> object:
        """Returns the configuration class instance of the environment."""
        return self.unwrapped.cfg

    @property
    def render_mode(self) -> str | None:
        """Returns the :attr:`Env` :attr:`render_mode`."""
        return self.env.render_mode

    @property
    def observation_space(self) -> gym.Space:
        """Returns the :attr:`Env` :attr:`observation_space`."""
        return self.env.observation_space

    @property
    def action_space(self) -> gym.Space:
        """Returns the :attr:`Env` :attr:`action_space`."""
        return self.env.action_space

    @classmethod
    def class_name(cls) -> str:
        """Returns the class name of the wrapper."""
        return cls.__name__

    @property
    def unwrapped(self) -> ManagerBasedRLEnv | DirectRLEnv:
        """Returns the base environment of the wrapper.

        This will be the bare :class:`gymnasium.Env` environment, underneath all layers of wrappers.
        """
        return self.env.unwrapped

    """
    Properties
    """

    @property
    def episode_length_buf(self) -> torch.Tensor:
        """The episode length buffer."""
        return self.unwrapped.episode_length_buf

    @episode_length_buf.setter
    def episode_length_buf(self, value: torch.Tensor):
        """Set the episode length buffer.

        Note:
            This is needed to perform random initialization of episode lengths in RSL-RL.
        """
        self.unwrapped.episode_length_buf = value

    """
    Operations - MDP
    """

    def seed(self, seed: int = -1) -> int:  # noqa: D102
        return self.unwrapped.seed(seed)

    def reset(self) -> tuple[TensorDict, dict]:  # noqa: D102
        # reset the environment
        obs_dict, extras = self.env.reset()
        return TensorDict(obs_dict, batch_size=[self.num_envs]), extras

    def get_observations(self) -> TensorDict:
        """Returns the current observations of the environment."""
        if hasattr(self.unwrapped, "observation_manager"):
            obs_dict = self.unwrapped.observation_manager.compute()
        else:
            obs_dict = self.unwrapped._get_observations()
        return TensorDict(obs_dict, batch_size=[self.num_envs])

    def step(self, actions: torch.Tensor) -> tuple[TensorDict, torch.Tensor, torch.Tensor, dict]:
        # clip actions
        if self.clip_actions is not None:
            actions = torch.clamp(actions, -self.clip_actions, self.clip_actions)
        # record step information
        obs_dict, rew, terminated, truncated, extras = self.env.step(actions)
        # compute dones for compatibility with RSL-RL
        dones = (terminated | truncated).to(dtype=torch.long)
        # move time out information to the extras dict
        # this is only needed for infinite horizon tasks
        if not self.unwrapped.cfg.is_finite_horizon:
            extras["time_outs"] = truncated
        # return the step information
        return TensorDict(obs_dict, batch_size=[self.num_envs]), rew, dones, extras

    def close(self):  # noqa: D102
        return self.env.close()

    """
    Helper functions
    """

    def _modify_action_space(self):
        """Modifies the action space to the clip range."""
        if self.clip_actions is None:
            return

        # modify the action space to the clip range
        # note: this is only possible for the box action space. we need to change it in the future for other
        # action spaces.
        self.env.unwrapped.single_action_space = gym.spaces.Box(
            low=-self.clip_actions, high=self.clip_actions, shape=(self.num_actions,)
        )
        self.env.unwrapped.action_space = gym.vector.utils.batch_space(
            self.env.unwrapped.single_action_space, self.num_envs
        )
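

# Example usage (illustrative sketch, not part of the module): the wrapper is applied as the last
# wrapper around an Isaac Lab environment before handing it to an RSL-RL runner. The task name,
# configuration objects, and log directory below are placeholders, not values defined by this module.
#
#   import gymnasium as gym
#   from rsl_rl.runners import OnPolicyRunner
#
#   from isaaclab_rl.rsl_rl import RslRlVecEnvWrapper
#
#   env = gym.make("Isaac-Cartpole-v0", cfg=env_cfg)   # env_cfg: task configuration (placeholder)
#   env = RslRlVecEnvWrapper(env, clip_actions=100.0)  # must be the outermost wrapper
#   runner = OnPolicyRunner(env, agent_cfg, log_dir="logs", device=env.device)  # agent_cfg: RSL-RL config dict (placeholder)
#   runner.learn(num_learning_iterations=1000)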