Source code for isaaclab.devices.openxr.openxr_device

# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""OpenXR-powered device for teleoperation and interaction."""

import contextlib
import numpy as np
from collections.abc import Callable
from dataclasses import dataclass
from enum import Enum
from typing import Any

import carb

from isaaclab.devices.openxr.common import HAND_JOINT_NAMES
from isaaclab.devices.retargeter_base import RetargeterBase

from ..device_base import DeviceBase, DeviceCfg
from .xr_cfg import XrCfg

# For testing purposes, we need to mock the XRCore, XRPoseValidityFlags classes
XRCore = None
XRPoseValidityFlags = None

with contextlib.suppress(ModuleNotFoundError):
    from omni.kit.xr.core import XRCore, XRPoseValidityFlags
from isaacsim.core.prims import SingleXFormPrim


@dataclass
class OpenXRDeviceCfg(DeviceCfg):
    """Configuration for OpenXR devices."""

    xr_cfg: XrCfg | None = None


[docs]class OpenXRDevice(DeviceBase): """An OpenXR-powered device for teleoperation and interaction. This device tracks hand joints using OpenXR and makes them available as: 1. A dictionary of tracking data (when used without retargeters) 2. Retargeted commands for robot control (when retargeters are provided) Raw data format (_get_raw_data output): * A dictionary with keys matching TrackingTarget enum values (HAND_LEFT, HAND_RIGHT, HEAD) * Each hand tracking entry contains a dictionary of joint poses * Each joint pose is a 7D vector (x, y, z, qw, qx, qy, qz) in meters and quaternion units * Joint names are defined in HAND_JOINT_NAMES from isaaclab.devices.openxr.common * Supported joints include palm, wrist, and joints for thumb, index, middle, ring and little fingers Teleop commands: The device responds to several teleop commands that can be subscribed to via add_callback(): * "START": Resume hand tracking data flow * "STOP": Pause hand tracking data flow * "RESET": Reset the tracking and signal simulation reset The device tracks the left hand, right hand, head position, or any combination of these based on the TrackingTarget enum values. When retargeters are provided, the raw tracking data is transformed into robot control commands suitable for teleoperation. """
[docs] class TrackingTarget(Enum): """Enum class specifying what to track with OpenXR. Attributes: HAND_LEFT: Track the left hand (index 0 in _get_raw_data output) HAND_RIGHT: Track the right hand (index 1 in _get_raw_data output) HEAD: Track the head/headset position (index 2 in _get_raw_data output) """ HAND_LEFT = 0 HAND_RIGHT = 1 HEAD = 2
TELEOP_COMMAND_EVENT_TYPE = "teleop_command"
[docs] def __init__( self, cfg: OpenXRDeviceCfg, retargeters: list[RetargeterBase] | None = None, ): """Initialize the OpenXR device. Args: cfg: Configuration object for OpenXR settings. retargeters: List of retargeter instances to use for transforming raw tracking data. """ super().__init__(retargeters) self._xr_cfg = cfg.xr_cfg or XrCfg() self._additional_callbacks = dict() self._vc_subscription = ( XRCore.get_singleton() .get_message_bus() .create_subscription_to_pop_by_type( carb.events.type_from_string(self.TELEOP_COMMAND_EVENT_TYPE), self._on_teleop_command ) ) # Initialize dictionaries instead of arrays default_pose = np.array([0, 0, 0, 1, 0, 0, 0], dtype=np.float32) self._previous_joint_poses_left = {name: default_pose.copy() for name in HAND_JOINT_NAMES} self._previous_joint_poses_right = {name: default_pose.copy() for name in HAND_JOINT_NAMES} self._previous_headpose = default_pose.copy() xr_anchor = SingleXFormPrim("/XRAnchor", position=self._xr_cfg.anchor_pos, orientation=self._xr_cfg.anchor_rot) carb.settings.get_settings().set_float("/persistent/xr/profile/ar/render/nearPlane", self._xr_cfg.near_plane) carb.settings.get_settings().set_string("/persistent/xr/profile/ar/anchorMode", "custom anchor") carb.settings.get_settings().set_string("/xrstage/profile/ar/customAnchor", xr_anchor.prim_path)
def __del__(self): """Clean up resources when the object is destroyed. Properly unsubscribes from the XR message bus to prevent memory leaks and resource issues when the device is no longer needed. """ if hasattr(self, "_vc_subscription") and self._vc_subscription is not None: self._vc_subscription = None # No need to explicitly clean up OpenXR instance as it's managed by NVIDIA Isaac Sim def __str__(self) -> str: """Returns a string containing information about the OpenXR hand tracking device. This provides details about the device configuration, tracking settings, and available gesture commands. Returns: Formatted string with device information """ msg = f"OpenXR Hand Tracking Device: {self.__class__.__name__}\n" msg += f"\tAnchor Position: {self._xr_cfg.anchor_pos}\n" msg += f"\tAnchor Rotation: {self._xr_cfg.anchor_rot}\n" # Add retargeter information if self._retargeters: msg += "\tRetargeters:\n" for i, retargeter in enumerate(self._retargeters): msg += f"\t\t{i + 1}. {retargeter.__class__.__name__}\n" else: msg += "\tRetargeters: None (raw joint data output)\n" # Add available gesture commands msg += "\t----------------------------------------------\n" msg += "\tAvailable Gesture Commands:\n" # Check which callbacks are registered start_avail = "START" in self._additional_callbacks stop_avail = "STOP" in self._additional_callbacks reset_avail = "RESET" in self._additional_callbacks msg += f"\t\tStart Teleoperation: {'✓' if start_avail else '✗'}\n" msg += f"\t\tStop Teleoperation: {'✓' if stop_avail else '✗'}\n" msg += f"\t\tReset Environment: {'✓' if reset_avail else '✗'}\n" # Add joint tracking information msg += "\t----------------------------------------------\n" msg += "\tTracked Joints: All 26 XR hand joints including:\n" msg += "\t\t- Wrist, palm\n" msg += "\t\t- Thumb (tip, intermediate joints)\n" msg += "\t\t- Fingers (tip, distal, intermediate, proximal)\n" return msg """ Operations """
[docs] def reset(self): default_pose = np.array([0, 0, 0, 1, 0, 0, 0], dtype=np.float32) self._previous_joint_poses_left = {name: default_pose.copy() for name in HAND_JOINT_NAMES} self._previous_joint_poses_right = {name: default_pose.copy() for name in HAND_JOINT_NAMES} self._previous_headpose = default_pose.copy()
[docs] def add_callback(self, key: str, func: Callable): """Add additional functions to bind to client messages. Args: key: The message type to bind to. Valid values are "START", "STOP", and "RESET". func: The function to call when the message is received. The callback function should not take any arguments. """ self._additional_callbacks[key] = func
def _get_raw_data(self) -> Any: """Get the latest tracking data from the OpenXR runtime. Returns: Dictionary with TrackingTarget enum keys (HAND_LEFT, HAND_RIGHT, HEAD) containing: - Left hand joint poses: Dictionary of 26 joints with position and orientation - Right hand joint poses: Dictionary of 26 joints with position and orientation - Head pose: Single 7-element array with position and orientation Each pose is represented as a 7-element array: [x, y, z, qw, qx, qy, qz] where the first 3 elements are position and the last 4 are quaternion orientation. """ return { self.TrackingTarget.HAND_LEFT: self._calculate_joint_poses( XRCore.get_singleton().get_input_device("/user/hand/left"), self._previous_joint_poses_left, ), self.TrackingTarget.HAND_RIGHT: self._calculate_joint_poses( XRCore.get_singleton().get_input_device("/user/hand/right"), self._previous_joint_poses_right, ), self.TrackingTarget.HEAD: self._calculate_headpose(), } """ Internal helpers. """ def _calculate_joint_poses( self, hand_device: Any, previous_joint_poses: dict[str, np.ndarray] ) -> dict[str, np.ndarray]: """Calculate and update joint poses for a hand device. This function retrieves the current joint poses from the OpenXR hand device and updates the previous joint poses with the new data. If a joint's position or orientation is not valid, it will use the previous values. Args: hand_device: The OpenXR input device for a hand (/user/hand/left or /user/hand/right). previous_joint_poses: Dictionary mapping joint names to their previous poses. Each pose is a 7-element array: [x, y, z, qw, qx, qy, qz]. Returns: Updated dictionary of joint poses with the same structure as previous_joint_poses. Each pose is represented as a 7-element numpy array: [x, y, z, qw, qx, qy, qz] where the first 3 elements are position and the last 4 are quaternion orientation. """ if hand_device is None: return previous_joint_poses joint_poses = hand_device.get_all_virtual_world_poses() # Update each joint that is present in the current data for joint_name, joint_pose in joint_poses.items(): if joint_name in HAND_JOINT_NAMES: # Extract translation and rotation if joint_pose.validity_flags & XRPoseValidityFlags.POSITION_VALID: position = joint_pose.pose_matrix.ExtractTranslation() else: position = previous_joint_poses[joint_name][:3] if joint_pose.validity_flags & XRPoseValidityFlags.ORIENTATION_VALID: quat = joint_pose.pose_matrix.ExtractRotationQuat() quati = quat.GetImaginary() quatw = quat.GetReal() else: quatw = previous_joint_poses[joint_name][3] quati = previous_joint_poses[joint_name][4:] # Directly update the dictionary with new data previous_joint_poses[joint_name] = np.array( [position[0], position[1], position[2], quatw, quati[0], quati[1], quati[2]], dtype=np.float32 ) # No need for conversion, just return the updated dictionary return previous_joint_poses def _calculate_headpose(self) -> np.ndarray: """Calculate the head pose from OpenXR. Returns: numpy.ndarray: 7-element array containing head position (xyz) and orientation (wxyz) """ head_device = XRCore.get_singleton().get_input_device("/user/head") if head_device: hmd = head_device.get_virtual_world_pose("") position = hmd.ExtractTranslation() quat = hmd.ExtractRotationQuat() quati = quat.GetImaginary() quatw = quat.GetReal() # Store in w, x, y, z order to match our convention self._previous_headpose = np.array([ position[0], position[1], position[2], quatw, quati[0], quati[1], quati[2], ]) return self._previous_headpose def _on_teleop_command(self, event: carb.events.IEvent): msg = event.payload["message"] if "start" in msg: if "START" in self._additional_callbacks: self._additional_callbacks["START"]() elif "stop" in msg: if "STOP" in self._additional_callbacks: self._additional_callbacks["STOP"]() elif "reset" in msg: if "RESET" in self._additional_callbacks: self._additional_callbacks["RESET"]()