omni.isaac.lab.sensors.frame_transformer.frame_transformer 源代码
# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import re
import torch
from collections.abc import Sequence
from typing import TYPE_CHECKING
import omni.log
import omni.physics.tensors.impl.api as physx
from pxr import UsdPhysics
import omni.isaac.lab.sim as sim_utils
from omni.isaac.lab.markers import VisualizationMarkers
from omni.isaac.lab.utils.math import (
combine_frame_transforms,
convert_quat,
is_identity_pose,
subtract_frame_transforms,
)
from ..sensor_base import SensorBase
from .frame_transformer_data import FrameTransformerData
if TYPE_CHECKING:
from .frame_transformer_cfg import FrameTransformerCfg
[文档]class FrameTransformer(SensorBase):
"""A sensor for reporting frame transforms.
This class provides an interface for reporting the transform of one or more frames (target frames)
with respect to another frame (source frame). The source frame is specified by the user as a prim path
(:attr:`FrameTransformerCfg.prim_path`) and the target frames are specified by the user as a list of
prim paths (:attr:`FrameTransformerCfg.target_frames`).
The source frame and target frames are assumed to be rigid bodies. The transform of the target frames
with respect to the source frame is computed by first extracting the transform of the source frame
and target frames from the physics engine and then computing the relative transform between the two.
Additionally, the user can specify an offset for the source frame and each target frame. This is useful
for specifying the transform of the desired frame with respect to the body's center of mass, for instance.
A common example of using this sensor is to track the position and orientation of the end effector of a
robotic manipulator. In this case, the source frame would be the body corresponding to the base frame of the
manipulator, and the target frame would be the body corresponding to the end effector. Since the end-effector is
typically a fictitious body, the user may need to specify an offset from the end-effector to the body of the
manipulator.
"""
cfg: FrameTransformerCfg
"""The configuration parameters."""
[文档] def __init__(self, cfg: FrameTransformerCfg):
"""Initializes the frame transformer object.
Args:
cfg: The configuration parameters.
"""
# initialize base class
super().__init__(cfg)
# Create empty variables for storing output data
self._data: FrameTransformerData = FrameTransformerData()
def __str__(self) -> str:
"""Returns: A string containing information about the instance."""
return (
f"FrameTransformer @ '{self.cfg.prim_path}': \n"
f"\ttracked body frames: {[self._source_frame_body_name] + self._target_frame_body_names} \n"
f"\tnumber of envs: {self._num_envs}\n"
f"\tsource body frame: {self._source_frame_body_name}\n"
f"\ttarget frames (count: {self._target_frame_names}): {len(self._target_frame_names)}\n"
)
"""
Properties
"""
@property
def data(self) -> FrameTransformerData:
# update sensors if needed
self._update_outdated_buffers()
# return the data
return self._data
"""
Operations
"""
[文档] def reset(self, env_ids: Sequence[int] | None = None):
# reset the timers and counters
super().reset(env_ids)
# resolve None
if env_ids is None:
env_ids = ...
"""
Implementation.
"""
def _initialize_impl(self):
super()._initialize_impl()
# resolve source frame offset
source_frame_offset_pos = torch.tensor(self.cfg.source_frame_offset.pos, device=self.device)
source_frame_offset_quat = torch.tensor(self.cfg.source_frame_offset.rot, device=self.device)
# Only need to perform offsetting of source frame if the position offsets is non-zero and rotation offset is
# not the identity quaternion for efficiency in _update_buffer_impl
self._apply_source_frame_offset = True
# Handle source frame offsets
if is_identity_pose(source_frame_offset_pos, source_frame_offset_quat):
omni.log.verbose(f"No offset application needed for source frame as it is identity: {self.cfg.prim_path}")
self._apply_source_frame_offset = False
else:
omni.log.verbose(f"Applying offset to source frame as it is not identity: {self.cfg.prim_path}")
# Store offsets as tensors (duplicating each env's offsets for ease of multiplication later)
self._source_frame_offset_pos = source_frame_offset_pos.unsqueeze(0).repeat(self._num_envs, 1)
self._source_frame_offset_quat = source_frame_offset_quat.unsqueeze(0).repeat(self._num_envs, 1)
# Keep track of mapping from the rigid body name to the desired frames and prim path, as there may be multiple frames
# based upon the same body name and we don't want to create unnecessary views
body_names_to_frames: dict[str, dict[str, set[str] | str]] = {}
# The offsets associated with each target frame
target_offsets: dict[str, dict[str, torch.Tensor]] = {}
# The frames whose offsets are not identity
non_identity_offset_frames: list[str] = []
# Only need to perform offsetting of target frame if any of the position offsets are non-zero or any of the
# rotation offsets are not the identity quaternion for efficiency in _update_buffer_impl
self._apply_target_frame_offset = False
# Need to keep track of whether the source frame is also a target frame
self._source_is_also_target_frame = False
# Collect all target frames, their associated body prim paths and their offsets so that we can extract
# the prim, check that it has the appropriate rigid body API in a single loop.
# First element is None because user can't specify source frame name
frames = [None] + [target_frame.name for target_frame in self.cfg.target_frames]
frame_prim_paths = [self.cfg.prim_path] + [target_frame.prim_path for target_frame in self.cfg.target_frames]
# First element is None because source frame offset is handled separately
frame_offsets = [None] + [target_frame.offset for target_frame in self.cfg.target_frames]
frame_types = ["source"] + ["target"] * len(self.cfg.target_frames)
for frame, prim_path, offset, frame_type in zip(frames, frame_prim_paths, frame_offsets, frame_types):
# Find correct prim
matching_prims = sim_utils.find_matching_prims(prim_path)
if len(matching_prims) == 0:
raise ValueError(
f"Failed to create frame transformer for frame '{frame}' with path '{prim_path}'."
" No matching prims were found."
)
for prim in matching_prims:
# Get the prim path of the matching prim
matching_prim_path = prim.GetPath().pathString
# Check if it is a rigid prim
if not prim.HasAPI(UsdPhysics.RigidBodyAPI):
raise ValueError(
f"While resolving expression '{prim_path}' found a prim '{matching_prim_path}' which is not a"
" rigid body. The class only supports transformations between rigid bodies."
)
# Get the name of the body
body_name = matching_prim_path.rsplit("/", 1)[-1]
# Use body name if frame isn't specified by user
frame_name = frame if frame is not None else body_name
# Keep track of which frames are associated with which bodies
if body_name in body_names_to_frames:
body_names_to_frames[body_name]["frames"].add(frame_name)
# This is a corner case where the source frame is also a target frame
if body_names_to_frames[body_name]["type"] == "source" and frame_type == "target":
self._source_is_also_target_frame = True
else:
# Store the first matching prim path and the type of frame
body_names_to_frames[body_name] = {
"frames": {frame_name},
"prim_path": matching_prim_path,
"type": frame_type,
}
if offset is not None:
offset_pos = torch.tensor(offset.pos, device=self.device)
offset_quat = torch.tensor(offset.rot, device=self.device)
# Check if we need to apply offsets (optimized code path in _update_buffer_impl)
if not is_identity_pose(offset_pos, offset_quat):
non_identity_offset_frames.append(frame_name)
self._apply_target_frame_offset = True
target_offsets[frame_name] = {"pos": offset_pos, "quat": offset_quat}
if not self._apply_target_frame_offset:
omni.log.info(
f"No offsets application needed from '{self.cfg.prim_path}' to target frames as all"
f" are identity: {frames[1:]}"
)
else:
omni.log.info(
f"Offsets application needed from '{self.cfg.prim_path}' to the following target frames:"
f" {non_identity_offset_frames}"
)
# The names of bodies that RigidPrimView will be tracking to later extract transforms from
tracked_prim_paths = [body_names_to_frames[body_name]["prim_path"] for body_name in body_names_to_frames.keys()]
tracked_body_names = [body_name for body_name in body_names_to_frames.keys()]
body_names_regex = [tracked_prim_path.replace("env_0", "env_*") for tracked_prim_path in tracked_prim_paths]
# Create simulation view
self._physics_sim_view = physx.create_simulation_view(self._backend)
self._physics_sim_view.set_subspace_roots("/")
# Create a prim view for all frames and initialize it
# order of transforms coming out of view will be source frame followed by target frame(s)
self._frame_physx_view = self._physics_sim_view.create_rigid_body_view(body_names_regex)
# Determine the order in which regex evaluated body names so we can later index into frame transforms
# by frame name correctly
all_prim_paths = self._frame_physx_view.prim_paths
if "env_" in all_prim_paths[0]:
def extract_env_num_and_prim_path(item: str) -> tuple[int, str]:
"""Separates the environment number and prim_path from the item.
Args:
item: The item to extract the environment number from. Assumes item is of the form
`/World/envs/env_1/blah` or `/World/envs/env_11/blah`.
Returns:
The environment number and the prim_path.
"""
match = re.search(r"env_(\d+)(.*)", item)
return (int(match.group(1)), match.group(2))
# Find the indices that would reorganize output to be per environment. We want `env_1/blah` to come before `env_11/blah`
# and env_1/Robot/base to come before env_1/Robot/foot so we need to use custom key function
self._per_env_indices = [
index
for index, _ in sorted(
list(enumerate(all_prim_paths)), key=lambda x: extract_env_num_and_prim_path(x[1])
)
]
# Only need 0th env as the names and their ordering are the same across environments
sorted_prim_paths = [
all_prim_paths[index] for index in self._per_env_indices if "env_0" in all_prim_paths[index]
]
else:
# If no environment is present, then the order of the body names is the same as the order of the prim paths sorted alphabetically
self._per_env_indices = [index for index, _ in sorted(enumerate(all_prim_paths), key=lambda x: x[1])]
sorted_prim_paths = [all_prim_paths[index] for index in self._per_env_indices]
# -- target frames
self._target_frame_body_names = [prim_path.split("/")[-1] for prim_path in sorted_prim_paths]
# -- source frame
self._source_frame_body_name = self.cfg.prim_path.split("/")[-1]
source_frame_index = self._target_frame_body_names.index(self._source_frame_body_name)
# Only remove source frame from tracked bodies if it is not also a target frame
if not self._source_is_also_target_frame:
self._target_frame_body_names.remove(self._source_frame_body_name)
# Determine indices into all tracked body frames for both source and target frames
all_ids = torch.arange(self._num_envs * len(tracked_body_names))
self._source_frame_body_ids = torch.arange(self._num_envs) * len(tracked_body_names) + source_frame_index
# If source frame is also a target frame, then the target frame body ids are the same as the source frame body ids
if self._source_is_also_target_frame:
self._target_frame_body_ids = all_ids
else:
self._target_frame_body_ids = all_ids[~torch.isin(all_ids, self._source_frame_body_ids)]
# The name of each of the target frame(s) - either user specified or defaulted to the body name
self._target_frame_names: list[str] = []
# The position and rotation components of target frame offsets
target_frame_offset_pos = []
target_frame_offset_quat = []
# Stores the indices of bodies that need to be duplicated. For instance, if body "LF_SHANK" is needed
# for 2 frames, this list enables us to duplicate the body to both frames when doing the calculations
# when updating sensor in _update_buffers_impl
duplicate_frame_indices = []
# Go through each body name and determine the number of duplicates we need for that frame
# and extract the offsets. This is all done to handle the case where multiple frames
# reference the same body, but have different names and/or offsets
for i, body_name in enumerate(self._target_frame_body_names):
for frame in body_names_to_frames[body_name]["frames"]:
# Only need to handle target frames here as source frame is handled separately
if frame in target_offsets:
target_frame_offset_pos.append(target_offsets[frame]["pos"])
target_frame_offset_quat.append(target_offsets[frame]["quat"])
self._target_frame_names.append(frame)
duplicate_frame_indices.append(i)
# To handle multiple environments, need to expand so [0, 1, 1, 2] with 2 environments becomes
# [0, 1, 1, 2, 3, 4, 4, 5]. Again, this is a optimization to make _update_buffer_impl more efficient
duplicate_frame_indices = torch.tensor(duplicate_frame_indices, device=self.device)
if self._source_is_also_target_frame:
num_target_body_frames = len(tracked_body_names)
else:
num_target_body_frames = len(tracked_body_names) - 1
self._duplicate_frame_indices = torch.cat(
[duplicate_frame_indices + num_target_body_frames * env_num for env_num in range(self._num_envs)]
)
# Target frame offsets are only applied if at least one of the offsets are non-identity
if self._apply_target_frame_offset:
# Stack up all the frame offsets for shape (num_envs, num_frames, 3) and (num_envs, num_frames, 4)
self._target_frame_offset_pos = torch.stack(target_frame_offset_pos).repeat(self._num_envs, 1)
self._target_frame_offset_quat = torch.stack(target_frame_offset_quat).repeat(self._num_envs, 1)
# fill the data buffer
self._data.target_frame_names = self._target_frame_names
self._data.source_pos_w = torch.zeros(self._num_envs, 3, device=self._device)
self._data.source_quat_w = torch.zeros(self._num_envs, 4, device=self._device)
self._data.target_pos_w = torch.zeros(self._num_envs, len(duplicate_frame_indices), 3, device=self._device)
self._data.target_quat_w = torch.zeros(self._num_envs, len(duplicate_frame_indices), 4, device=self._device)
self._data.target_pos_source = torch.zeros_like(self._data.target_pos_w)
self._data.target_quat_source = torch.zeros_like(self._data.target_quat_w)
def _update_buffers_impl(self, env_ids: Sequence[int]):
"""Fills the buffers of the sensor data."""
# default to all sensors
if len(env_ids) == self._num_envs:
env_ids = ...
# Extract transforms from view - shape is:
# (the total number of source and target body frames being tracked * self._num_envs, 7)
transforms = self._frame_physx_view.get_transforms()
# Reorder the transforms to be per environment as is expected of SensorData
transforms = transforms[self._per_env_indices]
# Convert quaternions as PhysX uses xyzw form
transforms[:, 3:] = convert_quat(transforms[:, 3:], to="wxyz")
# Process source frame transform
source_frames = transforms[self._source_frame_body_ids]
# Only apply offset if the offsets will result in a coordinate frame transform
if self._apply_source_frame_offset:
source_pos_w, source_quat_w = combine_frame_transforms(
source_frames[:, :3],
source_frames[:, 3:],
self._source_frame_offset_pos,
self._source_frame_offset_quat,
)
else:
source_pos_w = source_frames[:, :3]
source_quat_w = source_frames[:, 3:]
# Process target frame transforms
target_frames = transforms[self._target_frame_body_ids]
duplicated_target_frame_pos_w = target_frames[self._duplicate_frame_indices, :3]
duplicated_target_frame_quat_w = target_frames[self._duplicate_frame_indices, 3:]
# Only apply offset if the offsets will result in a coordinate frame transform
if self._apply_target_frame_offset:
target_pos_w, target_quat_w = combine_frame_transforms(
duplicated_target_frame_pos_w,
duplicated_target_frame_quat_w,
self._target_frame_offset_pos,
self._target_frame_offset_quat,
)
else:
target_pos_w = duplicated_target_frame_pos_w
target_quat_w = duplicated_target_frame_quat_w
# Compute the transform of the target frame with respect to the source frame
total_num_frames = len(self._target_frame_names)
target_pos_source, target_quat_source = subtract_frame_transforms(
source_pos_w.unsqueeze(1).expand(-1, total_num_frames, -1).reshape(-1, 3),
source_quat_w.unsqueeze(1).expand(-1, total_num_frames, -1).reshape(-1, 4),
target_pos_w,
target_quat_w,
)
# Update buffers
# note: The frame names / ordering don't change so no need to update them after initialization
self._data.source_pos_w[:] = source_pos_w.view(-1, 3)
self._data.source_quat_w[:] = source_quat_w.view(-1, 4)
self._data.target_pos_w[:] = target_pos_w.view(-1, total_num_frames, 3)
self._data.target_quat_w[:] = target_quat_w.view(-1, total_num_frames, 4)
self._data.target_pos_source[:] = target_pos_source.view(-1, total_num_frames, 3)
self._data.target_quat_source[:] = target_quat_source.view(-1, total_num_frames, 4)
def _set_debug_vis_impl(self, debug_vis: bool):
# set visibility of markers
# note: parent only deals with callbacks. not their visibility
if debug_vis:
if not hasattr(self, "frame_visualizer"):
self.frame_visualizer = VisualizationMarkers(self.cfg.visualizer_cfg)
# set their visibility to true
self.frame_visualizer.set_visibility(True)
else:
if hasattr(self, "frame_visualizer"):
self.frame_visualizer.set_visibility(False)
def _debug_vis_callback(self, event):
# Update the visualized markers
if self.frame_visualizer is not None:
self.frame_visualizer.visualize(self._data.target_pos_w.view(-1, 3), self._data.target_quat_w.view(-1, 4))
"""
Internal simulation callbacks.
"""
def _invalidate_initialize_callback(self, event):
"""Invalidates the scene elements."""
# call parent
super()._invalidate_initialize_callback(event)
# set all existing views to None to invalidate them
self._physics_sim_view = None
self._frame_physx_view = None