# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause
from __future__ import annotations
import gc
import logging
import os
import traceback
from collections.abc import Iterator
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any
import toml
import torch
from pxr import Gf, Usd, UsdGeom, UsdPhysics, UsdUtils
import isaaclab.sim as sim_utils
import isaaclab.sim.utils.stage as stage_utils
from isaaclab.app.settings_manager import SettingsManager
from isaaclab.envs.utils.recording_hooks import run_recording_hooks_after_visualizers
from isaaclab.markers.vis_marker_registry import VisMarkerRegistry
from isaaclab.physics import BaseSceneDataProvider, PhysicsManager, SceneDataProvider
from isaaclab.physics.scene_data_requirements import (
SceneDataRequirement,
resolve_scene_data_requirements,
)
from isaaclab.renderers.render_context import RenderContext
from isaaclab.sim.utils import create_new_stage
from isaaclab.utils.string import clear_resolve_matching_names_cache
from isaaclab.utils.version import has_kit
from isaaclab.visualizers.base_visualizer import BaseVisualizer
if TYPE_CHECKING:
from isaaclab.cloner.clone_plan import ClonePlan
from .simulation_cfg import SimulationCfg
from .spawners import DomeLightCfg, GroundPlaneCfg
logger = logging.getLogger(__name__)
# Visualizer type names (CLI and config). App launcher parses CSV and stores as a space-separated setting.
_VISUALIZER_TYPES = ("newton", "rerun", "viser", "kit")
class SettingsHelper:
"""Helper for typed settings access via SettingsManager."""
def __init__(self, settings: SettingsManager):
self._settings = settings
def set(self, name: str, value: Any) -> None:
"""Set a setting with automatic type routing."""
if isinstance(value, bool):
self._settings.set_bool(name, value)
elif isinstance(value, int):
self._settings.set_int(name, value)
elif isinstance(value, float):
self._settings.set_float(name, value)
elif isinstance(value, str):
self._settings.set_string(name, value)
elif isinstance(value, (list, tuple)):
self._settings.set(name, value)
else:
raise ValueError(f"Unsupported value type for setting '{name}': {type(value)}")
def get(self, name: str) -> Any:
"""Get a setting value."""
return self._settings.get(name)
[docs]
class SimulationContext:
"""Controls simulation lifecycle including physics stepping and rendering.
This singleton class manages:
* Physics configuration (time-step, solver parameters via :class:`isaaclab.sim.SimulationCfg`)
* Simulation state (play, pause, step, stop)
* Rendering and visualization
The singleton instance can be accessed using the ``instance()`` class method.
"""
# SINGLETON PATTERN
_instance: SimulationContext | None = None
[docs]
def __new__(cls, cfg: SimulationCfg | None = None):
"""Enforce singleton pattern."""
if cls._instance is not None:
return cls._instance
return super().__new__(cls)
[docs]
@classmethod
def instance(cls) -> SimulationContext | None:
"""Get the singleton instance, or None if not created."""
return cls._instance
[docs]
def __init__(self, cfg: SimulationCfg | None = None):
"""Initialize the simulation context.
Args:
cfg: Simulation configuration. Defaults to None (uses default config).
"""
if type(self)._instance is not None:
return # Already initialized
# Store config
self.cfg = SimulationCfg() if cfg is None else cfg
# Get or create stage based on config
stage_cache = UsdUtils.StageCache.Get()
if self.cfg.create_stage_in_memory:
self.stage = create_new_stage()
else:
# Prefer the thread-local current stage (set by create_new_stage / test fixtures)
# over cache lookup, since the cache may contain stale stages from prior tests.
current = getattr(stage_utils._context, "stage", None)
if current is not None:
self.stage = current
else:
all_stages = stage_cache.GetAllStages() if stage_cache.Size() > 0 else [] # type: ignore[union-attr]
self.stage = all_stages[0] if all_stages else create_new_stage()
# Ensure stage is in the USD cache
stage_id = stage_cache.GetId(self.stage).ToLongInt() # type: ignore[union-attr]
if stage_id < 0:
stage_cache.Insert(self.stage) # type: ignore[union-attr]
# Set as current stage in thread-local context for get_current_stage()
stage_utils._context.stage = self.stage
# When Kit is running, attach the stage to Kit's USD context so that
# Kit extensions (PhysX views, Articulation, viewport) can discover it.
if has_kit():
import omni.usd
kit_context = omni.usd.get_context()
if kit_context is not None and kit_context.get_stage() is not self.stage:
kit_context.attach_stage_with_callback(stage_cache.GetId(self.stage).ToLongInt())
# Acquire settings interface (SettingsManager: standalone dict or Omniverse when available)
self.settings = SettingsManager.instance()
self._settings_helper = SettingsHelper(self.settings)
# Initialize USD physics scene and physics manager
self._init_usd_physics_scene()
# Normalize "cuda" -> "cuda:<id>" now that the USD physics scene is initialized
# and /physics/cudaDevice is available. Update cfg.device in-place so all
# downstream code (physics backends, assets, sensors) sees a consistent value.
if "cuda" in self.cfg.device and ":" not in self.cfg.device:
cuda_device = self.get_setting("/physics/cudaDevice")
device_id = max(0, int(cuda_device) if cuda_device is not None else 0)
self.cfg.device = f"cuda:{device_id}"
# Set default physics backend if not specified
if self.cfg.physics is None:
from isaaclab_physx.physics import PhysxCfg
self.cfg.physics = PhysxCfg()
self._physics = self.cfg.physics
# If physics is a PresetCfg wrapper (has a 'default' field but no 'class_type'),
# resolve to the default preset so downstream code always sees a concrete PhysicsCfg.
if not hasattr(self._physics, "class_type") and hasattr(self._physics, "default"):
self._physics = self._physics.default
self.cfg.physics = self._physics
self.physics_manager: type[PhysicsManager] = self._physics.class_type
self.physics_manager.initialize(self)
self._apply_render_cfg_settings()
# Initialize visualizer state (provider/visualizers are created lazily during initialize_visualizers()).
self._scene_data_provider: BaseSceneDataProvider | None = None
self._visualizers: list[BaseVisualizer] = []
self._scene_data_requirements = SceneDataRequirement()
# Per-group clone plans published by InteractiveScene after cloning. Providers (e.g.
# the Newton visualizer model rebuilder on a PhysX backend) consume these to derive
# their own backend args. Empty dict until :meth:`InteractiveScene.clone_environments`
# runs.
self._clone_plans: dict[str, ClonePlan] = {}
self._visualizer_step_counter = 0
# Default visualization dt used before/without visualizer initialization.
physics_dt = getattr(self.cfg.physics, "dt", None)
self._viz_dt = (physics_dt if physics_dt is not None else self.cfg.dt) * self.cfg.render_interval
# Cache commonly-used settings (these don't change during runtime)
self._has_gui = bool(self.get_setting("/isaaclab/has_gui"))
self._has_offscreen_render = bool(self.get_setting("/isaaclab/render/offscreen"))
self._xr_enabled = bool(self.get_setting("/isaaclab/xr/enabled"))
# Note: has_rtx_sensors is NOT cached because it changes when Camera sensors are created
self._pending_camera_view: tuple[tuple[float, float, float], tuple[float, float, float]] | None = None
self.vis_marker_registry = VisMarkerRegistry()
# Simulation state
self._is_playing = False
self._is_stopped = True
# Monotonic physics-step counter used by camera sensors for
self._physics_step_count: int = 0
# Monotonic render-generation counter. This increments whenever render()
# is executed and lets downstream camera freshness logic distinguish
# render/reset transitions that occur without advancing physics steps.
self._render_generation: int = 0
# Shared renderers for all Camera sensors (compatible renderer_cfg only).
self._render_context = RenderContext()
type(self)._instance = self # Mark as valid singleton only after successful init
def _apply_render_cfg_settings(self) -> None:
"""Apply render preset and overrides from SimulationCfg.render."""
# TODO: Refactor render preset + override handling to a dedicated RenderingQualityCfg
# (name subject to change) to keep quality profiles and carb mappings centralized.
render_cfg = getattr(self.cfg, "render", None)
if render_cfg is None:
return
# Priority:
# 1) CLI/AppLauncher setting if present, 2) SimulationCfg.render.rendering_mode.
rendering_mode = self.get_setting("/isaaclab/rendering/rendering_mode")
if not rendering_mode:
rendering_mode = getattr(render_cfg, "rendering_mode", None)
if rendering_mode:
supported_rendering_modes = {"performance", "balanced", "quality"}
if rendering_mode not in supported_rendering_modes:
raise ValueError(
f"RenderCfg rendering mode '{rendering_mode}' not in supported modes "
f"{sorted(supported_rendering_modes)}."
)
isaaclab_app_exp_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), *[".."] * 4, "apps")
from isaaclab.utils.version import get_isaac_sim_version
if get_isaac_sim_version().major < 6:
isaaclab_app_exp_path = os.path.join(isaaclab_app_exp_path, "isaacsim_5")
preset_filename = os.path.join(isaaclab_app_exp_path, f"rendering_modes/{rendering_mode}.kit")
if os.path.exists(preset_filename):
with open(preset_filename) as file:
preset_dict = toml.load(file)
def _apply_nested(data: dict[str, Any], path: str = "") -> None:
for key, value in data.items():
key_path = f"{path}/{key}" if path else f"/{key}"
if isinstance(value, dict):
_apply_nested(value, key_path)
else:
self.set_setting(key_path.replace(".", "/"), value)
_apply_nested(preset_dict)
else:
logger.warning("[SimulationContext] Render preset file not found: %s", preset_filename)
# RenderCfg fields mapped to setting paths (stored via SettingsManager)
field_to_setting = {
"enable_translucency": "/rtx/translucency/enabled",
"enable_reflections": "/rtx/reflections/enabled",
"enable_global_illumination": "/rtx/indirectDiffuse/enabled",
"enable_dlssg": "/rtx-transient/dlssg/enabled",
"enable_dl_denoiser": "/rtx-transient/dldenoiser/enabled",
"dlss_mode": "/rtx/post/dlss/execMode",
"enable_direct_lighting": "/rtx/directLighting/enabled",
"samples_per_pixel": "/rtx/directLighting/sampledLighting/samplesPerPixel",
"enable_shadows": "/rtx/shadows/enabled",
"enable_ambient_occlusion": "/rtx/ambientOcclusion/enabled",
"dome_light_upper_lower_strategy": "/rtx/domeLight/upperLowerStrategy",
}
for key, value in vars(render_cfg).items():
if value is None or key in {"rendering_mode", "carb_settings", "antialiasing_mode"}:
continue
setting_path = field_to_setting.get(key)
if setting_path is not None:
self.set_setting(setting_path, value)
# Raw overrides from render_cfg (stored via SettingsManager)
extra_settings = getattr(render_cfg, "carb_settings", None)
if extra_settings:
for key, value in extra_settings.items():
if "_" in key:
path = "/" + key.replace("_", "/")
elif "." in key:
path = "/" + key.replace(".", "/")
else:
path = key
self.set_setting(path, value)
# Optional anti-aliasing mode via Replicator (best-effort, may use Omniverse APIs)
antialiasing_mode = getattr(render_cfg, "antialiasing_mode", None)
if antialiasing_mode is not None:
try:
import omni.replicator.core as rep
rep.settings.set_render_rtx_realtime(antialiasing=antialiasing_mode)
except Exception:
pass
def _init_usd_physics_scene(self) -> None:
"""Create and configure the USD physics scene."""
cfg = self.cfg
with sim_utils.use_stage(self.stage):
# Set stage conventions for metric units
UsdGeom.SetStageUpAxis(self.stage, "Z")
UsdGeom.SetStageMetersPerUnit(self.stage, 1.0)
UsdPhysics.SetStageKilogramsPerUnit(self.stage, 1.0)
# Find and delete any existing physics scene.
# Collect paths first to avoid mutating the stage while traversing,
# which can invalidate the USD iterator.
physics_scene_paths = [
prim.GetPath().pathString for prim in self.stage.Traverse() if prim.GetTypeName() == "PhysicsScene"
]
for path in physics_scene_paths:
sim_utils.delete_prim(path, stage=self.stage)
# Create a new physics scene
if self.stage.GetPrimAtPath(cfg.physics_prim_path).IsValid():
raise RuntimeError(f"A prim already exists at path '{cfg.physics_prim_path}'.")
physics_scene = UsdPhysics.Scene.Define(self.stage, cfg.physics_prim_path)
# Pre-create gravity tensor to avoid torch heap corruption issues (torch 2.1+)
gravity = torch.tensor(cfg.gravity, dtype=torch.float32, device=self.cfg.device)
gravity_magnitude = torch.norm(gravity).item()
if gravity_magnitude == 0.0:
gravity_direction = [0.0, 0.0, -1.0]
else:
gravity_direction = (gravity / gravity_magnitude).tolist()
physics_scene.CreateGravityDirectionAttr(Gf.Vec3f(*gravity_direction))
physics_scene.CreateGravityMagnitudeAttr(gravity_magnitude)
@property
def physics_sim_view(self):
"""Returns the physics simulation view."""
return self.physics_manager.get_physics_sim_view()
@property
def device(self) -> str:
"""Returns the device on which the simulation is running."""
return self.physics_manager.get_device()
@property
def backend(self) -> str:
"""Returns the tensor backend being used ("numpy" or "torch")."""
return self.physics_manager.get_backend()
@property
def has_gui(self) -> bool:
"""Returns whether GUI is enabled (cached at init)."""
return self._has_gui
@property
def has_offscreen_render(self) -> bool:
"""Returns whether offscreen rendering is enabled (cached at init)."""
return self._has_offscreen_render
[docs]
def has_active_visualizers(self) -> bool:
"""Return whether any visualizer path is active for rendering/camera control."""
return bool(self.get_setting("/isaaclab/visualizer/types")) or bool(
self.get_setting("/isaaclab/video/auto_start_kit")
)
[docs]
def can_render_rgb_array(self) -> bool:
"""Return whether rgb-array rendering is currently available."""
return self.has_gui or self.has_offscreen_render or self.has_active_visualizers()
@property
def is_rendering(self) -> bool:
"""Returns whether rendering is active (GUI, RTX sensors, visualizers, or XR)."""
return (
self._has_gui
or self._has_offscreen_render
or self.get_setting("/isaaclab/render/rtx_sensors")
or bool(self.resolve_visualizer_types())
or self._xr_enabled
)
[docs]
def get_physics_dt(self) -> float:
"""Returns the physics time step."""
return self.physics_manager.get_physics_dt()
[docs]
def get_physics_step_count(self) -> int:
"""Return the monotonic physics step counter (incremented each :meth:`step`)."""
return self._physics_step_count
@property
def render_context(self) -> RenderContext:
"""Shared :class:`~isaaclab.renderers.render_context.RenderContext` for camera renderers."""
return self._render_context
@property
def render_generation(self) -> int:
"""Returns a monotonic counter for render() executions."""
return self._render_generation
def _create_default_visualizer_configs(self, requested_visualizers: list[str]) -> list:
"""Create default visualizer configs for requested types.
Loads only the requested visualizer submodule (e.g. isaaclab_visualizers.rerun)
so dependencies for other backends are not imported.
"""
import importlib
default_configs = []
cfg_class_names = {
"kit": "KitVisualizerCfg",
"newton": "NewtonVisualizerCfg",
"rerun": "RerunVisualizerCfg",
"viser": "ViserVisualizerCfg",
}
for viz_type in requested_visualizers:
try:
if viz_type not in _VISUALIZER_TYPES:
logger.warning(
f"[SimulationContext] Unknown visualizer type '{viz_type}' requested. "
f"Valid types: {', '.join(repr(t) for t in _VISUALIZER_TYPES)}. Skipping."
)
continue
mod = importlib.import_module(f"isaaclab_visualizers.{viz_type}")
cfg_cls = getattr(mod, cfg_class_names[viz_type])
default_configs.append(cfg_cls())
except (ImportError, ModuleNotFoundError) as exc:
# isaaclab_visualizers is optional; log once at warning level
if "isaaclab_visualizers" in str(exc):
logger.warning(
"[SimulationContext] Visualizer '%s' skipped: isaaclab_visualizers is not installed. "
"Install with: pip install isaaclab_visualizers[%s]",
viz_type,
viz_type,
)
else:
logger.error(
"[SimulationContext] Failed to create default config for visualizer '%s': %s",
viz_type,
exc,
)
except Exception as exc:
logger.error(f"[SimulationContext] Failed to create default config for visualizer '{viz_type}': {exc}")
return default_configs
def _get_cli_visualizer_types(self) -> list[str]:
"""Return list of visualizer types requested via CLI (setting)."""
requested = self.get_setting("/isaaclab/visualizer/types")
if not isinstance(requested, str) or not requested.strip():
return []
# App launcher writes this as a single string; accept comma and/or whitespace separators.
return [value for chunk in requested.split(",") for value in chunk.split() if value]
def _apply_visualizer_cli_overrides(self, visualizer_cfgs: list[Any]) -> None:
"""Apply ``--max_visible_envs`` to every resolved visualizer cfg when set in settings.
AppLauncher stores ``/isaaclab/visualizer/max_visible_envs`` as ``-1`` when the flag was
omitted; any non-negative int overrides :attr:`VisualizerCfg.max_visible_envs` on each cfg.
"""
raw = self.get_setting("/isaaclab/visualizer/max_visible_envs")
try:
max_visible = int(raw) if raw is not None else -1
except (TypeError, ValueError):
logger.warning("[SimulationContext] Invalid /isaaclab/visualizer/max_visible_envs: %r", raw)
return
if max_visible < 0:
return
for cfg in visualizer_cfgs:
if hasattr(cfg, "max_visible_envs"):
cfg.max_visible_envs = max_visible
def _is_cli_visualizer_explicit(self) -> bool:
"""Return ``True`` when visualizers were explicitly provided via CLI."""
return bool(self.get_setting("/isaaclab/visualizer/explicit"))
def _is_cli_visualizer_disable_all(self) -> bool:
"""Return ``True`` when CLI requested ``--viz none`` semantics."""
return bool(self.get_setting("/isaaclab/visualizer/disable_all"))
[docs]
def resolve_visualizer_types(self) -> list[str]:
"""Resolve visualizer types from config or CLI settings."""
if self._is_cli_visualizer_disable_all():
return []
if self._is_cli_visualizer_explicit():
return self._get_cli_visualizer_types()
visualizer_cfgs = self.cfg.visualizer_cfgs
if visualizer_cfgs is None:
return []
if not isinstance(visualizer_cfgs, list):
visualizer_cfgs = [visualizer_cfgs]
return [cfg.visualizer_type for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None)]
def _resolve_visualizer_cfgs(self) -> list[Any]:
"""Resolve final visualizer configs from cfg and optional CLI override.
When visualizers are explicitly requested via ``--visualizer`` CLI flag,
a :class:`RuntimeError` is raised if any requested type cannot be
resolved (unknown type or missing package).
"""
visualizer_cfgs: list[Any] = []
if self.cfg.visualizer_cfgs is not None:
visualizer_cfgs = (
self.cfg.visualizer_cfgs if isinstance(self.cfg.visualizer_cfgs, list) else [self.cfg.visualizer_cfgs]
)
cli_requested = self._get_cli_visualizer_types()
cli_explicit = self._is_cli_visualizer_explicit()
cli_disable_all = self._is_cli_visualizer_disable_all()
if cli_disable_all:
resolved = []
elif not cli_explicit:
self._apply_visualizer_cli_overrides(visualizer_cfgs)
resolved = visualizer_cfgs
elif not visualizer_cfgs:
resolved = self._create_default_visualizer_configs(cli_requested) if cli_requested else []
self._apply_visualizer_cli_overrides(resolved)
else:
# CLI selection is explicit: keep only requested cfg types, then add defaults for missing.
cli_requested_set = set(cli_requested)
resolved = [cfg for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None) in cli_requested_set]
existing_types = {getattr(cfg, "visualizer_type", None) for cfg in resolved}
for viz_type in cli_requested:
if viz_type not in existing_types and viz_type in _VISUALIZER_TYPES:
resolved.extend(self._create_default_visualizer_configs([viz_type]))
existing_types.add(viz_type)
self._apply_visualizer_cli_overrides(resolved)
# When visualizers were explicitly requested via CLI, verify all
# requested types were resolved. This catches unknown types and
# missing packages that _create_default_visualizer_configs silently
# skips.
if cli_explicit and cli_requested:
resolved_types = {getattr(cfg, "visualizer_type", None) for cfg in resolved}
missing = [t for t in cli_requested if t not in resolved_types]
if missing:
raise RuntimeError(
f"Explicitly requested visualizer(s) {missing} could not be configured. "
f"Valid types: {', '.join(repr(t) for t in _VISUALIZER_TYPES)}. "
"Ensure the required package is installed "
"(e.g., pip install isaaclab_visualizers[<type>])."
)
# XR auto-start: auto-inject a KitVisualizer when XR is active and no
# Kit visualizer is already present. The KitVisualizer pumps
# app.update() and triggers forward() (via requires_forward_before_step)
# to sync Fabric data so the XR runtime receives up-to-date hand/joint
# transforms each frame.
if self._xr_enabled and bool(self.get_setting("/isaaclab/xr/auto_start")):
has_kit = any(getattr(cfg, "visualizer_type", None) == "kit" for cfg in resolved)
if not has_kit:
try:
import importlib
mod = importlib.import_module("isaaclab_visualizers.kit")
kit_cfg_cls = getattr(mod, "KitVisualizerCfg")
resolved.append(kit_cfg_cls())
logger.info("[SimulationContext] Auto-injecting KitVisualizer for XR app-update pumping.")
except (ImportError, ModuleNotFoundError, AttributeError) as exc:
logger.warning(
"[SimulationContext] XR mode could not auto-inject a KitVisualizer: %s. "
"Install isaaclab_visualizers[kit] or pass --visualizer kit.",
exc,
)
return resolved
[docs]
def initialize_visualizers(self) -> None:
"""Initialize visualizers from SimulationCfg.visualizer_cfgs."""
if self._visualizers:
return
physics_dt = getattr(self.cfg.physics, "dt", None)
self._viz_dt = (physics_dt if physics_dt is not None else self.cfg.dt) * self.cfg.render_interval
visualizer_cfgs = self._resolve_visualizer_cfgs()
if not visualizer_cfgs:
return
cli_explicit = self._is_cli_visualizer_explicit()
# Resolve visualizer-driven requirements once and keep optional artifact payload untouched.
visualizer_types = [
cfg.visualizer_type for cfg in visualizer_cfgs if getattr(cfg, "visualizer_type", None) is not None
]
requirements = resolve_scene_data_requirements(visualizer_types=visualizer_types)
self._scene_data_requirements = requirements
self.initialize_scene_data_provider()
self._visualizers = []
for cfg in visualizer_cfgs:
try:
visualizer = cfg.create_visualizer()
visualizer.initialize(self._scene_data_provider)
self._visualizers.append(visualizer)
except Exception as exc:
if cli_explicit:
raise RuntimeError(
f"Visualizer '{cfg.visualizer_type}' was explicitly requested "
f"but failed to create or initialize: {exc}"
) from exc
logger.exception(
"Failed to initialize visualizer '%s' (%s): %s",
cfg.visualizer_type,
type(cfg).__name__,
exc,
)
# Replay any camera pose requested before visualizers were initialized.
pending = getattr(self, "_pending_camera_view", None)
if pending is not None:
eye, target = pending
for viz in self._visualizers:
viz.set_camera_view(eye, target)
self._pending_camera_view = None
if not self._visualizers and self._scene_data_provider is not None:
close_provider = getattr(self._scene_data_provider, "close", None)
if callable(close_provider):
close_provider()
self._scene_data_provider = None
def initialize_scene_data_provider(self) -> BaseSceneDataProvider:
if self._scene_data_provider is None:
self._scene_data_provider = SceneDataProvider(self.stage, self)
return self._scene_data_provider
[docs]
def get_scene_data_requirements(self) -> SceneDataRequirement:
"""Return scene-data requirements resolved from visualizers/renderers."""
return self._scene_data_requirements
[docs]
def update_scene_data_requirements(self, requirements: SceneDataRequirement) -> None:
"""Update scene-data requirements."""
self._scene_data_requirements = requirements
[docs]
def get_clone_plans(self) -> dict[str, ClonePlan]:
"""Return per-group clone plans published by the scene, keyed by destination template.
Set by :meth:`InteractiveScene.clone_environments` after replication. Consumed by
scene data providers that build backend models (e.g. Newton visualizer model on a
PhysX backend) from the same plan the cloner used. Empty dict until the scene clones.
"""
return self._clone_plans
[docs]
def set_clone_plans(self, plans: dict[str, ClonePlan]) -> None:
"""Set the cloner's per-group clone-plan map."""
self._clone_plans = plans
@property
def visualizers(self) -> list[BaseVisualizer]:
"""Returns the list of active visualizers."""
return self._visualizers
[docs]
def get_rendering_dt(self) -> float:
"""Return rendering dt, allowing visualizer-specific override."""
for viz in self._visualizers:
viz_dt = viz.get_rendering_dt()
if viz_dt is not None and viz_dt > 0:
return float(viz_dt)
return self._viz_dt
[docs]
def set_camera_view(self, eye: tuple, target: tuple) -> None:
"""Set camera view on all visualizers that support it."""
self._pending_camera_view = (tuple(eye), tuple(target))
for viz in self._visualizers:
viz.set_camera_view(eye, target)
[docs]
def forward(self) -> None:
"""Update kinematics without stepping physics."""
self.physics_manager.forward()
[docs]
def reset(self, soft: bool = False) -> None:
"""Reset the simulation.
Args:
soft: If True, skip full reinitialization.
"""
self.physics_manager.reset(soft)
for viz in self._visualizers:
viz.reset(soft)
# Start the timeline so the play button is pressed
self.physics_manager.play()
if not self._visualizers:
# Initialize visualizers after PhysX sim view is ready.
self.initialize_visualizers()
self._is_playing = True
self._is_stopped = False
[docs]
def step(self, render: bool = True) -> None:
"""Step physics and optionally render.
If the timeline is paused (e.g. via the GUI), this method blocks and keeps
the visualizer responsive until the timeline is resumed or stopped.
Args:
render: Whether to render the scene after stepping. Defaults to True.
"""
# Block while the GUI timeline is paused so the entire training loop freezes.
# See: https://github.com/isaac-sim/IsaacLab/issues/4279
self.physics_manager.wait_for_playing()
self._physics_step_count += 1
self.physics_manager.step()
if render and self.is_rendering:
self.render()
[docs]
def render(self, mode: int | None = None, skip_app_pumping: bool = False) -> None:
"""Update visualizers and render the scene.
Calls update_visualizers() so visualizers run at the render cadence (not at
every physics step). Camera sensors drive their configured renderer when
fetching data. Recording-related follow-up (Kit/RTX headless video, Newton GL
video, etc.) runs in :mod:`isaaclab.envs.utils.recording_hooks` so it is not tied to a
specific :class:`~isaaclab.physics.PhysicsManager` subclass.
**Kit vs. standalone visualizers:** The Kit app loop (``app.update()``) is the
only way to drive camera/RTX sensor rendering and viewport GUI updates; it
cannot be split into "cameras only" and "GUI only". Standalone visualizers
(Newton, Rerun, Viser) have self-contained ``step()`` methods that never call
``app.update()``, so they can run independently of camera rendering. The
``skip_app_pumping`` flag exploits this distinction: when True, Kit is skipped
while standalone visualizers continue to update.
Args:
mode: Unused. Kept for backward compatibility.
skip_app_pumping: When True, skip visualizers whose :meth:`~BaseVisualizer.pumps_app_update`
returns True (e.g. KitVisualizer). This disables the Kit app loop and camera
updates while still stepping standalone visualizers (Newton, Rerun, Viser).
Used by environment ``step()`` when ``render_enabled`` is False.
"""
self.physics_manager.pre_render()
self.update_visualizers(self.get_rendering_dt(), skip_app_pumping=skip_app_pumping)
self.physics_manager.after_visualizers_render()
run_recording_hooks_after_visualizers(self)
self._render_generation += 1
# Call render callbacks
if hasattr(self, "_render_callbacks"):
for callback in self._render_callbacks.values():
callback(None) # Pass None as event data
[docs]
def update_visualizers(self, dt: float, skip_app_pumping: bool = False) -> None:
"""Update visualizers without triggering renderer/GUI.
Args:
dt: Simulation time-step in seconds.
skip_app_pumping: When True, skip visualizers whose :meth:`~BaseVisualizer.pumps_app_update`
returns True (e.g. KitVisualizer). This is used when the environment's ``render_enabled``
flag is False — cameras and the Kit app loop are skipped, but standalone visualizers
(Newton, Rerun, Viser) still receive updates.
"""
if not self._visualizers:
return
self.update_scene_data_provider()
# Marker callbacks update VisualizationMarkers state; visualizer step()
# consumes that state later in this method.
if any(viz.supports_markers() for viz in self._visualizers):
self.vis_marker_registry.dispatch_callbacks()
visualizers_to_remove = []
for viz in self._visualizers:
try:
# When skip_app_pumping is set, skip Kit-like visualizers that call app.update()
if skip_app_pumping and viz.pumps_app_update():
continue
if viz.is_closed or not viz.is_running():
if viz.is_closed:
logger.info("Visualizer closed: %s", type(viz).__name__)
else:
logger.info("Visualizer not running: %s", type(viz).__name__)
visualizers_to_remove.append(viz)
continue
if viz.is_rendering_paused():
# Keep non-Kit visualizer event loops responsive while rendering is paused.
# Newton/Rerun/Viser need step(0.0) so GL/UI can process input (e.g. Resume).
# Kit is skipped: step() would call app.update(), which must not run during pause.
if not viz.pumps_app_update():
viz.step(0.0)
continue
while viz.is_training_paused() and viz.is_running():
viz.step(0.0)
viz.step(dt)
except Exception as exc:
logger.error("Error stepping visualizer '%s': %s", type(viz).__name__, exc)
visualizers_to_remove.append(viz)
for viz in visualizers_to_remove:
try:
viz.close()
self._visualizers.remove(viz)
logger.info("Removed visualizer: %s", type(viz).__name__)
except Exception as exc:
logger.error("Error closing visualizer: %s", exc)
def update_scene_data_provider(self, force_require_forward: bool = False):
if force_require_forward or self._should_forward_before_visualizer_update():
self.physics_manager.forward()
self._visualizer_step_counter += 1
if self._scene_data_provider is None:
return
self._scene_data_provider.update()
def _should_forward_before_visualizer_update(self) -> bool:
"""Return True if any visualizer requires pre-step forward kinematics."""
return any(viz.requires_forward_before_step() for viz in self._visualizers)
[docs]
def play(self) -> None:
"""Start or resume the simulation."""
self.physics_manager.play()
for viz in self._visualizers:
viz.play()
self._is_playing = True
self._is_stopped = False
[docs]
def pause(self) -> None:
"""Pause the simulation (can be resumed with play)."""
self.physics_manager.pause()
for viz in self._visualizers:
viz.pause()
self._is_playing = False
[docs]
def stop(self) -> None:
"""Stop the simulation completely."""
self.physics_manager.stop()
for viz in self._visualizers:
viz.stop()
self._is_playing = False
self._is_stopped = True
[docs]
def is_playing(self) -> bool:
"""Returns True if simulation is playing (not paused or stopped)."""
return self._is_playing
[docs]
def is_stopped(self) -> bool:
"""Returns True if simulation is stopped (not just paused)."""
return self._is_stopped
[docs]
def set_setting(self, name: str, value: Any) -> None:
"""Set a setting value."""
self._settings_helper.set(name, value)
[docs]
def get_setting(self, name: str) -> Any:
"""Get a setting value."""
return self._settings_helper.get(name)
[docs]
@classmethod
def clear_instance(cls) -> None:
"""Clean up resources and clear the singleton instance."""
if cls._instance is not None:
# Close physics manager FIRST to detach PhysX from the stage
# This must happen before clearing USD prims to avoid PhysX cleanup errors
cls._instance.physics_manager.close()
# Close all visualizers
for viz in cls._instance._visualizers:
viz.close()
cls._instance._visualizers.clear()
if cls._instance._scene_data_provider is not None:
close_provider = getattr(cls._instance._scene_data_provider, "close", None)
if callable(close_provider):
close_provider()
cls._instance._scene_data_provider = None
# Tear down the stage. We skip clear_stage() (prim-by-prim deletion) since
# close_stage() + app shutdown destroy the entire stage at once.
stage_utils.close_stage()
# Discard cached name-resolution data from destroyed assets
clear_resolve_matching_names_cache()
# Clear instance
cls._instance = None
gc.collect()
logger.info("SimulationContext cleared")
[docs]
@classmethod
def clear_stage(cls) -> None:
"""Clear the current USD stage (preserving /World and PhysicsScene).
Uses a predicate that preserves /World and PhysicsScene while also
respecting the default deletability checks (ancestral prims, etc.).
"""
if cls._instance is None:
return
def _predicate(prim: Usd.Prim) -> bool:
path = prim.GetPath().pathString
if path == "/World":
return False
if prim.GetTypeName() == "PhysicsScene":
return False
return True
sim_utils.clear_stage(predicate=_predicate)
@contextmanager
def build_simulation_context(
create_new_stage: bool = True,
gravity_enabled: bool = True,
device: str = "cuda:0",
dt: float = 0.01,
sim_cfg: SimulationCfg | None = None,
add_ground_plane: bool = False,
add_lighting: bool = False,
auto_add_lighting: bool = False,
visualizers: list[str] | None = None,
) -> Iterator[SimulationContext]:
"""Context manager to build a simulation context with the provided settings.
Args:
create_new_stage: Whether to create a new stage. Defaults to True.
gravity_enabled: Whether to enable gravity. Defaults to True.
device: Device to run the simulation on. Defaults to "cuda:0".
dt: Time step for the simulation. Defaults to 0.01.
sim_cfg: SimulationCfg to use. Defaults to None.
add_ground_plane: Whether to add a ground plane. Defaults to False.
add_lighting: Whether to add a dome light. Defaults to False.
auto_add_lighting: Whether to auto-add lighting if GUI present. Defaults to False.
visualizers: List of visualizer backend keys to enable (e.g. ``["kit", "newton", "rerun"]``).
Valid types: ``"kit"``, ``"newton"``, ``"rerun"``, ``"viser"``.
When provided, sets the ``/isaaclab/visualizer/types`` setting so the
existing visualizer resolution machinery picks them up. Defaults to None.
Yields:
The simulation context to use for the simulation.
"""
sim: SimulationContext | None = None
try:
if create_new_stage:
sim_utils.create_new_stage()
if sim_cfg is None:
gravity = (0.0, 0.0, -9.81) if gravity_enabled else (0.0, 0.0, 0.0)
sim_cfg = SimulationCfg(device=device, dt=dt, gravity=gravity)
sim = SimulationContext(sim_cfg)
if visualizers:
sim.set_setting("/isaaclab/visualizer/types", " ".join(visualizers))
if add_ground_plane:
cfg = GroundPlaneCfg()
cfg.func("/World/defaultGroundPlane", cfg)
if add_lighting or (auto_add_lighting and (sim.get_setting("/isaaclab/has_gui") or visualizers)):
cfg = DomeLightCfg(
color=(0.1, 0.1, 0.1), enable_color_temperature=True, color_temperature=5500, intensity=10000
)
cfg.func(prim_path="/World/defaultDomeLight", cfg=cfg, translation=(0.0, 0.0, 10.0))
yield sim
except Exception:
logger.error(traceback.format_exc())
raise
finally:
if sim is not None:
if not sim.get_setting("/isaaclab/has_gui"):
sim.stop()
sim.clear_instance()