Source code for isaaclab.sim.simulation_context

# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

from __future__ import annotations

import gc
import logging
import os
import traceback
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any

import toml
import torch

from pxr import Gf, Usd, UsdGeom, UsdPhysics, UsdUtils

import isaaclab.sim as sim_utils
import isaaclab.sim.utils.stage as stage_utils
from isaaclab.app.settings_manager import SettingsManager
from isaaclab.envs.utils.recording_hooks import run_recording_hooks_after_visualizers
from isaaclab.markers.vis_marker_registry import VisMarkerRegistry
from isaaclab.physics import BaseSceneDataProvider, PhysicsManager, SceneDataProvider
from isaaclab.physics.scene_data_requirements import (
    SceneDataRequirement,
    resolve_scene_data_requirements,
)
from isaaclab.renderers.render_context import RenderContext
from isaaclab.sim.utils import create_new_stage
from isaaclab.utils.string import clear_resolve_matching_names_cache
from isaaclab.utils.version import has_kit
from isaaclab.visualizers.base_visualizer import BaseVisualizer

if TYPE_CHECKING:
    from isaaclab.cloner.clone_plan import ClonePlan

from .simulation_cfg import SimulationCfg
from .spawners import DomeLightCfg, GroundPlaneCfg

logger = logging.getLogger(__name__)

# Visualizer type names (CLI and config). App launcher parses CSV and stores as a space-separated setting.
_VISUALIZER_TYPES = ("newton", "rerun", "viser", "kit")


class SettingsHelper:
    """Helper for typed settings access via SettingsManager."""

    def __init__(self, settings: SettingsManager):
        self._settings = settings

    def set(self, name: str, value: Any) -> None:
        """Set a setting with automatic type routing."""
        if isinstance(value, bool):
            self._settings.set_bool(name, value)
        elif isinstance(value, int):
            self._settings.set_int(name, value)
        elif isinstance(value, float):
            self._settings.set_float(name, value)
        elif isinstance(value, str):
            self._settings.set_string(name, value)
        elif isinstance(value, (list, tuple)):
            self._settings.set(name, value)
        else:
            raise ValueError(f"Unsupported value type for setting '{name}': {type(value)}")

    def get(self, name: str) -> Any:
        """Get a setting value."""
        return self._settings.get(name)


[docs] class SimulationContext: """Controls simulation lifecycle including physics stepping and rendering. This singleton class manages: * Physics configuration (time-step, solver parameters via :class:`isaaclab.sim.SimulationCfg`) * Simulation state (play, pause, step, stop) * Rendering and visualization The singleton instance can be accessed using the ``instance()`` class method. """ # SINGLETON PATTERN _instance: SimulationContext | None = None
[docs] def __new__(cls, cfg: SimulationCfg | None = None): """Enforce singleton pattern.""" if cls._instance is not None: return cls._instance return super().__new__(cls)
[docs] @classmethod def instance(cls) -> SimulationContext | None: """Get the singleton instance, or None if not created.""" return cls._instance
[docs] def __init__(self, cfg: SimulationCfg | None = None): """Initialize the simulation context. Args: cfg: Simulation configuration. Defaults to None (uses default config). """ if type(self)._instance is not None: return # Already initialized # Store config self.cfg = SimulationCfg() if cfg is None else cfg # Get or create stage based on config stage_cache = UsdUtils.StageCache.Get() if self.cfg.create_stage_in_memory: self.stage = create_new_stage() else: # Prefer the thread-local current stage (set by create_new_stage / test fixtures) # over cache lookup, since the cache may contain stale stages from prior tests. current = getattr(stage_utils._context, "stage", None) if current is not None: self.stage = current else: all_stages = stage_cache.GetAllStages() if stage_cache.Size() > 0 else [] # type: ignore[union-attr] self.stage = all_stages[0] if all_stages else create_new_stage() # Ensure stage is in the USD cache stage_id = stage_cache.GetId(self.stage).ToLongInt() # type: ignore[union-attr] if stage_id < 0: stage_cache.Insert(self.stage) # type: ignore[union-attr] # Set as current stage in thread-local context for get_current_stage() stage_utils._context.stage = self.stage # When Kit is running, attach the stage to Kit's USD context so that # Kit extensions (PhysX views, Articulation, viewport) can discover it. if has_kit(): import omni.usd kit_context = omni.usd.get_context() if kit_context is not None and kit_context.get_stage() is not self.stage: kit_context.attach_stage_with_callback(stage_cache.GetId(self.stage).ToLongInt()) # Acquire settings interface (SettingsManager: standalone dict or Omniverse when available) self.settings = SettingsManager.instance() self._settings_helper = SettingsHelper(self.settings) # Initialize USD physics scene and physics manager self._init_usd_physics_scene() # Normalize "cuda" -> "cuda:<id>" now that the USD physics scene is initialized # and /physics/cudaDevice is available. Update cfg.device in-place so all # downstream code (physics backends, assets, sensors) sees a consistent value. if "cuda" in self.cfg.device and ":" not in self.cfg.device: cuda_device = self.get_setting("/physics/cudaDevice") device_id = max(0, int(cuda_device) if cuda_device is not None else 0) self.cfg.device = f"cuda:{device_id}" # Set default physics backend if not specified if self.cfg.physics is None: from isaaclab_physx.physics import PhysxCfg self.cfg.physics = PhysxCfg() self._physics = self.cfg.physics # If physics is a PresetCfg wrapper (has a 'default' field but no 'class_type'), # resolve to the default preset so downstream code always sees a concrete PhysicsCfg. if not hasattr(self._physics, "class_type") and hasattr(self._physics, "default"): self._physics = self._physics.default self.cfg.physics = self._physics self.physics_manager: type[PhysicsManager] = self._physics.class_type self.physics_manager.initialize(self) self._apply_render_cfg_settings() # Initialize visualizer state (provider/visualizers are created lazily during initialize_visualizers()). self._scene_data_provider: BaseSceneDataProvider | None = None self._visualizers: list[BaseVisualizer] = [] self._scene_data_requirements = SceneDataRequirement() # Per-group clone plans published by InteractiveScene after cloning. Providers (e.g. # the Newton visualizer model rebuilder on a PhysX backend) consume these to derive # their own backend args. Empty dict until :meth:`InteractiveScene.clone_environments` # runs. self._clone_plans: dict[str, ClonePlan] = {} self._visualizer_step_counter = 0 # Default visualization dt used before/without visualizer initialization. physics_dt = getattr(self.cfg.physics, "dt", None) self._viz_dt = (physics_dt if physics_dt is not None else self.cfg.dt) * self.cfg.render_interval # Cache commonly-used settings (these don't change during runtime) self._has_gui = bool(self.get_setting("/isaaclab/has_gui")) self._has_offscreen_render = bool(self.get_setting("/isaaclab/render/offscreen")) self._xr_enabled = bool(self.get_setting("/isaaclab/xr/enabled")) # Note: has_rtx_sensors is NOT cached because it changes when Camera sensors are created self._pending_camera_view: tuple[tuple[float, float, float], tuple[float, float, float]] | None = None self.vis_marker_registry = VisMarkerRegistry() # Simulation state self._is_playing = False self._is_stopped = True # Monotonic physics-step counter used by camera sensors for self._physics_step_count: int = 0 # Monotonic render-generation counter. This increments whenever render() # is executed and lets downstream camera freshness logic distinguish # render/reset transitions that occur without advancing physics steps. self._render_generation: int = 0 # Shared renderers for all Camera sensors (compatible renderer_cfg only). self._render_context = RenderContext() type(self)._instance = self # Mark as valid singleton only after successful init
def _apply_render_cfg_settings(self) -> None: """Apply render preset and overrides from SimulationCfg.render.""" # TODO: Refactor render preset + override handling to a dedicated RenderingQualityCfg # (name subject to change) to keep quality profiles and carb mappings centralized. render_cfg = getattr(self.cfg, "render", None) if render_cfg is None: return # Priority: # 1) CLI/AppLauncher setting if present, 2) SimulationCfg.render.rendering_mode. rendering_mode = self.get_setting("/isaaclab/rendering/rendering_mode") if not rendering_mode: rendering_mode = getattr(render_cfg, "rendering_mode", None) if rendering_mode: supported_rendering_modes = {"performance", "balanced", "quality"} if rendering_mode not in supported_rendering_modes: raise ValueError( f"RenderCfg rendering mode '{rendering_mode}' not in supported modes " f"{sorted(supported_rendering_modes)}." ) isaaclab_app_exp_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), *[".."] * 4, "apps") from isaaclab.utils.version import get_isaac_sim_version if get_isaac_sim_version().major < 6: isaaclab_app_exp_path = os.path.join(isaaclab_app_exp_path, "isaacsim_5") preset_filename = os.path.join(isaaclab_app_exp_path, f"rendering_modes/{rendering_mode}.kit") if os.path.exists(preset_filename): with open(preset_filename) as file: preset_dict = toml.load(file) def _apply_nested(data: dict[str, Any], path: str = "") -> None: for key, value in data.items(): key_path = f"{path}/{key}" if path else f"/{key}" if isinstance(value, dict): _apply_nested(value, key_path) else: self.set_setting(key_path.replace(".", "/"), value) _apply_nested(preset_dict) else: logger.warning("[SimulationContext] Render preset file not found: %s", preset_filename) # RenderCfg fields mapped to setting paths (stored via SettingsManager) field_to_setting = { "enable_translucency": "/rtx/translucency/enabled", "enable_reflections": "/rtx/reflections/enabled", "enable_global_illumination": "/rtx/indirectDiffuse/enabled", "enable_dlssg": "/rtx-transient/dlssg/enabled", "enable_dl_denoiser": "/rtx-transient/dldenoiser/enabled", "dlss_mode": "/rtx/post/dlss/execMode", "enable_direct_lighting": "/rtx/directLighting/enabled", "samples_per_pixel": "/rtx/directLighting/sampledLighting/samplesPerPixel", "enable_shadows": "/rtx/shadows/enabled", "enable_ambient_occlusion": "/rtx/ambientOcclusion/enabled", "dome_light_upper_lower_strategy": "/rtx/domeLight/upperLowerStrategy", } for key, value in vars(render_cfg).items(): if value is None or key in {"rendering_mode", "carb_settings", "antialiasing_mode"}: continue setting_path = field_to_setting.get(key) if setting_path is not None: self.set_setting(setting_path, value) # Raw overrides from render_cfg (stored via SettingsManager) extra_settings = getattr(render_cfg, "carb_settings", None) if extra_settings: for key, value in extra_settings.items(): if "_" in key: path = "/" + key.replace("_", "/") elif "." in key: path = "/" + key.replace(".", "/") else: path = key self.set_setting(path, value) # Optional anti-aliasing mode via Replicator (best-effort, may use Omniverse APIs) antialiasing_mode = getattr(render_cfg, "antialiasing_mode", None) if antialiasing_mode is not None: try: import omni.replicator.core as rep rep.settings.set_render_rtx_realtime(antialiasing=antialiasing_mode) except Exception: pass def _init_usd_physics_scene(self) -> None: """Create and configure the USD physics scene.""" cfg = self.cfg with sim_utils.use_stage(self.stage): # Set stage conventions for metric units UsdGeom.SetStageUpAxis(self.stage, "Z") UsdGeom.SetStageMetersPerUnit(self.stage, 1.0) UsdPhysics.SetStageKilogramsPerUnit(self.stage, 1.0) # Find and delete any existing physics scene. # Collect paths first to avoid mutating the stage while traversing, # which can invalidate the USD iterator. physics_scene_paths = [ prim.GetPath().pathString for prim in self.stage.Traverse() if prim.GetTypeName() == "PhysicsScene" ] for path in physics_scene_paths: sim_utils.delete_prim(path, stage=self.stage) # Create a new physics scene if self.stage.GetPrimAtPath(cfg.physics_prim_path).IsValid(): raise RuntimeError(f"A prim already exists at path '{cfg.physics_prim_path}'.") physics_scene = UsdPhysics.Scene.Define(self.stage, cfg.physics_prim_path) # Pre-create gravity tensor to avoid torch heap corruption issues (torch 2.1+) gravity = torch.tensor(cfg.gravity, dtype=torch.float32, device=self.cfg.device) gravity_magnitude = torch.norm(gravity).item() if gravity_magnitude == 0.0: gravity_direction = [0.0, 0.0, -1.0] else: gravity_direction = (gravity / gravity_magnitude).tolist() physics_scene.CreateGravityDirectionAttr(Gf.Vec3f(*gravity_direction)) physics_scene.CreateGravityMagnitudeAttr(gravity_magnitude) @property def physics_sim_view(self): """Returns the physics simulation view.""" return self.physics_manager.get_physics_sim_view() @property def device(self) -> str: """Returns the device on which the simulation is running.""" return self.physics_manager.get_device() @property def backend(self) -> str: """Returns the tensor backend being used ("numpy" or "torch").""" return self.physics_manager.get_backend() @property def has_gui(self) -> bool: """Returns whether GUI is enabled (cached at init).""" return self._has_gui @property def has_offscreen_render(self) -> bool: """Returns whether offscreen rendering is enabled (cached at init).""" return self._has_offscreen_render
[docs] def has_active_visualizers(self) -> bool: """Return whether any visualizer path is active for rendering/camera control.""" return bool(self.get_setting("/isaaclab/visualizer/types")) or bool( self.get_setting("/isaaclab/video/auto_start_kit") )
[docs] def can_render_rgb_array(self) -> bool: """Return whether rgb-array rendering is currently available.""" return self.has_gui or self.has_offscreen_render or self.has_active_visualizers()
@property def is_rendering(self) -> bool: """Returns whether rendering is active (GUI, RTX sensors, visualizers, or XR).""" return ( self._has_gui or self._has_offscreen_render or self.get_setting("/isaaclab/render/rtx_sensors") or bool(self.resolve_visualizer_types()) or self._xr_enabled )
[docs] def get_physics_dt(self) -> float: """Returns the physics time step.""" return self.physics_manager.get_physics_dt()
[docs] def get_physics_step_count(self) -> int: """Return the monotonic physics step counter (incremented each :meth:`step`).""" return self._physics_step_count
@property def render_context(self) -> RenderContext: """Shared :class:`~isaaclab.renderers.render_context.RenderContext` for camera renderers.""" return self._render_context @property def render_generation(self) -> int: """Returns a monotonic counter for render() executions.""" return self._render_generation def _create_default_visualizer_configs(self, requested_visualizers: list[str]) -> list: """Create default visualizer configs for requested types. Loads only the requested visualizer submodule (e.g. isaaclab_visualizers.rerun) so dependencies for other backends are not imported. """ import importlib default_configs = [] cfg_class_names = { "kit": "KitVisualizerCfg", "newton": "NewtonVisualizerCfg", "rerun": "RerunVisualizerCfg", "viser": "ViserVisualizerCfg", } for viz_type in requested_visualizers: try: if viz_type not in _VISUALIZER_TYPES: logger.warning( f"[SimulationContext] Unknown visualizer type '{viz_type}' requested. " f"Valid types: {', '.join(repr(t) for t in _VISUALIZER_TYPES)}. Skipping." ) continue mod = importlib.import_module(f"isaaclab_visualizers.{viz_type}") cfg_cls = getattr(mod, cfg_class_names[viz_type]) default_configs.append(cfg_cls()) except (ImportError, ModuleNotFoundError) as exc: # isaaclab_visualizers is optional; log once at warning level if "isaaclab_visualizers" in str(exc): logger.warning( "[SimulationContext] Visualizer '%s' skipped: isaaclab_visualizers is not installed. " "Install with: pip install isaaclab_visualizers[%s]", viz_type, viz_type, ) else: logger.error( "[SimulationContext] Failed to create default config for visualizer '%s': %s", viz_type, exc, ) except Exception as exc: logger.error(f"[SimulationContext] Failed to create default config for visualizer '{viz_type}': {exc}") return default_configs def _get_cli_visualizer_types(self) -> list[str]: """Return list of visualizer types requested via CLI (setting).""" requested = self.get_setting("/isaaclab/visualizer/types") if not isinstance(requested, str) or not requested.strip(): return [] # App launcher writes this as a single string; accept comma and/or whitespace separators. return [value for chunk in requested.split(",") for value in chunk.split() if value] def _apply_visualizer_cli_overrides(self, visualizer_cfgs: list[Any]) -> None: """Apply ``--max_visible_envs`` to every resolved visualizer cfg when set in settings. AppLauncher stores ``/isaaclab/visualizer/max_visible_envs`` as ``-1`` when the flag was omitted; any non-negative int overrides :attr:`VisualizerCfg.max_visible_envs` on each cfg. """ raw = self.get_setting("/isaaclab/visualizer/max_visible_envs") try: max_visible = int(raw) if raw is not None else -1 except (TypeError, ValueError): logger.warning("[SimulationContext] Invalid /isaaclab/visualizer/max_visible_envs: %r", raw) return if max_visible < 0: return for cfg in visualizer_cfgs: if hasattr(cfg, "max_visible_envs"): cfg.max_visible_envs = max_visible def _is_cli_visualizer_explicit(self) -> bool: """Return ``True`` when visualizers were explicitly provided via CLI.""" return bool(self.get_setting("/isaaclab/visualizer/explicit")) def _is_cli_visualizer_disable_all(self) -> bool: """Return ``True`` when CLI requested ``--viz none`` semantics.""" return bool(self.get_setting("/isaaclab/visualizer/disable_all"))
[docs] def resolve_visualizer_types(self) -> list[str]: """Resolve visualizer types from config or CLI settings.""" if self._is_cli_visualizer_disable_all(): return [] if self._is_cli_visualizer_explicit(): return self._get_cli_visualizer_types() visualizer_cfgs = self.cfg.visualizer_cfgs if visualizer_cfgs is None: return [] if not isinstance(visualizer_cfgs, list): visualizer_cfgs = [visualizer_cfgs] return [cfg.visualizer_type for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None)]
def _resolve_visualizer_cfgs(self) -> list[Any]: """Resolve final visualizer configs from cfg and optional CLI override. When visualizers are explicitly requested via ``--visualizer`` CLI flag, a :class:`RuntimeError` is raised if any requested type cannot be resolved (unknown type or missing package). """ visualizer_cfgs: list[Any] = [] if self.cfg.visualizer_cfgs is not None: visualizer_cfgs = ( self.cfg.visualizer_cfgs if isinstance(self.cfg.visualizer_cfgs, list) else [self.cfg.visualizer_cfgs] ) cli_requested = self._get_cli_visualizer_types() cli_explicit = self._is_cli_visualizer_explicit() cli_disable_all = self._is_cli_visualizer_disable_all() if cli_disable_all: resolved = [] elif not cli_explicit: self._apply_visualizer_cli_overrides(visualizer_cfgs) resolved = visualizer_cfgs elif not visualizer_cfgs: resolved = self._create_default_visualizer_configs(cli_requested) if cli_requested else [] self._apply_visualizer_cli_overrides(resolved) else: # CLI selection is explicit: keep only requested cfg types, then add defaults for missing. cli_requested_set = set(cli_requested) resolved = [cfg for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None) in cli_requested_set] existing_types = {getattr(cfg, "visualizer_type", None) for cfg in resolved} for viz_type in cli_requested: if viz_type not in existing_types and viz_type in _VISUALIZER_TYPES: resolved.extend(self._create_default_visualizer_configs([viz_type])) existing_types.add(viz_type) self._apply_visualizer_cli_overrides(resolved) # When visualizers were explicitly requested via CLI, verify all # requested types were resolved. This catches unknown types and # missing packages that _create_default_visualizer_configs silently # skips. if cli_explicit and cli_requested: resolved_types = {getattr(cfg, "visualizer_type", None) for cfg in resolved} missing = [t for t in cli_requested if t not in resolved_types] if missing: raise RuntimeError( f"Explicitly requested visualizer(s) {missing} could not be configured. " f"Valid types: {', '.join(repr(t) for t in _VISUALIZER_TYPES)}. " "Ensure the required package is installed " "(e.g., pip install isaaclab_visualizers[<type>])." ) # XR auto-start: auto-inject a KitVisualizer when XR is active and no # Kit visualizer is already present. The KitVisualizer pumps # app.update() and triggers forward() (via requires_forward_before_step) # to sync Fabric data so the XR runtime receives up-to-date hand/joint # transforms each frame. if self._xr_enabled and bool(self.get_setting("/isaaclab/xr/auto_start")): has_kit = any(getattr(cfg, "visualizer_type", None) == "kit" for cfg in resolved) if not has_kit: try: import importlib mod = importlib.import_module("isaaclab_visualizers.kit") kit_cfg_cls = getattr(mod, "KitVisualizerCfg") resolved.append(kit_cfg_cls()) logger.info("[SimulationContext] Auto-injecting KitVisualizer for XR app-update pumping.") except (ImportError, ModuleNotFoundError, AttributeError) as exc: logger.warning( "[SimulationContext] XR mode could not auto-inject a KitVisualizer: %s. " "Install isaaclab_visualizers[kit] or pass --visualizer kit.", exc, ) return resolved
[docs] def initialize_visualizers(self) -> None: """Initialize visualizers from SimulationCfg.visualizer_cfgs.""" if self._visualizers: return physics_dt = getattr(self.cfg.physics, "dt", None) self._viz_dt = (physics_dt if physics_dt is not None else self.cfg.dt) * self.cfg.render_interval visualizer_cfgs = self._resolve_visualizer_cfgs() if not visualizer_cfgs: return cli_explicit = self._is_cli_visualizer_explicit() # Resolve visualizer-driven requirements once and keep optional artifact payload untouched. visualizer_types = [ cfg.visualizer_type for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None) is not None ] requirements = resolve_scene_data_requirements(visualizer_types=visualizer_types) self._scene_data_requirements = requirements self.initialize_scene_data_provider() self._visualizers = [] for cfg in visualizer_cfgs: try: visualizer = cfg.create_visualizer() visualizer.initialize(self._scene_data_provider) self._visualizers.append(visualizer) except Exception as exc: if cli_explicit: raise RuntimeError( f"Visualizer '{cfg.visualizer_type}' was explicitly requested " f"but failed to create or initialize: {exc}" ) from exc logger.exception( "Failed to initialize visualizer '%s' (%s): %s", cfg.visualizer_type, type(cfg).__name__, exc, ) # Replay any camera pose requested before visualizers were initialized. pending = getattr(self, "_pending_camera_view", None) if pending is not None: eye, target = pending for viz in self._visualizers: viz.set_camera_view(eye, target) self._pending_camera_view = None if not self._visualizers and self._scene_data_provider is not None: close_provider = getattr(self._scene_data_provider, "close", None) if callable(close_provider): close_provider() self._scene_data_provider = None
def initialize_scene_data_provider(self) -> BaseSceneDataProvider: if self._scene_data_provider is None: self._scene_data_provider = SceneDataProvider(self.stage, self) return self._scene_data_provider
[docs] def get_scene_data_requirements(self) -> SceneDataRequirement: """Return scene-data requirements resolved from visualizers/renderers.""" return self._scene_data_requirements
[docs] def update_scene_data_requirements(self, requirements: SceneDataRequirement) -> None: """Update scene-data requirements.""" self._scene_data_requirements = requirements
[docs] def get_clone_plans(self) -> dict[str, ClonePlan]: """Return per-group clone plans published by the scene, keyed by destination template. Set by :meth:`InteractiveScene.clone_environments` after replication. Consumed by scene data providers that build backend models (e.g. Newton visualizer model on a PhysX backend) from the same plan the cloner used. Empty dict until the scene clones. """ return self._clone_plans
[docs] def set_clone_plans(self, plans: dict[str, ClonePlan]) -> None: """Set the cloner's per-group clone-plan map.""" self._clone_plans = plans
@property def visualizers(self) -> list[BaseVisualizer]: """Returns the list of active visualizers.""" return self._visualizers
[docs] def get_rendering_dt(self) -> float: """Return rendering dt, allowing visualizer-specific override.""" for viz in self._visualizers: viz_dt = viz.get_rendering_dt() if viz_dt is not None and viz_dt > 0: return float(viz_dt) return self._viz_dt
[docs] def set_camera_view(self, eye: tuple, target: tuple) -> None: """Set camera view on all visualizers that support it.""" self._pending_camera_view = (tuple(eye), tuple(target)) for viz in self._visualizers: viz.set_camera_view(eye, target)
[docs] def forward(self) -> None: """Update kinematics without stepping physics.""" self.physics_manager.forward()
[docs] def reset(self, soft: bool = False) -> None: """Reset the simulation. Args: soft: If True, skip full reinitialization. """ self.physics_manager.reset(soft) for viz in self._visualizers: viz.reset(soft) # Start the timeline so the play button is pressed self.physics_manager.play() if not self._visualizers: # Initialize visualizers after PhysX sim view is ready. self.initialize_visualizers() self._is_playing = True self._is_stopped = False
[docs] def step(self, render: bool = True) -> None: """Step physics and optionally render. If the timeline is paused (e.g. via the GUI), this method blocks and keeps the visualizer responsive until the timeline is resumed or stopped. Args: render: Whether to render the scene after stepping. Defaults to True. """ # Block while the GUI timeline is paused so the entire training loop freezes. # See: https://github.com/isaac-sim/IsaacLab/issues/4279 self.physics_manager.wait_for_playing() self._physics_step_count += 1 self.physics_manager.step() if render and self.is_rendering: self.render()
[docs] def render(self, mode: int | None = None, skip_app_pumping: bool = False) -> None: """Update visualizers and render the scene. Calls update_visualizers() so visualizers run at the render cadence (not at every physics step). Camera sensors drive their configured renderer when fetching data. Recording-related follow-up (Kit/RTX headless video, Newton GL video, etc.) runs in :mod:`isaaclab.envs.utils.recording_hooks` so it is not tied to a specific :class:`~isaaclab.physics.PhysicsManager` subclass. **Kit vs. standalone visualizers:** The Kit app loop (``app.update()``) is the only way to drive camera/RTX sensor rendering and viewport GUI updates; it cannot be split into "cameras only" and "GUI only". Standalone visualizers (Newton, Rerun, Viser) have self-contained ``step()`` methods that never call ``app.update()``, so they can run independently of camera rendering. The ``skip_app_pumping`` flag exploits this distinction: when True, Kit is skipped while standalone visualizers continue to update. Args: mode: Unused. Kept for backward compatibility. skip_app_pumping: When True, skip visualizers whose :meth:`~BaseVisualizer.pumps_app_update` returns True (e.g. KitVisualizer). This disables the Kit app loop and camera updates while still stepping standalone visualizers (Newton, Rerun, Viser). Used by environment ``step()`` when ``render_enabled`` is False. """ self.physics_manager.pre_render() self.update_visualizers(self.get_rendering_dt(), skip_app_pumping=skip_app_pumping) self.physics_manager.after_visualizers_render() run_recording_hooks_after_visualizers(self) self._render_generation += 1 # Call render callbacks if hasattr(self, "_render_callbacks"): for callback in self._render_callbacks.values(): callback(None) # Pass None as event data
[docs] def update_visualizers(self, dt: float, skip_app_pumping: bool = False) -> None: """Update visualizers without triggering renderer/GUI. Args: dt: Simulation time-step in seconds. skip_app_pumping: When True, skip visualizers whose :meth:`~BaseVisualizer.pumps_app_update` returns True (e.g. KitVisualizer). This is used when the environment's ``render_enabled`` flag is False — cameras and the Kit app loop are skipped, but standalone visualizers (Newton, Rerun, Viser) still receive updates. """ if not self._visualizers: return self.update_scene_data_provider() # Marker callbacks update VisualizationMarkers state; visualizer step() # consumes that state later in this method. if any(viz.supports_markers() for viz in self._visualizers): self.vis_marker_registry.dispatch_callbacks() visualizers_to_remove = [] for viz in self._visualizers: try: # When skip_app_pumping is set, skip Kit-like visualizers that call app.update() if skip_app_pumping and viz.pumps_app_update(): continue if viz.is_closed or not viz.is_running(): if viz.is_closed: logger.info("Visualizer closed: %s", type(viz).__name__) else: logger.info("Visualizer not running: %s", type(viz).__name__) visualizers_to_remove.append(viz) continue if viz.is_rendering_paused(): # Keep non-Kit visualizer event loops responsive while rendering is paused. # Newton/Rerun/Viser need step(0.0) so GL/UI can process input (e.g. Resume). # Kit is skipped: step() would call app.update(), which must not run during pause. if not viz.pumps_app_update(): viz.step(0.0) continue while viz.is_training_paused() and viz.is_running(): viz.step(0.0) viz.step(dt) except Exception as exc: logger.error("Error stepping visualizer '%s': %s", type(viz).__name__, exc) visualizers_to_remove.append(viz) for viz in visualizers_to_remove: try: viz.close() self._visualizers.remove(viz) logger.info("Removed visualizer: %s", type(viz).__name__) except Exception as exc: logger.error("Error closing visualizer: %s", exc)
def update_scene_data_provider(self, force_require_forward: bool = False): if force_require_forward or self._should_forward_before_visualizer_update(): self.physics_manager.forward() self._visualizer_step_counter += 1 if self._scene_data_provider is None: return self._scene_data_provider.update() def _should_forward_before_visualizer_update(self) -> bool: """Return True if any visualizer requires pre-step forward kinematics.""" return any(viz.requires_forward_before_step() for viz in self._visualizers)
[docs] def play(self) -> None: """Start or resume the simulation.""" self.physics_manager.play() for viz in self._visualizers: viz.play() self._is_playing = True self._is_stopped = False
[docs] def pause(self) -> None: """Pause the simulation (can be resumed with play).""" self.physics_manager.pause() for viz in self._visualizers: viz.pause() self._is_playing = False
[docs] def stop(self) -> None: """Stop the simulation completely.""" self.physics_manager.stop() for viz in self._visualizers: viz.stop() self._is_playing = False self._is_stopped = True
[docs] def is_playing(self) -> bool: """Returns True if simulation is playing (not paused or stopped).""" return self._is_playing
[docs] def is_stopped(self) -> bool: """Returns True if simulation is stopped (not just paused).""" return self._is_stopped
[docs] def set_setting(self, name: str, value: Any) -> None: """Set a setting value.""" self._settings_helper.set(name, value)
[docs] def get_setting(self, name: str) -> Any: """Get a setting value.""" return self._settings_helper.get(name)
[docs] @classmethod def clear_instance(cls) -> None: """Clean up resources and clear the singleton instance.""" if cls._instance is not None: # Close physics manager FIRST to detach PhysX from the stage # This must happen before clearing USD prims to avoid PhysX cleanup errors cls._instance.physics_manager.close() # Close all visualizers for viz in cls._instance._visualizers: viz.close() cls._instance._visualizers.clear() if cls._instance._scene_data_provider is not None: close_provider = getattr(cls._instance._scene_data_provider, "close", None) if callable(close_provider): close_provider() cls._instance._scene_data_provider = None # Tear down the stage. We skip clear_stage() (prim-by-prim deletion) since # close_stage() + app shutdown destroy the entire stage at once. stage_utils.close_stage() # Discard cached name-resolution data from destroyed assets clear_resolve_matching_names_cache() # Clear instance cls._instance = None gc.collect() logger.info("SimulationContext cleared")
[docs] @classmethod def clear_stage(cls) -> None: """Clear the current USD stage (preserving /World and PhysicsScene). Uses a predicate that preserves /World and PhysicsScene while also respecting the default deletability checks (ancestral prims, etc.). """ if cls._instance is None: return def _predicate(prim: Usd.Prim) -> bool: path = prim.GetPath().pathString if path == "/World": return False if prim.GetTypeName() == "PhysicsScene": return False return True sim_utils.clear_stage(predicate=_predicate)
@contextmanager def build_simulation_context( create_new_stage: bool = True, gravity_enabled: bool = True, device: str = "cuda:0", dt: float = 0.01, sim_cfg: SimulationCfg | None = None, add_ground_plane: bool = False, add_lighting: bool = False, auto_add_lighting: bool = False, visualizers: list[str] | None = None, ) -> Iterator[SimulationContext]: """Context manager to build a simulation context with the provided settings. Args: create_new_stage: Whether to create a new stage. Defaults to True. gravity_enabled: Whether to enable gravity. Defaults to True. device: Device to run the simulation on. Defaults to "cuda:0". dt: Time step for the simulation. Defaults to 0.01. sim_cfg: SimulationCfg to use. Defaults to None. add_ground_plane: Whether to add a ground plane. Defaults to False. add_lighting: Whether to add a dome light. Defaults to False. auto_add_lighting: Whether to auto-add lighting if GUI present. Defaults to False. visualizers: List of visualizer backend keys to enable (e.g. ``["kit", "newton", "rerun"]``). Valid types: ``"kit"``, ``"newton"``, ``"rerun"``, ``"viser"``. When provided, sets the ``/isaaclab/visualizer/types`` setting so the existing visualizer resolution machinery picks them up. Defaults to None. Yields: The simulation context to use for the simulation. """ sim: SimulationContext | None = None try: if create_new_stage: sim_utils.create_new_stage() if sim_cfg is None: gravity = (0.0, 0.0, -9.81) if gravity_enabled else (0.0, 0.0, 0.0) sim_cfg = SimulationCfg(device=device, dt=dt, gravity=gravity) sim = SimulationContext(sim_cfg) if visualizers: sim.set_setting("/isaaclab/visualizer/types", " ".join(visualizers)) if add_ground_plane: cfg = GroundPlaneCfg() cfg.func("/World/defaultGroundPlane", cfg) if add_lighting or (auto_add_lighting and (sim.get_setting("/isaaclab/has_gui") or visualizers)): cfg = DomeLightCfg( color=(0.1, 0.1, 0.1), enable_color_temperature=True, color_temperature=5500, intensity=10000 ) cfg.func(prim_path="/World/defaultDomeLight", cfg=cfg, translation=(0.0, 0.0, 10.0)) yield sim except Exception: logger.error(traceback.format_exc()) raise finally: if sim is not None: if not sim.get_setting("/isaaclab/has_gui"): sim.stop() sim.clear_instance()