Creating a Direct Workflow RL Environment#
In addition to the envs.ManagerBasedRLEnv class, which encourages the use of configuration classes for more modular environments, the DirectRLEnv class allows for more direct control in the scripting of the environment.
Instead of using Manager classes for defining rewards and observations, the direct workflow tasks implement the full reward and observation functions directly in the task script. This allows for more control in the implementation of the methods, such as using PyTorch JIT features, and provides a less abstracted framework that makes it easier to find the various pieces of code.
In this tutorial, we will configure the cartpole environment using the direct workflow implementation to create a task for balancing the pole upright. We will learn how to specify the task by implementing functions for scene creation, actions, resets, rewards, and observations.
The Code#
For this tutorial, we use the cartpole environment defined in isaaclab_tasks.direct.cartpole
module.
Code for cartpole_env.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import math
import torch
from collections.abc import Sequence

from isaaclab_assets.robots.cartpole import CARTPOLE_CFG

import isaaclab.sim as sim_utils
from isaaclab.assets import Articulation, ArticulationCfg
from isaaclab.envs import DirectRLEnv, DirectRLEnvCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.sim import SimulationCfg
from isaaclab.sim.spawners.from_files import GroundPlaneCfg, spawn_ground_plane
from isaaclab.utils import configclass
from isaaclab.utils.math import sample_uniform


@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
    # env
    decimation = 2
    episode_length_s = 5.0
    action_scale = 100.0  # [N]
    action_space = 1
    observation_space = 4
    state_space = 0

    # simulation
    sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)

    # robot
    robot_cfg: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="/World/envs/env_.*/Robot")
    cart_dof_name = "slider_to_cart"
    pole_dof_name = "cart_to_pole"

    # scene
    scene: InteractiveSceneCfg = InteractiveSceneCfg(num_envs=4096, env_spacing=4.0, replicate_physics=True)

    # reset
    max_cart_pos = 3.0  # the cart is reset if it exceeds that position [m]
    initial_pole_angle_range = [-0.25, 0.25]  # the range in which the pole angle is sampled from on reset [rad]

    # reward scales
    rew_scale_alive = 1.0
    rew_scale_terminated = -2.0
    rew_scale_pole_pos = -1.0
    rew_scale_cart_vel = -0.01
    rew_scale_pole_vel = -0.005


class CartpoleEnv(DirectRLEnv):
    cfg: CartpoleEnvCfg

    def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
        super().__init__(cfg, render_mode, **kwargs)

        self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
        self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
        self.action_scale = self.cfg.action_scale

        self.joint_pos = self.cartpole.data.joint_pos
        self.joint_vel = self.cartpole.data.joint_vel

    def _setup_scene(self):
        self.cartpole = Articulation(self.cfg.robot_cfg)
        # add ground plane
        spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
        # clone and replicate
        self.scene.clone_environments(copy_from_source=False)
        # we need to explicitly filter collisions for CPU simulation
        if self.device == "cpu":
            self.scene.filter_collisions(global_prim_paths=[])
        # add articulation to scene
        self.scene.articulations["cartpole"] = self.cartpole
        # add lights
        light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
        light_cfg.func("/World/Light", light_cfg)

    def _pre_physics_step(self, actions: torch.Tensor) -> None:
        self.actions = self.action_scale * actions.clone()

    def _apply_action(self) -> None:
        self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)

    def _get_observations(self) -> dict:
        obs = torch.cat(
            (
                self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
                self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
                self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
                self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            ),
            dim=-1,
        )
        observations = {"policy": obs}
        return observations

    def _get_rewards(self) -> torch.Tensor:
        total_reward = compute_rewards(
            self.cfg.rew_scale_alive,
            self.cfg.rew_scale_terminated,
            self.cfg.rew_scale_pole_pos,
            self.cfg.rew_scale_cart_vel,
            self.cfg.rew_scale_pole_vel,
            self.joint_pos[:, self._pole_dof_idx[0]],
            self.joint_vel[:, self._pole_dof_idx[0]],
            self.joint_pos[:, self._cart_dof_idx[0]],
            self.joint_vel[:, self._cart_dof_idx[0]],
            self.reset_terminated,
        )
        return total_reward

    def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
        self.joint_pos = self.cartpole.data.joint_pos
        self.joint_vel = self.cartpole.data.joint_vel

        time_out = self.episode_length_buf >= self.max_episode_length - 1
        out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
        out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
        return out_of_bounds, time_out

    def _reset_idx(self, env_ids: Sequence[int] | None):
        if env_ids is None:
            env_ids = self.cartpole._ALL_INDICES
        super()._reset_idx(env_ids)

        joint_pos = self.cartpole.data.default_joint_pos[env_ids]
        joint_pos[:, self._pole_dof_idx] += sample_uniform(
            self.cfg.initial_pole_angle_range[0] * math.pi,
            self.cfg.initial_pole_angle_range[1] * math.pi,
            joint_pos[:, self._pole_dof_idx].shape,
            joint_pos.device,
        )
        joint_vel = self.cartpole.data.default_joint_vel[env_ids]

        default_root_state = self.cartpole.data.default_root_state[env_ids]
        default_root_state[:, :3] += self.scene.env_origins[env_ids]

        self.joint_pos[env_ids] = joint_pos
        self.joint_vel[env_ids] = joint_vel

        self.cartpole.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
        self.cartpole.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
        self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)


@torch.jit.script
def compute_rewards(
    rew_scale_alive: float,
    rew_scale_terminated: float,
    rew_scale_pole_pos: float,
    rew_scale_cart_vel: float,
    rew_scale_pole_vel: float,
    pole_pos: torch.Tensor,
    pole_vel: torch.Tensor,
    cart_pos: torch.Tensor,
    cart_vel: torch.Tensor,
    reset_terminated: torch.Tensor,
):
    rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
    rew_termination = rew_scale_terminated * reset_terminated.float()
    rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
    rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
    rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
    total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
    return total_reward
The Code Explained#
Similar to the manager-based environments, a configuration class is defined for the task to hold settings
for the simulation parameters, the scene, the actors, and the task. With the direct workflow implementation,
the envs.DirectRLEnvCfg
class is used as the base class for configurations.
Since the direct workflow implementation does not use Action and Observation managers, the task
config should define the number of actions and observations for the environment.
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
    ...
    action_space = 1
    observation_space = 4
    state_space = 0
The config class can also be used to define task-specific attributes, such as scaling for reward terms and thresholds for reset conditions.
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
    ...
    # reset
    max_cart_pos = 3.0
    initial_pole_angle_range = [-0.25, 0.25]

    # reward scales
    rew_scale_alive = 1.0
    rew_scale_terminated = -2.0
    rew_scale_pole_pos = -1.0
    rew_scale_cart_vel = -0.01
    rew_scale_pole_vel = -0.005
When creating a new environment, the code should define a new class that inherits from DirectRLEnv.
class CartpoleEnv(DirectRLEnv):
    cfg: CartpoleEnvCfg

    def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
        super().__init__(cfg, render_mode, **kwargs)
The class can also hold class variables that are accessible by all functions in the class, including functions for applying actions, computing resets, rewards, and observations.
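For example, in the cartpole task the __init__ method caches the joint indices, the action scale, and references to the joint state buffers, which the action, reward, observation, and reset functions then reuse:

def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
    super().__init__(cfg, render_mode, **kwargs)

    # cache joint indices and commonly used buffers as class variables
    self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
    self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
    self.action_scale = self.cfg.action_scale

    self.joint_pos = self.cartpole.data.joint_pos
    self.joint_vel = self.cartpole.data.joint_vel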
Scene Creation#
In contrast to manager-based environments, where scene creation is taken care of by the framework, the direct workflow implementation gives users the flexibility to implement their own scene creation function. This includes adding actors into the stage, cloning the environments, filtering collisions between the environments, adding the actors into the scene, and adding any additional props, such as the ground plane and lights. These operations should be implemented in the _setup_scene(self) method.
def _setup_scene(self):
    self.cartpole = Articulation(self.cfg.robot_cfg)
    # add ground plane
    spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
    # clone and replicate
    self.scene.clone_environments(copy_from_source=False)
    # we need to explicitly filter collisions for CPU simulation
    if self.device == "cpu":
        self.scene.filter_collisions(global_prim_paths=[])
    # add articulation to scene
    self.scene.articulations["cartpole"] = self.cartpole
    # add lights
    light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
    light_cfg.func("/World/Light", light_cfg)
Defining Rewards#
The reward function should be defined in the _get_rewards(self) API, which returns the reward buffer. Within this function, the task is free to implement the logic of the reward function. In this example, we implement a PyTorch JIT function that computes the various components of the reward.
def _get_rewards(self) -> torch.Tensor:
    total_reward = compute_rewards(
        self.cfg.rew_scale_alive,
        self.cfg.rew_scale_terminated,
        self.cfg.rew_scale_pole_pos,
        self.cfg.rew_scale_cart_vel,
        self.cfg.rew_scale_pole_vel,
        self.joint_pos[:, self._pole_dof_idx[0]],
        self.joint_vel[:, self._pole_dof_idx[0]],
        self.joint_pos[:, self._cart_dof_idx[0]],
        self.joint_vel[:, self._cart_dof_idx[0]],
        self.reset_terminated,
    )
    return total_reward
@torch.jit.script
def compute_rewards(
    rew_scale_alive: float,
    rew_scale_terminated: float,
    rew_scale_pole_pos: float,
    rew_scale_cart_vel: float,
    rew_scale_pole_vel: float,
    pole_pos: torch.Tensor,
    pole_vel: torch.Tensor,
    cart_pos: torch.Tensor,
    cart_vel: torch.Tensor,
    reset_terminated: torch.Tensor,
):
    rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
    rew_termination = rew_scale_terminated * reset_terminated.float()
    rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
    rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
    rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
    total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
    return total_reward
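The use of torch.jit.script here is an optimization rather than a requirement; because _get_rewards can contain arbitrary logic, the same terms could also be computed inline with plain PyTorch operations. A minimal sketch of such an alternative (not part of the original task) could look like this:

def _get_rewards(self) -> torch.Tensor:
    # illustrative sketch: compute the same reward terms without a JIT-compiled helper
    pole_pos = self.joint_pos[:, self._pole_dof_idx[0]]
    pole_vel = self.joint_vel[:, self._pole_dof_idx[0]]
    cart_vel = self.joint_vel[:, self._cart_dof_idx[0]]
    rew_alive = self.cfg.rew_scale_alive * (1.0 - self.reset_terminated.float())
    rew_termination = self.cfg.rew_scale_terminated * self.reset_terminated.float()
    rew_pole_pos = self.cfg.rew_scale_pole_pos * torch.square(pole_pos)
    rew_cart_vel = self.cfg.rew_scale_cart_vel * torch.abs(cart_vel)
    rew_pole_vel = self.cfg.rew_scale_pole_vel * torch.abs(pole_vel)
    return rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel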
Defining Observations#
The observation buffer should be computed in the _get_observations(self) function, which constructs the observation buffer for the environment. This method should return a dictionary with policy as the key and the full observation buffer as the value. For asymmetric policies, the dictionary should also include a critic key with the states buffer as the value.
def _get_observations(self) -> dict:
    obs = torch.cat(
        (
            self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
        ),
        dim=-1,
    )
    observations = {"policy": obs}
    return observations
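As a hypothetical illustration of the asymmetric case (not part of this task), the method could additionally return privileged states under the critic key. This sketch assumes that state_space in the config is set to the size of the critic input, and the root linear velocity used as the privileged signal is only an assumed example:

def _get_observations(self) -> dict:
    obs = torch.cat(
        (
            self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
        ),
        dim=-1,
    )
    # hypothetical privileged information for the critic, e.g. the root linear velocity
    states = torch.cat((obs, self.cartpole.data.root_lin_vel_w), dim=-1)
    return {"policy": obs, "critic": states}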
Computing Dones and Performing Resets#
Populating the dones buffer should be done in the _get_dones(self) method. This method is free to implement logic that computes which environments need to be reset and which environments have reached the episode length limit. Both results should be returned by _get_dones(self) as a tuple of boolean tensors.
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
    self.joint_pos = self.cartpole.data.joint_pos
    self.joint_vel = self.cartpole.data.joint_vel

    time_out = self.episode_length_buf >= self.max_episode_length - 1
    out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
    out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
    return out_of_bounds, time_out
Once the indices for environments requiring reset have been computed, the _reset_idx(self, env_ids)
function performs the reset operations on those environments. Within this function, new states
for the environments requiring reset should be set directly into simulation.
def _reset_idx(self, env_ids: Sequence[int] | None):
    if env_ids is None:
        env_ids = self.cartpole._ALL_INDICES
    super()._reset_idx(env_ids)

    joint_pos = self.cartpole.data.default_joint_pos[env_ids]
    joint_pos[:, self._pole_dof_idx] += sample_uniform(
        self.cfg.initial_pole_angle_range[0] * math.pi,
        self.cfg.initial_pole_angle_range[1] * math.pi,
        joint_pos[:, self._pole_dof_idx].shape,
        joint_pos.device,
    )
    joint_vel = self.cartpole.data.default_joint_vel[env_ids]

    default_root_state = self.cartpole.data.default_root_state[env_ids]
    default_root_state[:, :3] += self.scene.env_origins[env_ids]

    self.joint_pos[env_ids] = joint_pos
    self.joint_vel[env_ids] = joint_vel

    self.cartpole.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
    self.cartpole.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
    self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
Applying Actions#
There are two APIs designed for working with actions. The _pre_physics_step(self, actions) method takes in the actions from the policy as an argument and is called once per RL step, prior to taking any physics steps. This function can be used to process the actions buffer from the policy and cache the data in a class variable for the environment.
def _pre_physics_step(self, actions: torch.Tensor) -> None:
    self.actions = self.action_scale * actions.clone()
The _apply_action(self)
API is called decimation
number of times for each RL step, prior to taking
each physics step. This provides more flexibility for environments where actions should be applied
for each physics step.
def _apply_action(self) -> None:
    self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
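For the cartpole configuration above, decimation = 2 and the simulation dt is 1/120 s, so _apply_action is called twice per RL step: the physics simulation runs at 120 Hz while _pre_physics_step and the policy run at 60 Hz.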
The Code Execution#
To run training for the direct workflow Cartpole environment, we can use the following command:
./isaaclab.sh -p scripts/reinforcement_learning/rl_games/train.py --task=Isaac-Cartpole-Direct-v0

All direct workflow tasks have the suffix -Direct
added to the task name to differentiate the implementation style.
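Additional command-line options can typically be appended to this command; for example, assuming the standard --num_envs and --headless arguments, training can be run without the GUI on a smaller number of environments:
./isaaclab.sh -p scripts/reinforcement_learning/rl_games/train.py --task=Isaac-Cartpole-Direct-v0 --num_envs 64 --headless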
Domain Randomization#
In the direct workflow, domain randomization configuration uses the configclass
module
to specify a configuration class consisting of EventTermCfg
variables.
Below is an example of a configuration class for domain randomization:
@configclass
class EventCfg:
    robot_physics_material = EventTerm(
        func=mdp.randomize_rigid_body_material,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot", body_names=".*"),
            "static_friction_range": (0.7, 1.3),
            "dynamic_friction_range": (1.0, 1.0),
            "restitution_range": (1.0, 1.0),
            "num_buckets": 250,
        },
    )
    robot_joint_stiffness_and_damping = EventTerm(
        func=mdp.randomize_actuator_gains,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot", joint_names=".*"),
            "stiffness_distribution_params": (0.75, 1.5),
            "damping_distribution_params": (0.3, 3.0),
            "operation": "scale",
            "distribution": "log_uniform",
        },
    )
    reset_gravity = EventTerm(
        func=mdp.randomize_physics_scene_gravity,
        mode="interval",
        is_global_time=True,
        interval_range_s=(36.0, 36.0),  # time_s = num_steps * (decimation * dt)
        params={
            "gravity_distribution_params": ([0.0, 0.0, 0.0], [0.0, 0.0, 0.4]),
            "operation": "add",
            "distribution": "gaussian",
        },
    )
Each EventTerm object is of the EventTermCfg class and takes in a func parameter specifying the function to call during randomization and a mode parameter, which can be startup, reset, or interval. The params dictionary should provide the necessary arguments to the function specified in the func parameter.
Functions specified as func
for the EventTerm
can be found in the events
module.
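In the task script, these utilities are assumed to be imported; under the current package layout the imports would look roughly like the following (treat the exact module paths as an assumption):

import isaaclab.envs.mdp as mdp
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.utils import configclass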
Note that the "asset_cfg": SceneEntityCfg("robot", body_names=".*") parameter provides the name of the actor, "robot", along with the body or joint names specified as a regular expression; these select the actors and bodies/joints to which the randomization is applied.
Once the configclass for the randomization terms has been set up, the class must be added to the base config class for the task and assigned to the variable events.
@configclass
class MyTaskConfig:
    events: EventCfg = EventCfg()
Action and Observation Noise#
Action and observation noise can also be added using the configclass module. The noise configs must be added to the main task config using the action_noise_model and observation_noise_model variables:
@configclass
class MyTaskConfig:

    # at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
    action_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
        noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.05, operation="add"),
        bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.015, operation="abs"),
    )

    # at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
    observation_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
        noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.002, operation="add"),
        bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.0001, operation="abs"),
    )
NoiseModelWithAdditiveBiasCfg
can be used to sample both uncorrelated noise
per step as well as correlated noise that is re-sampled at reset time.
The noise_cfg
term specifies the Gaussian distribution that will be sampled at each
step for all environments. This noise will be added to the corresponding actions and
observations buffers at every step.
The bias_noise_cfg term specifies the Gaussian distribution for the correlated noise that will be sampled at reset time for the environments being reset. The same noise will be applied at each step for the remainder of the episode for those environments and resampled at the next reset.
If only per-step noise is desired, GaussianNoiseCfg
can be used
to specify an additive Gaussian distribution that adds the sampled noise to the input buffer.
@configclass
class MyTaskConfig:
    action_noise_model: GaussianNoiseCfg = GaussianNoiseCfg(mean=0.0, std=0.05, operation="add")
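Both noise configuration classes used above are assumed to come from the noise utilities module; an import along these lines would be expected (treat the exact path as an assumption):

from isaaclab.utils import configclass
from isaaclab.utils.noise import GaussianNoiseCfg, NoiseModelWithAdditiveBiasCfg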
In this tutorial, we learnt how to create a direct workflow task environment for reinforcement learning. We did this by extending the base environment to include the scene setup, action, dones, reset, reward, and observation functions.
While it is possible to manually create an instance of the DirectRLEnv class for a desired task, this is not scalable as it requires a specialized script for each task. Thus, we exploit the gymnasium.make() function to create the environment with the gym interface. We will learn how to do this in the next tutorial.