Source code for isaaclab_contrib.mdp.actions.thrust_actions

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import logging
from collections.abc import Sequence
from typing import TYPE_CHECKING

import torch

import isaaclab.utils.string as string_utils
from isaaclab.managers.action_manager import ActionTerm

from isaaclab_contrib.assets import Multirotor

if TYPE_CHECKING:
    from isaaclab.envs import ManagerBasedEnv
    from isaaclab.envs.utils.io_descriptors import GenericActionIODescriptor

    from . import thrust_actions_cfg

# import logger
logger = logging.getLogger(__name__)


[docs]class ThrustAction(ActionTerm): """Thrust action term that applies the processed actions as thrust commands. This action term is designed specifically for controlling multirotor vehicles by mapping action inputs to thruster commands. It provides flexible preprocessing of actions through: - **Scaling**: Multiply actions by a scale factor to adjust command magnitudes - **Offset**: Add an offset to center actions around a baseline (e.g., hover thrust) - **Clipping**: Constrain actions to valid ranges to prevent unsafe commands The action term integrates with Isaac Lab's :class:`~isaaclab.managers.ActionManager` framework and is specifically designed to work with :class:`~isaaclab_contrib.assets.Multirotor` assets. Key Features: - Supports per-thruster or uniform scaling and offsets - Optional automatic offset computation based on hover thrust - Action clipping for safety and constraint enforcement - Regex-based thruster selection for flexible control schemes Example: .. code-block:: python from isaaclab.envs import ManagerBasedRLEnvCfg from isaaclab_contrib.mdp.actions import ThrustActionCfg @configclass class MyEnvCfg(ManagerBasedRLEnvCfg): # ... other configuration ... @configclass class ActionsCfg: # Direct thrust control (normalized actions) thrust = ThrustActionCfg( asset_name="robot", scale=5.0, # Convert [-1, 1] to [-5, 5] N use_default_offset=True, # Add hover thrust as offset clip={".*": (-2.0, 8.0)}, # Clip to safe thrust range ) """ cfg: thrust_actions_cfg.ThrustActionCfg """The configuration of the action term.""" _asset: Multirotor """The articulation asset on which the action term is applied.""" _scale: torch.Tensor | float """The scaling factor applied to the input action.""" _offset: torch.Tensor | float """The offset applied to the input action.""" _clip: torch.Tensor """The clip applied to the input action."""
[docs] def __init__(self, cfg: thrust_actions_cfg.ThrustActionCfg, env: ManagerBasedEnv) -> None: # initialize the action term super().__init__(cfg, env) thruster_names_expr = self._asset.actuators["thrusters"].cfg.thruster_names_expr # resolve the thrusters over which the action term is applied self._thruster_ids, self._thruster_names = self._asset.find_bodies( thruster_names_expr, preserve_order=self.cfg.preserve_order ) self._num_thrusters = len(self._thruster_ids) # log the resolved thruster names for debugging logger.info( f"Resolved thruster names for the action term {self.__class__.__name__}:" f" {self._thruster_names} [{self._thruster_ids}]" ) # Avoid indexing across all thrusters for efficiency if self._num_thrusters == self._asset.num_thrusters and not self.cfg.preserve_order: self._thruster_ids = slice(None) # create tensors for raw and processed actions self._raw_actions = torch.zeros(self.num_envs, self.action_dim, device=self.device) self._processed_actions = torch.zeros_like(self.raw_actions) # parse scale if isinstance(cfg.scale, (float, int)): self._scale = float(cfg.scale) elif isinstance(cfg.scale, dict): self._scale = torch.ones(self.num_envs, self.action_dim, device=self.device) # resolve the dictionary config index_list, _, value_list = string_utils.resolve_matching_names_values(self.cfg.scale, self._thruster_names) self._scale[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported scale type: {type(cfg.scale)}. Supported types are float and dict.") # parse offset if isinstance(cfg.offset, (float, int)): self._offset = float(cfg.offset) elif isinstance(cfg.offset, dict): self._offset = torch.zeros_like(self._raw_actions) # resolve the dictionary config index_list, _, value_list = string_utils.resolve_matching_names_values( self.cfg.offset, self._thruster_names ) self._offset[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported offset type: {type(cfg.offset)}. Supported types are float and dict.") # parse clip if cfg.clip is not None: if isinstance(cfg.clip, dict): self._clip = torch.tensor([[-float("inf"), float("inf")]], device=self.device).repeat( self.num_envs, self.action_dim, 1 ) index_list, _, value_list = string_utils.resolve_matching_names_values( self.cfg.clip, self._thruster_names ) self._clip[:, index_list] = torch.tensor(value_list, device=self.device) else: raise ValueError(f"Unsupported clip type: {type(cfg.clip)}. Supported types are dict.") # Handle use_default_offset if cfg.use_default_offset: # Use default thruster RPS as offset self._offset = self._asset.data.default_thruster_rps[:, self._thruster_ids].clone()
""" Properties """ @property def action_dim(self) -> int: return self._num_thrusters @property def raw_actions(self) -> torch.Tensor: return self._raw_actions @property def processed_actions(self) -> torch.Tensor: return self._processed_actions @property def IO_descriptor(self) -> GenericActionIODescriptor: """The IO descriptor of the action term.""" super().IO_descriptor self._IO_descriptor.shape = (self.action_dim,) self._IO_descriptor.dtype = str(self.raw_actions.dtype) self._IO_descriptor.action_type = "ThrustAction" self._IO_descriptor.thruster_names = self._thruster_names self._IO_descriptor.scale = self._scale if isinstance(self._offset, torch.Tensor): self._IO_descriptor.offset = self._offset[0].detach().cpu().numpy().tolist() else: self._IO_descriptor.offset = self._offset if self.cfg.clip is not None: if isinstance(self._clip, torch.Tensor): self._IO_descriptor.clip = self._clip[0].detach().cpu().numpy().tolist() else: self._IO_descriptor.clip = self._clip else: self._IO_descriptor.clip = None return self._IO_descriptor """ Methods """
[docs] def reset(self, env_ids: Sequence[int] | None = None) -> None: """Reset the action term. This method resets the raw actions to zero for the specified environments. The processed actions will be recomputed during the next :meth:`process_actions` call. Args: env_ids: Environment indices to reset. Defaults to None (all environments). """ self._raw_actions[env_ids] = 0.0
[docs] def process_actions(self, actions: torch.Tensor): r"""Process actions by applying scaling, offset, and clipping. This method transforms raw policy actions into thrust commands through an affine transformation followed by optional clipping. The transformation is: .. math:: \text{processed} = \text{raw} \times \text{scale} + \text{offset} If clipping is configured, the processed actions are then clamped: .. math:: \text{processed} = \text{clamp}(\text{processed}, \text{min}, \text{max}) Args: actions: Raw action tensor from the policy. Shape is ``(num_envs, action_dim)``. Typically in the range [-1, 1] for normalized policies. Note: The processed actions are stored internally and applied during the next :meth:`apply_actions` call. """ # store the raw actions self._raw_actions[:] = actions # apply the affine transformations self._processed_actions = self._raw_actions * self._scale + self._offset # clip actions if self.cfg.clip is not None: self._processed_actions = torch.clamp( self._processed_actions, min=self._clip[:, :, 0], max=self._clip[:, :, 1] )
[docs] def apply_actions(self): """Apply the processed actions as thrust commands. This method sets the processed actions as thrust targets on the multirotor asset. The thrust targets are then used by the thruster actuator models to compute actual thrust forces during the simulation step. The method calls :meth:`~isaaclab_contrib.assets.Multirotor.set_thrust_target` on the multirotor asset with the appropriate thruster IDs. """ # Set thrust targets using thruster IDs self._asset.set_thrust_target(self.processed_actions, thruster_ids=self._thruster_ids)