Creating a Direct Workflow RL Environment#
In addition to the envs.ManagerBasedRLEnv class, which encourages the use of configuration classes for more modular environments, the DirectRLEnv class allows for more direct control in scripting the environment.
Instead of using Manager classes for defining rewards and observations, direct workflow tasks implement the full reward and observation functions directly in the task script. This allows for more control in the implementation of the methods, such as using PyTorch JIT features, and provides a less abstracted framework that makes it easier to find the various pieces of code.
In this tutorial, we will configure the cartpole environment using the direct workflow implementation to create a task for balancing the pole upright. We will learn how to specify the task by implementing functions for scene creation, actions, resets, rewards, and observations.
The Code#
For this tutorial, we use the cartpole environment defined in isaaclab_tasks.direct.cartpole
module.
Code for cartpole_env.py
# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import math
import torch
from collections.abc import Sequence

from isaaclab_assets.robots.cartpole import CARTPOLE_CFG

import isaaclab.sim as sim_utils
from isaaclab.assets import Articulation, ArticulationCfg
from isaaclab.envs import DirectRLEnv, DirectRLEnvCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.sim import SimulationCfg
from isaaclab.sim.spawners.from_files import GroundPlaneCfg, spawn_ground_plane
from isaaclab.utils import configclass
from isaaclab.utils.math import sample_uniform


@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
    # env
    decimation = 2
    episode_length_s = 5.0
    action_scale = 100.0  # [N]
    action_space = 1
    observation_space = 4
    state_space = 0

    # simulation
    sim: SimulationCfg = SimulationCfg(dt=1 / 120, render_interval=decimation)

    # robot
    robot_cfg: ArticulationCfg = CARTPOLE_CFG.replace(prim_path="/World/envs/env_.*/Robot")
    cart_dof_name = "slider_to_cart"
    pole_dof_name = "cart_to_pole"

    # scene
    scene: InteractiveSceneCfg = InteractiveSceneCfg(
        num_envs=4096, env_spacing=4.0, replicate_physics=True, clone_in_fabric=True
    )

    # reset
    max_cart_pos = 3.0  # the cart is reset if it exceeds that position [m]
    initial_pole_angle_range = [-0.25, 0.25]  # the range in which the pole angle is sampled from on reset [rad]

    # reward scales
    rew_scale_alive = 1.0
    rew_scale_terminated = -2.0
    rew_scale_pole_pos = -1.0
    rew_scale_cart_vel = -0.01
    rew_scale_pole_vel = -0.005


class CartpoleEnv(DirectRLEnv):
    cfg: CartpoleEnvCfg

    def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
        super().__init__(cfg, render_mode, **kwargs)

        self._cart_dof_idx, _ = self.cartpole.find_joints(self.cfg.cart_dof_name)
        self._pole_dof_idx, _ = self.cartpole.find_joints(self.cfg.pole_dof_name)
        self.action_scale = self.cfg.action_scale

        self.joint_pos = self.cartpole.data.joint_pos
        self.joint_vel = self.cartpole.data.joint_vel

    def _setup_scene(self):
        self.cartpole = Articulation(self.cfg.robot_cfg)
        # add ground plane
        spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
        # clone and replicate
        self.scene.clone_environments(copy_from_source=False)
        # we need to explicitly filter collisions for CPU simulation
        if self.device == "cpu":
            self.scene.filter_collisions(global_prim_paths=[])
        # add articulation to scene
        self.scene.articulations["cartpole"] = self.cartpole
        # add lights
        light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
        light_cfg.func("/World/Light", light_cfg)

    def _pre_physics_step(self, actions: torch.Tensor) -> None:
        self.actions = self.action_scale * actions.clone()

    def _apply_action(self) -> None:
        self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)

    def _get_observations(self) -> dict:
        obs = torch.cat(
            (
                self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
                self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
                self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
                self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            ),
            dim=-1,
        )
        observations = {"policy": obs}
        return observations

    def _get_rewards(self) -> torch.Tensor:
        total_reward = compute_rewards(
            self.cfg.rew_scale_alive,
            self.cfg.rew_scale_terminated,
            self.cfg.rew_scale_pole_pos,
            self.cfg.rew_scale_cart_vel,
            self.cfg.rew_scale_pole_vel,
            self.joint_pos[:, self._pole_dof_idx[0]],
            self.joint_vel[:, self._pole_dof_idx[0]],
            self.joint_pos[:, self._cart_dof_idx[0]],
            self.joint_vel[:, self._cart_dof_idx[0]],
            self.reset_terminated,
        )
        return total_reward

    def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
        self.joint_pos = self.cartpole.data.joint_pos
        self.joint_vel = self.cartpole.data.joint_vel

        time_out = self.episode_length_buf >= self.max_episode_length - 1
        out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
        out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
        return out_of_bounds, time_out

    def _reset_idx(self, env_ids: Sequence[int] | None):
        if env_ids is None:
            env_ids = self.cartpole._ALL_INDICES
        super()._reset_idx(env_ids)

        joint_pos = self.cartpole.data.default_joint_pos[env_ids]
        joint_pos[:, self._pole_dof_idx] += sample_uniform(
            self.cfg.initial_pole_angle_range[0] * math.pi,
            self.cfg.initial_pole_angle_range[1] * math.pi,
            joint_pos[:, self._pole_dof_idx].shape,
            joint_pos.device,
        )
        joint_vel = self.cartpole.data.default_joint_vel[env_ids]

        default_root_state = self.cartpole.data.default_root_state[env_ids]
        default_root_state[:, :3] += self.scene.env_origins[env_ids]

        self.joint_pos[env_ids] = joint_pos
        self.joint_vel[env_ids] = joint_vel

        self.cartpole.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
        self.cartpole.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
        self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)


@torch.jit.script
def compute_rewards(
    rew_scale_alive: float,
    rew_scale_terminated: float,
    rew_scale_pole_pos: float,
    rew_scale_cart_vel: float,
    rew_scale_pole_vel: float,
    pole_pos: torch.Tensor,
    pole_vel: torch.Tensor,
    cart_pos: torch.Tensor,
    cart_vel: torch.Tensor,
    reset_terminated: torch.Tensor,
):
    rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
    rew_termination = rew_scale_terminated * reset_terminated.float()
    rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
    rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
    rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
    total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
    return total_reward
The Code Explained#
Similar to the manager-based environments, a configuration class is defined for the task to hold settings
for the simulation parameters, the scene, the actors, and the task. With the direct workflow implementation,
the envs.DirectRLEnvCfg
class is used as the base class for configurations.
Since the direct workflow implementation does not use Action and Observation managers, the task
config should define the number of actions and observations for the environment.
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
    ...
    action_space = 1
    observation_space = 4
    state_space = 0
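These integers give the flat sizes of the action and observation buffers: with the scene configuration used here (num_envs=4096), the policy receives an observation tensor of shape (4096, 4) and returns an action tensor of shape (4096, 1) at every RL step, while state_space = 0 indicates that no separate state buffer is used (it becomes relevant for asymmetric actor-critic setups, discussed under observations below).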
The config class can also be used to define task-specific attributes, such as scaling for reward terms and thresholds for reset conditions.
@configclass
class CartpoleEnvCfg(DirectRLEnvCfg):
    ...
    # reset
    max_cart_pos = 3.0
    initial_pole_angle_range = [-0.25, 0.25]
    # reward scales
    rew_scale_alive = 1.0
    rew_scale_terminated = -2.0
    rew_scale_pole_pos = -1.0
    rew_scale_cart_vel = -0.01
    rew_scale_pole_vel = -0.005
When creating a new environment, the code should define a new class that inherits from DirectRLEnv.
class CartpoleEnv(DirectRLEnv):
    cfg: CartpoleEnvCfg

    def __init__(self, cfg: CartpoleEnvCfg, render_mode: str | None = None, **kwargs):
        super().__init__(cfg, render_mode, **kwargs)
The class can also hold class variables that are accessible by all functions in the class, including functions for applying actions, computing resets, rewards, and observations.
Scene Creation#
In contrast to manager-based environments where the scene creation is taken care of by the framework,
the direct workflow implementation provides flexibility for users to implement their own scene creation
function. This includes adding actors into the stage, cloning the environments, filtering collisions
between the environments, adding the actors into the scene, and adding any additional props to the
scene, such as a ground plane and lights. These operations should be implemented in the
_setup_scene(self)
method.
def _setup_scene(self):
    self.cartpole = Articulation(self.cfg.robot_cfg)
    # add ground plane
    spawn_ground_plane(prim_path="/World/ground", cfg=GroundPlaneCfg())
    # clone and replicate
    self.scene.clone_environments(copy_from_source=False)
    # we need to explicitly filter collisions for CPU simulation
    if self.device == "cpu":
        self.scene.filter_collisions(global_prim_paths=[])
    # add articulation to scene
    self.scene.articulations["cartpole"] = self.cartpole
    # add lights
    light_cfg = sim_utils.DomeLightCfg(intensity=2000.0, color=(0.75, 0.75, 0.75))
    light_cfg.func("/World/Light", light_cfg)
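Additional props can be spawned in the same way as the ground plane and dome light. As a hedged sketch (the prim path and light settings below are illustrative, not part of the tutorial's task), an extra distant light could be appended to the end of _setup_scene(self):
    # hypothetical extra prop, spawned globally like the dome light above
    distant_light_cfg = sim_utils.DistantLightCfg(intensity=500.0, color=(1.0, 1.0, 1.0))
    distant_light_cfg.func("/World/DistantLight", distant_light_cfg)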
Defining Rewards#
The reward function should be defined in the _get_rewards(self) API, which returns the reward buffer as its return value. Within this function, the task is free to implement the logic of the reward function. In this example, we implement a PyTorch JIT function that computes the various components of the reward.
def _get_rewards(self) -> torch.Tensor:
    total_reward = compute_rewards(
        self.cfg.rew_scale_alive,
        self.cfg.rew_scale_terminated,
        self.cfg.rew_scale_pole_pos,
        self.cfg.rew_scale_cart_vel,
        self.cfg.rew_scale_pole_vel,
        self.joint_pos[:, self._pole_dof_idx[0]],
        self.joint_vel[:, self._pole_dof_idx[0]],
        self.joint_pos[:, self._cart_dof_idx[0]],
        self.joint_vel[:, self._cart_dof_idx[0]],
        self.reset_terminated,
    )
    return total_reward
@torch.jit.script
def compute_rewards(
    rew_scale_alive: float,
    rew_scale_terminated: float,
    rew_scale_pole_pos: float,
    rew_scale_cart_vel: float,
    rew_scale_pole_vel: float,
    pole_pos: torch.Tensor,
    pole_vel: torch.Tensor,
    cart_pos: torch.Tensor,
    cart_vel: torch.Tensor,
    reset_terminated: torch.Tensor,
):
    rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())
    rew_termination = rew_scale_terminated * reset_terminated.float()
    rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos).unsqueeze(dim=1), dim=-1)
    rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel).unsqueeze(dim=1), dim=-1)
    rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel).unsqueeze(dim=1), dim=-1)
    total_reward = rew_alive + rew_termination + rew_pole_pos + rew_cart_vel + rew_pole_vel
    return total_reward
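The JIT-compiled helper is a design choice, not a requirement. As a rough sketch (not the tutorial's implementation), the same reward could be computed inline in _get_rewards(self) using the cached joint buffers:
def _get_rewards(self) -> torch.Tensor:
    # equivalent inline computation without torch.jit.script (illustrative sketch)
    terminated = self.reset_terminated.float()
    pole_pos = self.joint_pos[:, self._pole_dof_idx[0]]
    pole_vel = self.joint_vel[:, self._pole_dof_idx[0]]
    cart_vel = self.joint_vel[:, self._cart_dof_idx[0]]
    total_reward = (
        self.cfg.rew_scale_alive * (1.0 - terminated)
        + self.cfg.rew_scale_terminated * terminated
        + self.cfg.rew_scale_pole_pos * torch.square(pole_pos)
        + self.cfg.rew_scale_cart_vel * torch.abs(cart_vel)
        + self.cfg.rew_scale_pole_vel * torch.abs(pole_vel)
    )
    return total_reward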
Defining Observations#
The observation buffer should be computed in the _get_observations(self)
function,
which constructs the observation buffer for the environment. At the end of this API,
a dictionary should be returned that contains policy
as the key, and the full
observation buffer as the value. For asymmetric policies, the dictionary should also
include the key critic
and the states buffer as the value.
def _get_observations(self) -> dict:
    obs = torch.cat(
        (
            self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
        ),
        dim=-1,
    )
    observations = {"policy": obs}
    return observations
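For an asymmetric actor-critic setup, the same function would additionally return the critic entry mentioned above. A minimal sketch (not part of the cartpole task; the config's state_space would have to match the width of the states tensor):
def _get_observations(self) -> dict:
    # policy observations: selected joint states, as in the tutorial
    obs = torch.cat(
        (
            self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
            self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
            self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
        ),
        dim=-1,
    )
    # privileged critic observations: here simply the full joint state (illustrative choice)
    states = torch.cat((self.joint_pos, self.joint_vel), dim=-1)
    return {"policy": obs, "critic": states}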
Computing Dones and Performing Resets#
Populating the dones
buffer should be done in the _get_dones(self)
method.
This method is free to implement logic that computes which environments would need to be reset
and which environments have reached the episode length limit. Both results should be
returned by the _get_dones(self)
function, in the form of a tuple of boolean tensors.
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
    self.joint_pos = self.cartpole.data.joint_pos
    self.joint_vel = self.cartpole.data.joint_vel

    time_out = self.episode_length_buf >= self.max_episode_length - 1
    out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
    out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
    return out_of_bounds, time_out
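The first tensor flags environments that failed the task (the cart or the pole left its allowed range) and is what populates reset_terminated, which the reward function uses; the second flags environments whose episodes reached the time limit. Both trigger a reset, but RL libraries generally treat termination and time-out (truncation) differently when bootstrapping value estimates.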
Once the indices for environments requiring reset have been computed, the _reset_idx(self, env_ids)
function performs the reset operations on those environments. Within this function, new states
for the environments requiring reset should be set directly into simulation.
def _reset_idx(self, env_ids: Sequence[int] | None):
    if env_ids is None:
        env_ids = self.cartpole._ALL_INDICES
    super()._reset_idx(env_ids)

    joint_pos = self.cartpole.data.default_joint_pos[env_ids]
    joint_pos[:, self._pole_dof_idx] += sample_uniform(
        self.cfg.initial_pole_angle_range[0] * math.pi,
        self.cfg.initial_pole_angle_range[1] * math.pi,
        joint_pos[:, self._pole_dof_idx].shape,
        joint_pos.device,
    )
    joint_vel = self.cartpole.data.default_joint_vel[env_ids]

    default_root_state = self.cartpole.data.default_root_state[env_ids]
    default_root_state[:, :3] += self.scene.env_origins[env_ids]

    self.joint_pos[env_ids] = joint_pos
    self.joint_vel[env_ids] = joint_vel

    self.cartpole.write_root_pose_to_sim(default_root_state[:, :7], env_ids)
    self.cartpole.write_root_velocity_to_sim(default_root_state[:, 7:], env_ids)
    self.cartpole.write_joint_state_to_sim(joint_pos, joint_vel, None, env_ids)
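Note that default_root_state holds 13 values per environment: position (3), orientation quaternion (4), linear velocity (3), and angular velocity (3). This is why the slice [:, :7] is written to the simulation as the root pose and [:, 7:] as the root velocity, and why the environment origins are added to the position entries so that each clone resets at its own location.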
Applying Actions#
There are two APIs that are designed for working with actions. The _pre_physics_step(self, actions) method takes in actions from the policy as an argument and is called once per RL step, prior to taking any physics steps. This function can be used to process the actions buffer from the policy and cache the data in a class variable for the environment.
def _pre_physics_step(self, actions: torch.Tensor) -> None:
    self.actions = self.action_scale * actions.clone()
The _apply_action(self)
API is called decimation
number of times for each RL step, prior to taking
each physics step. This provides more flexibility for environments where actions should be applied
for each physics step.
def _apply_action(self) -> None:
    self.cartpole.set_joint_effort_target(self.actions, joint_ids=self._cart_dof_idx)
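With the configuration above (dt = 1/120 s and decimation = 2), each RL step spans two physics steps: _pre_physics_step(self, actions) runs once per RL step, _apply_action(self) runs twice, and the policy therefore operates at an effective rate of 60 Hz.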
The Code Execution#
To run training for the direct workflow Cartpole environment, we can use the following command:
./isaaclab.sh -p scripts/reinforcement_learning/rl_games/train.py --task=Isaac-Cartpole-Direct-v0

All direct workflow tasks have the suffix -Direct
added to the task name to differentiate the implementation style.
Domain Randomization#
In the direct workflow, domain randomization configuration uses the configclass
module
to specify a configuration class consisting of EventTermCfg
variables.
Below is an example of a configuration class for domain randomization:
@configclass
class EventCfg:
    robot_physics_material = EventTerm(
        func=mdp.randomize_rigid_body_material,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot", body_names=".*"),
            "static_friction_range": (0.7, 1.3),
            "dynamic_friction_range": (1.0, 1.0),
            "restitution_range": (1.0, 1.0),
            "num_buckets": 250,
        },
    )
    robot_joint_stiffness_and_damping = EventTerm(
        func=mdp.randomize_actuator_gains,
        mode="reset",
        params={
            "asset_cfg": SceneEntityCfg("robot", joint_names=".*"),
            "stiffness_distribution_params": (0.75, 1.5),
            "damping_distribution_params": (0.3, 3.0),
            "operation": "scale",
            "distribution": "log_uniform",
        },
    )
    reset_gravity = EventTerm(
        func=mdp.randomize_physics_scene_gravity,
        mode="interval",
        is_global_time=True,
        interval_range_s=(36.0, 36.0),  # time_s = num_steps * (decimation * dt)
        params={
            "gravity_distribution_params": ([0.0, 0.0, 0.0], [0.0, 0.0, 0.4]),
            "operation": "add",
            "distribution": "gaussian",
        },
    )
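The snippet above assumes the randomization helpers have already been imported. A hedged sketch of a typical import block (module paths should be verified against the installed Isaac Lab version):
import isaaclab.envs.mdp as mdp
from isaaclab.managers import EventTermCfg as EventTerm
from isaaclab.managers import SceneEntityCfg
from isaaclab.utils import configclass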
Each EventTerm object is of the EventTermCfg class and takes in a func parameter specifying the function to call during randomization and a mode parameter, which can be startup, reset, or interval. The params dictionary should provide the necessary arguments to the function specified in the func parameter.
Functions specified as func
for the EventTerm
can be found in the events
module.
Note that as part of the "asset_cfg": SceneEntityCfg("robot", body_names=".*") parameter, the name of the actor "robot" is provided, along with the body or joint names specified as a regular expression; these determine which actors and which bodies/joints have randomization applied.
Once the configclass for the randomization terms has been set up, the class must be added to the base config class for the task and assigned to the variable events.
@configclass
class MyTaskConfig:
    events: EventCfg = EventCfg()
Action and Observation Noise#
Action and observation noise can also be added using the configclass module.
Action and observation noise configs must be added to the main task config using the
action_noise_model
and observation_noise_model
variables:
@configclass
class MyTaskConfig:
    # at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
    action_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
        noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.05, operation="add"),
        bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.015, operation="abs"),
    )

    # at every time-step add gaussian noise + bias. The bias is a gaussian sampled at reset
    observation_noise_model: NoiseModelWithAdditiveBiasCfg = NoiseModelWithAdditiveBiasCfg(
        noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.002, operation="add"),
        bias_noise_cfg=GaussianNoiseCfg(mean=0.0, std=0.0001, operation="abs"),
    )
NoiseModelWithAdditiveBiasCfg
can be used to sample both uncorrelated noise
per step as well as correlated noise that is re-sampled at reset time.
The noise_cfg
term specifies the Gaussian distribution that will be sampled at each
step for all environments. This noise will be added to the corresponding actions and
observations buffers at every step.
The bias_noise_cfg term specifies the Gaussian distribution for the correlated noise that will be sampled at reset time for the environments being reset. The same noise will be applied at each step for the remainder of the episode in those environments and resampled at the next reset.
If only per-step noise is desired, GaussianNoiseCfg
can be used
to specify an additive Gaussian distribution that adds the sampled noise to the input buffer.
@configclass
class MyTaskConfig:
    action_noise_model: GaussianNoiseCfg = GaussianNoiseCfg(mean=0.0, std=0.05, operation="add")
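As with the event terms, the noise configuration classes need to be imported; a hedged sketch of the assumed import location (verify against your Isaac Lab version):
from isaaclab.utils.noise import GaussianNoiseCfg, NoiseModelWithAdditiveBiasCfg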
In this tutorial, we learned how to create a direct workflow task environment for reinforcement learning. We did this by extending the base environment to include the scene setup, actions, dones, reset, reward, and observation functions.
While it is possible to manually create an instance of the DirectRLEnv class for a desired task,
this is not scalable as it requires specialized scripts for each task. Thus, we exploit the
gymnasium.make()
function to create the environment with the gym interface. We will learn how to do this
in the next tutorial.