Useful Snippets

Various examples of Isaac Sim Replicator snippets that can be run as Standalone Applications or from the UI using the Script Editor.

Annotator and Custom Writer Data from Multiple Cameras

Example of how to access data from multiple cameras in a scene using annotators or custom writers. The standalone example can also be run directly (on Windows, use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/multi_camera.py
Annotator and Custom Writer Data from Multiple Cameras (Standalone Application)
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp(launch_config={"headless": False})
  4
  5import os
  6import omni.usd
  7import omni.kit
  8import omni.replicator.core as rep
  9from omni.replicator.core import AnnotatorRegistry, Writer
 10from PIL import Image
 11from pxr import UsdGeom, Sdf
 12
 13NUM_FRAMES = 5
 14
 15# Save rgb image to file
 16def save_rgb(rgb_data, file_name):
 17    rgb_img = Image.fromarray(rgb_data, "RGBA")
 18    rgb_img.save(file_name + ".png")
 19
 20
 21# Randomize cube color every frame using a replicator randomizer
 22def cube_color_randomizer():
 23    cube_prims = rep.get.prims(path_pattern="Cube")
 24    with cube_prims:
 25        rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
 26    return cube_prims.node
 27
 28
 29# Access data through a custom replicator writer
 30class MyWriter(Writer):
 31    def __init__(self, rgb: bool = True):
 32        self._frame_id = 0
 33        if rgb:
 34            self.annotators.append(AnnotatorRegistry.get_annotator("rgb"))
 35        # Create writer output directory
 36        self.file_path = os.path.join(os.getcwd(), "_out_mc_writer", "")
 37        print(f"Writing writer data to {self.file_path}")
 38        dir = os.path.dirname(self.file_path)
 39        os.makedirs(dir, exist_ok=True)
 40
 41    def write(self, data):
 42        for annotator in data.keys():
 43            annotator_split = annotator.split("-")
 44            if len(annotator_split) > 1:
 45                render_product_name = annotator_split[-1]
 46            if annotator.startswith("rgb"):
 47                save_rgb(data[annotator], f"{self.file_path}/{render_product_name}_frame_{self._frame_id}")
 48        self._frame_id += 1
 49
 50
 51rep.WriterRegistry.register(MyWriter)
 52
 53# Create a new stage with a dome light
 54omni.usd.get_context().new_stage()
 55stage = omni.usd.get_context().get_stage()
 56dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
 57dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(900.0)
 58
 59# Create cube
 60cube_prim = stage.DefinePrim("/World/Cube", "Cube")
 61UsdGeom.Xformable(cube_prim).AddTranslateOp().Set((0.0, 5.0, 1.0))
 62
 63# Register cube color randomizer to trigger on every frame
 64rep.randomizer.register(cube_color_randomizer)
 65with rep.trigger.on_frame():
 66    rep.randomizer.cube_color_randomizer()
 67
 68# Create cameras
 69camera_prim1 = stage.DefinePrim("/World/Camera1", "Camera")
 70UsdGeom.Xformable(camera_prim1).AddTranslateOp().Set((0.0, 10.0, 20.0))
 71UsdGeom.Xformable(camera_prim1).AddRotateXYZOp().Set((-15.0, 0.0, 0.0))
 72
 73camera_prim2 = stage.DefinePrim("/World/Camera2", "Camera")
 74UsdGeom.Xformable(camera_prim2).AddTranslateOp().Set((-10.0, 15.0, 15.0))
 75UsdGeom.Xformable(camera_prim2).AddRotateXYZOp().Set((-45.0, 0.0, 45.0))
 76
 77# Create render products
 78rp1 = rep.create.render_product(str(camera_prim1.GetPrimPath()), resolution=(320, 320))
 79rp2 = rep.create.render_product(str(camera_prim2.GetPrimPath()), resolution=(640, 640))
 80rp3 = rep.create.render_product("/OmniverseKit_Persp", (1024, 1024))
 81
 82# Access the data through a custom writer
 83writer = rep.WriterRegistry.get("MyWriter")
 84writer.initialize(rgb=True)
 85writer.attach([rp1, rp2, rp3])
 86
 87# Access the data through annotators
 88rgb_annotators = []
 89for rp in [rp1, rp2, rp3]:
 90    rgb = rep.AnnotatorRegistry.get_annotator("rgb")
 91    rgb.attach(rp)
 92    rgb_annotators.append(rgb)
 93
 94# Create annotator output directory
 95file_path = os.path.join(os.getcwd(), "_out_mc_annot", "")
 96print(f"Writing annotator data to {file_path}")
 97dir = os.path.dirname(file_path)
 98os.makedirs(dir, exist_ok=True)
 99
100# Data will be captured manually using step
101rep.orchestrator.set_capture_on_play(False)
102
103for i in range(NUM_FRAMES):
104    # The step function provides new data to the annotators, triggers the randomizers and the writer
105    rep.orchestrator.step(rt_subframes=4)
106    for j, rgb_annot in enumerate(rgb_annotators):
107        save_rgb(rgb_annot.get_data(), f"{dir}/rp{j}_step_{i}")
108
109simulation_app.close()
Annotator and Custom Writer Data from Multiple Cameras (Script Editor)
  1import asyncio
  2import os
  3import omni.usd
  4import omni.kit
  5import omni.replicator.core as rep
  6from omni.replicator.core import AnnotatorRegistry, Writer
  7from PIL import Image
  8from pxr import UsdGeom, Sdf
  9
 10NUM_FRAMES = 5
 11
 12# Save rgb image to file
 13def save_rgb(rgb_data, file_name):
 14    rgb_img = Image.fromarray(rgb_data, "RGBA")
 15    rgb_img.save(file_name + ".png")
 16
 17
 18# Randomize cube color every frame using a replicator randomizer
 19def cube_color_randomizer():
 20    cube_prims = rep.get.prims(path_pattern="Cube")
 21    with cube_prims:
 22        rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
 23    return cube_prims.node
 24
 25
 26# Access data through a custom replicator writer
 27class MyWriter(Writer):
 28    def __init__(self, rgb: bool = True):
 29        self._frame_id = 0
 30        if rgb:
 31            self.annotators.append(AnnotatorRegistry.get_annotator("rgb"))
 32        # Create writer output directory
 33        self.file_path = os.path.join(os.getcwd(), "_out_mc_writer", "")
 34        print(f"Writing writer data to {self.file_path}")
 35        dir = os.path.dirname(self.file_path)
 36        os.makedirs(dir, exist_ok=True)
 37
 38    def write(self, data):
 39        for annotator in data.keys():
 40            annotator_split = annotator.split("-")
 41            if len(annotator_split) > 1:
 42                render_product_name = annotator_split[-1]
 43            if annotator.startswith("rgb"):
 44                save_rgb(data[annotator], f"{self.file_path}/{render_product_name}_frame_{self._frame_id}")
 45        self._frame_id += 1
 46
 47
 48rep.WriterRegistry.register(MyWriter)
 49
 50# Create a new stage with a dome light
 51omni.usd.get_context().new_stage()
 52stage = omni.usd.get_context().get_stage()
 53dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
 54dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(900.0)
 55
 56# Create cube
 57cube_prim = stage.DefinePrim("/World/Cube", "Cube")
 58UsdGeom.Xformable(cube_prim).AddTranslateOp().Set((0.0, 5.0, 1.0))
 59
 60# Register cube color randomizer to trigger on every frame
 61rep.randomizer.register(cube_color_randomizer)
 62with rep.trigger.on_frame():
 63    rep.randomizer.cube_color_randomizer()
 64
 65# Create cameras
 66camera_prim1 = stage.DefinePrim("/World/Camera1", "Camera")
 67UsdGeom.Xformable(camera_prim1).AddTranslateOp().Set((0.0, 10.0, 20.0))
 68UsdGeom.Xformable(camera_prim1).AddRotateXYZOp().Set((-15.0, 0.0, 0.0))
 69
 70camera_prim2 = stage.DefinePrim("/World/Camera2", "Camera")
 71UsdGeom.Xformable(camera_prim2).AddTranslateOp().Set((-10.0, 15.0, 15.0))
 72UsdGeom.Xformable(camera_prim2).AddRotateXYZOp().Set((-45.0, 0.0, 45.0))
 73
 74# Create render products
 75rp1 = rep.create.render_product(str(camera_prim1.GetPrimPath()), resolution=(320, 320))
 76rp2 = rep.create.render_product(str(camera_prim2.GetPrimPath()), resolution=(640, 640))
 77rp3 = rep.create.render_product("/OmniverseKit_Persp", (1024, 1024))
 78
 79# Access the data through a custom writer
 80writer = rep.WriterRegistry.get("MyWriter")
 81writer.initialize(rgb=True)
 82writer.attach([rp1, rp2, rp3])
 83
 84# Access the data through annotators
 85rgb_annotators = []
 86for rp in [rp1, rp2, rp3]:
 87    rgb = rep.AnnotatorRegistry.get_annotator("rgb")
 88    rgb.attach(rp)
 89    rgb_annotators.append(rgb)
 90
 91# Create annotator output directory
 92file_path = os.path.join(os.getcwd(), "_out_mc_annot", "")
 93print(f"Writing annotator data to {file_path}")
 94dir = os.path.dirname(file_path)
 95os.makedirs(dir, exist_ok=True)
 96
 97# Data will be captured manually using step
 98rep.orchestrator.set_capture_on_play(False)
 99
async def run_example_async():
    """Step the orchestrator NUM_FRAMES times and save every rgb annotator's data after each step."""
    for step_idx in range(NUM_FRAMES):
        # Each step provides new data to the annotators and triggers the randomizers and the writer
        await rep.orchestrator.step_async(rt_subframes=4)
        for rp_idx, annot in enumerate(rgb_annotators):
            # `dir` is the module-level annotator output directory defined above this function
            save_rgb(annot.get_data(), f"{dir}/rp{rp_idx}_step_{step_idx}")
106
107
108asyncio.ensure_future(run_example_async())

Synthetic Data Access at Specific Simulation Timepoints

Example of how to access synthetic data (rgb, semantic segmentation) from multiple cameras in a simulation scene at specific events using annotators or writers. The standalone example can also be run directly (on Windows, use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/simulation_get_data.py
Synthetic Data Access at Specific Simulation Timepoints (Standalone Application)
 1from isaacsim import SimulationApp
 2
 3simulation_app = SimulationApp(launch_config={"renderer": "RayTracedLighting", "headless": False})
 4
 5import json
 6import os
 7
 8import carb.settings
 9import numpy as np
10import omni
11import omni.replicator.core as rep
12from isaacsim.core.api import World
13from isaacsim.core.api.objects import DynamicCuboid
14from isaacsim.core.utils.semantics import add_update_semantics
15from PIL import Image
16
17
18# Util function to save rgb annotator data
def write_rgb_data(rgb_data, file_path):
    """Save rgb annotator data as ``<file_path>.png``.

    Args:
        rgb_data: Array handed to ``Image.fromarray`` using the "RGBA" mode.
        file_path: Destination path without the file extension.
    """
    Image.fromarray(rgb_data, "RGBA").save(file_path + ".png")
22
23
24# Util function to save semantic segmentation annotator data
def write_sem_data(sem_data, file_path):
    """Save semantic segmentation annotator data as ``<file_path>.json`` and ``<file_path>.png``.

    Args:
        sem_data: Annotator output; ``sem_data["info"]["idToLabels"]`` is dumped as JSON and
            ``sem_data["data"]`` is saved as an "RGBA" image.
        file_path: Destination path without the file extension.
    """
    labels_by_id = sem_data["info"]["idToLabels"]
    with open(file_path + ".json", "w") as labels_file:
        json.dump(labels_by_id, labels_file)

    raw = sem_data["data"]
    # Reinterpret the buffer as bytes, keeping the original shape plus a trailing channel axis
    pixels = np.frombuffer(raw, dtype=np.uint8).reshape(*raw.shape, -1)
    Image.fromarray(pixels, "RGBA").save(file_path + ".png")
32
33
34# Create a new stage with the default ground plane
35omni.usd.get_context().new_stage()
36
37# Setup the simulation world
38world = World()
39world.scene.add_default_ground_plane()
40world.reset()
41
42# Setting capture on play to False will prevent the replicator from capturing data each frame
43carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
44
45# Create a camera and render product to collect the data from
46cam = rep.create.camera(position=(5, 5, 5), look_at=(0, 0, 0))
47rp = rep.create.render_product(cam, (512, 512))
48
49# Set the output directory for the data
50out_dir = os.path.join(os.getcwd(), "_out_sim_event")
51os.makedirs(out_dir, exist_ok=True)
52print(f"Outputting data to {out_dir}..")
53
54# Example of using a writer to save the data
55writer = rep.WriterRegistry.get("BasicWriter")
56writer.initialize(
57    output_dir=f"{out_dir}/writer", rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True
58)
59writer.attach(rp)
60
61# Run a preview to ensure the replicator graph is initialized
62rep.orchestrator.preview()
63
64# Example of accessing the data directly from annotators
65rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
66rgb_annot.attach(rp)
67sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True})
68sem_annot.attach(rp)
69
# Spawn and drop a few cubes, capture data when they stop moving
for i in range(5):
    cuboid = world.scene.add(DynamicCuboid(prim_path=f"/World/Cuboid_{i}", name=f"Cuboid_{i}", position=(0, 0, 10 + i)))
    add_update_semantics(cuboid.prim, "Cuboid")

    # Advance the simulation without rendering until the linear speed drops below the threshold
    for s in range(500):
        world.step(render=False)
        vel = np.linalg.norm(cuboid.get_linear_velocity())
        if vel < 0.1:
            print(f"Cube_{i} stopped moving after {s} simulation steps, writing data..")
            # Trigger the writer and update the annotators with new data
            rep.orchestrator.step(rt_subframes=4, delta_time=0.0, pause_timeline=False)
            write_rgb_data(rgb_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_rgb")
            write_sem_data(sem_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_sem")
            break

# Wait until all the writer data is saved to disk before shutting down the application
rep.orchestrator.wait_until_complete()

simulation_app.close()
Synthetic Data Access at Specific Simulation Timepoints (Script Editor)
 1import asyncio
 2import json
 3import os
 4
 5import carb.settings
 6import numpy as np
 7import omni
 8import omni.replicator.core as rep
 9from isaacsim.core.api import World
10from isaacsim.core.api.objects import DynamicCuboid
11from isaacsim.core.utils.semantics import add_update_semantics
12from PIL import Image
13
14
15# Util function to save rgb annotator data
def write_rgb_data(rgb_data, file_path):
    """Save rgb annotator data as ``<file_path>.png``.

    Args:
        rgb_data: Array handed to ``Image.fromarray`` using the "RGBA" mode.
        file_path: Destination path without the file extension.
    """
    Image.fromarray(rgb_data, "RGBA").save(file_path + ".png")
19
20
21# Util function to save semantic segmentation annotator data
def write_sem_data(sem_data, file_path):
    """Save semantic segmentation annotator data as ``<file_path>.json`` and ``<file_path>.png``.

    Args:
        sem_data: Annotator output; ``sem_data["info"]["idToLabels"]`` is dumped as JSON and
            ``sem_data["data"]`` is saved as an "RGBA" image.
        file_path: Destination path without the file extension.
    """
    labels_by_id = sem_data["info"]["idToLabels"]
    with open(file_path + ".json", "w") as labels_file:
        json.dump(labels_by_id, labels_file)

    raw = sem_data["data"]
    # Reinterpret the buffer as bytes, keeping the original shape plus a trailing channel axis
    pixels = np.frombuffer(raw, dtype=np.uint8).reshape(*raw.shape, -1)
    Image.fromarray(pixels, "RGBA").save(file_path + ".png")
29
30
31# Create a new stage with the default ground plane
32omni.usd.get_context().new_stage()
33
34# Setup the simulation world
35world = World()
36world.scene.add_default_ground_plane()
37
38
39# Setting capture on play to False will prevent the replicator from capturing data each frame
40carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
41
42# Create a camera and render product to collect the data from
43cam = rep.create.camera(position=(5, 5, 5), look_at=(0, 0, 0))
44rp = rep.create.render_product(cam, (512, 512))
45
46# Set the output directory for the data
47out_dir = os.path.join(os.getcwd(), "_out_sim_event")
48os.makedirs(out_dir, exist_ok=True)
49print(f"Outputting data to {out_dir}..")
50
51# Example of using a writer to save the data
52writer = rep.WriterRegistry.get("BasicWriter")
53writer.initialize(
54    output_dir=f"{out_dir}/writer", rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True
55)
56writer.attach(rp)
57
58# Run a preview to ensure the replicator graph is initialized
59rep.orchestrator.preview()
60
61# Example of accessing the data directly from annotators
62rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
63rgb_annot.attach(rp)
64sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True})
65sem_annot.attach(rp)
66
67
async def run_example_async():
    """Drop five cubes one after another and capture data once each one's linear speed falls below 0.1.

    After the last capture the function waits for the orchestrator to finish, so the writer
    output is saved to disk before the coroutine returns.
    """
    await world.initialize_simulation_context_async()
    await world.reset_async()

    # Spawn and drop a few cubes, capture data when they stop moving
    for i in range(5):
        cuboid = world.scene.add(
            DynamicCuboid(prim_path=f"/World/Cuboid_{i}", name=f"Cuboid_{i}", position=(0, 0, 10 + i))
        )
        add_update_semantics(cuboid.prim, "Cuboid")

        # Wait for app updates (at most 500) until the cube has slowed down
        for s in range(500):
            await omni.kit.app.get_app().next_update_async()
            vel = np.linalg.norm(cuboid.get_linear_velocity())
            if vel < 0.1:
                print(f"Cube_{i} stopped moving after {s} simulation steps, writing data..")
                # Trigger the writer and update the annotators with new data
                await rep.orchestrator.step_async(rt_subframes=4, delta_time=0.0, pause_timeline=False)
                write_rgb_data(rgb_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_rgb")
                write_sem_data(sem_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_sem")
                break

    # Wait until all the data is saved to disk
    await rep.orchestrator.wait_until_complete_async()
89
90
91asyncio.ensure_future(run_example_async())

Custom Event Randomization and Writing

The following example showcases the use of custom events to trigger randomizations and data writing at various times throughout the simulation. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/custom_event_and_write.py
Custom Event Randomization and Writing (Standalone Application)
 1from isaacsim import SimulationApp
 2
 3simulation_app = SimulationApp(launch_config={"headless": False})
 4
 5import os
 6
 7import omni.replicator.core as rep
 8import omni.usd
 9
10omni.usd.get_context().new_stage()
11distance_light = rep.create.light(rotation=(315, 0, 0), intensity=4000, light_type="distant")
12
13large_cube = rep.create.cube(scale=1.25, position=(1, 1, 0))
14small_cube = rep.create.cube(scale=0.75, position=(-1, -1, 0))
15large_cube_prim = large_cube.get_output_prims()["prims"][0]
16small_cube_prim = small_cube.get_output_prims()["prims"][0]
17
18rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512))
19writer = rep.WriterRegistry.get("BasicWriter")
20out_dir = os.path.join(os.getcwd(), "_out_custom_event")
21print(f"Writing data to {out_dir}")
22writer.initialize(output_dir=out_dir, rgb=True)
23writer.attach(rp)
24
25with rep.trigger.on_custom_event(event_name="randomize_large_cube"):
26    with large_cube:
27        rep.randomizer.rotation()
28
29with rep.trigger.on_custom_event(event_name="randomize_small_cube"):
30    with small_cube:
31        rep.randomizer.rotation()
32
33
def run_example():
    """Alternate event-triggered randomizations and manual moves of both cubes, capturing after each change."""

    def capture_frame():
        # Every capture uses the same number of subframes
        print("Capturing frame")
        rep.orchestrator.step(rt_subframes=8)

    print("Randomizing small cube")
    rep.utils.send_og_event(event_name="randomize_small_cube")
    capture_frame()

    print("Moving small cube")
    small_cube_prim.GetAttribute("xformOp:translate").Set((-2, -2, 0))
    capture_frame()

    print("Randomizing large cube")
    rep.utils.send_og_event(event_name="randomize_large_cube")
    capture_frame()

    print("Moving large cube")
    large_cube_prim.GetAttribute("xformOp:translate").Set((2, 2, 0))
    capture_frame()

    # Wait until all the data is saved to disk
    rep.orchestrator.wait_until_complete()
57
58
59run_example()
60
61simulation_app.close()
Custom Event Randomization and Writing (Script Editor)
 1import asyncio
 2import os
 3
 4import omni.replicator.core as rep
 5import omni.usd
 6
 7omni.usd.get_context().new_stage()
 8distance_light = rep.create.light(rotation=(315, 0, 0), intensity=4000, light_type="distant")
 9
10large_cube = rep.create.cube(scale=1.25, position=(1, 1, 0))
11small_cube = rep.create.cube(scale=0.75, position=(-1, -1, 0))
12large_cube_prim = large_cube.get_output_prims()["prims"][0]
13small_cube_prim = small_cube.get_output_prims()["prims"][0]
14
15rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512))
16writer = rep.WriterRegistry.get("BasicWriter")
17out_dir = os.path.join(os.getcwd(), "_out_custom_event")
18print(f"Writing data to {out_dir}")
19writer.initialize(output_dir=out_dir, rgb=True)
20writer.attach(rp)
21
22with rep.trigger.on_custom_event(event_name="randomize_large_cube"):
23    with large_cube:
24        rep.randomizer.rotation()
25
26with rep.trigger.on_custom_event(event_name="randomize_small_cube"):
27    with small_cube:
28        rep.randomizer.rotation()
29
30
async def run_example_async():
    """Alternate event-triggered randomizations and manual moves of both cubes, capturing after each change."""

    async def capture_frame():
        # Every capture uses the same number of subframes
        print("Capturing frame")
        await rep.orchestrator.step_async(rt_subframes=8)

    print("Randomizing small cube")
    rep.utils.send_og_event(event_name="randomize_small_cube")
    await capture_frame()

    print("Moving small cube")
    small_cube_prim.GetAttribute("xformOp:translate").Set((-2, -2, 0))
    await capture_frame()

    print("Randomizing large cube")
    rep.utils.send_og_event(event_name="randomize_large_cube")
    await capture_frame()

    print("Moving large cube")
    large_cube_prim.GetAttribute("xformOp:translate").Set((2, 2, 0))
    await capture_frame()

    # Wait until all the data is saved to disk
    await rep.orchestrator.wait_until_complete_async()
54
55
56asyncio.ensure_future(run_example_async())

Motion Blur

This example demonstrates how to capture motion blur data using the RTX Real-Time and RTX Interactive (Path Tracing) rendering modes. For the RTX Real-Time mode, details on the motion blur parameters can be found in the RTX renderer documentation. For the RTX Interactive (Path Tracing) mode, motion blur is achieved by rendering multiple subframes (/omni/replicator/pathTracedMotionBlurSubSamples) and combining them to create the effect. The example uses animated and physics-enabled assets with synchronized motion. Keyframe-animated assets can be advanced by any custom delta time because their motion is interpolated, whereas physics-enabled assets require a matching physics FPS so that motion samples exist at the requested delta time. The example shows how to compute the target physics FPS, change it if needed, and restore the original physics FPS after capturing the motion blur.

The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/motion_blur.py
Motion Blur (Standalone Application)
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp({"headless": False})
  4
  5import os
  6
  7import carb.settings
  8import omni.kit.app
  9import omni.replicator.core as rep
 10import omni.timeline
 11import omni.usd
 12from isaacsim.storage.native import get_assets_root_path
 13from pxr import PhysxSchema, Sdf, UsdGeom, UsdPhysics
 14
 15# Paths to the animated and physics-ready assets
 16PHYSICS_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics/003_cracker_box.usd"
 17ANIM_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned/003_cracker_box.usd"
 18
 19# -z velocities and start locations of the animated (left side) and physics (right side) assets (stage units/s)
 20ASSET_VELOCITIES = [0, 5, 10]
 21ASSET_X_MIRRORED_LOCATIONS = [(0.5, 0, 0.3), (0.3, 0, 0.3), (0.1, 0, 0.3)]
 22
 23# Used to calculate how many frames to animate the assets to maintain the same velocity as the physics assets
 24ANIMATION_DURATION = 10
 25
 26# Create a new stage with animated and physics-enabled assets with synchronized motion
 27def setup_stage():
 28    # Create new stage
 29    omni.usd.get_context().new_stage()
 30    stage = omni.usd.get_context().get_stage()
 31    timeline = omni.timeline.get_timeline_interface()
 32    timeline.set_end_time(ANIMATION_DURATION)
 33
 34    # Create lights
 35    dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
 36    dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(100.0)
 37    distant_light = stage.DefinePrim("/World/DistantLight", "DistantLight")
 38    if not distant_light.GetAttribute("xformOp:rotateXYZ"):
 39        UsdGeom.Xformable(distant_light).AddRotateXYZOp()
 40    distant_light.GetAttribute("xformOp:rotateXYZ").Set((-75, 0, 0))
 41    distant_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(2500)
 42
 43    # Setup the physics assets with gravity disabled and the requested velocity
 44    assets_root_path = get_assets_root_path()
 45    physics_asset_url = assets_root_path + PHYSICS_ASSET_URL
 46    for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
 47        prim = stage.DefinePrim(f"/World/physics_asset_{int(abs(vel))}", "Xform")
 48        prim.GetReferences().AddReference(physics_asset_url)
 49        if not prim.GetAttribute("xformOp:translate"):
 50            UsdGeom.Xformable(prim).AddTranslateOp()
 51        prim.GetAttribute("xformOp:translate").Set(loc)
 52        prim.GetAttribute("physxRigidBody:disableGravity").Set(True)
 53        prim.GetAttribute("physxRigidBody:angularDamping").Set(0.0)
 54        prim.GetAttribute("physxRigidBody:linearDamping").Set(0.0)
 55        prim.GetAttribute("physics:velocity").Set((0, 0, -vel))
 56
 57    # Setup animated assets maintaining the same velocity as the physics assets
 58    anim_asset_url = assets_root_path + ANIM_ASSET_URL
 59    for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
 60        start_loc = (-loc[0], loc[1], loc[2])
 61        prim = stage.DefinePrim(f"/World/anim_asset_{int(abs(vel))}", "Xform")
 62        prim.GetReferences().AddReference(anim_asset_url)
 63        if not prim.GetAttribute("xformOp:translate"):
 64            UsdGeom.Xformable(prim).AddTranslateOp()
 65        anim_distance = vel * ANIMATION_DURATION
 66        end_loc = (start_loc[0], start_loc[1], start_loc[2] - anim_distance)
 67        end_keyframe = timeline.get_time_codes_per_seconds() * ANIMATION_DURATION
 68        # Timesampled keyframe (animated) translation
 69        prim.GetAttribute("xformOp:translate").Set(start_loc, time=0)
 70        prim.GetAttribute("xformOp:translate").Set(end_loc, time=end_keyframe)
 71
 72
 73# Capture motion blur frames with the given delta time step and render mode
 74def run_motion_blur_example(num_frames=3, custom_delta_time=None, use_path_tracing=True, pt_subsamples=8, pt_spp=64):
 75    # Create a new stage with the assets
 76    setup_stage()
 77    stage = omni.usd.get_context().get_stage()
 78
 79    # Set replicator settings (capture only on request and enable motion blur)
 80    carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
 81    carb.settings.get_settings().set("/omni/replicator/captureMotionBlur", True)
 82
 83    # Set motion blur settings based on the render mode
 84    if use_path_tracing:
 85        print(f"[MotionBlur] Setting PathTracing render mode motion blur settings")
 86        carb.settings.get_settings().set("/rtx/rendermode", "PathTracing")
 87        # (int): Total number of samples for each rendered pixel, per frame.
 88        carb.settings.get_settings().set("/rtx/pathtracing/spp", pt_spp)
 89        # (int): Maximum number of samples to accumulate per pixel. When this count is reached the rendering stops until a scene or setting change is detected, restarting the rendering process. Set to 0 to remove this limit.
 90        carb.settings.get_settings().set("/rtx/pathtracing/totalSpp", pt_spp)
 91        carb.settings.get_settings().set("/rtx/pathtracing/optixDenoiser/enabled", 0)
 92        # Number of sub samples to render if in PathTracing render mode and motion blur is enabled.
 93        carb.settings.get_settings().set("/omni/replicator/pathTracedMotionBlurSubSamples", pt_subsamples)
 94    else:
 95        print(f"[MotionBlur] Setting RayTracedLighting render mode motion blur settings")
 96        carb.settings.get_settings().set("/rtx/rendermode", "RayTracedLighting")
 97        # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA
 98        carb.settings.get_settings().set("/rtx/post/aa/op", 2)
 99        # (float): The fraction of the largest screen dimension to use as the maximum motion blur diameter.
100        carb.settings.get_settings().set("/rtx/post/motionblur/maxBlurDiameterFraction", 0.02)
101        # (float): Exposure time fraction in frames (1.0 = one frame duration) to sample.
102        carb.settings.get_settings().set("/rtx/post/motionblur/exposureFraction", 1.0)
103        # (int): Number of samples to use in the filter. A higher number improves quality at the cost of performance.
104        carb.settings.get_settings().set("/rtx/post/motionblur/numSamples", 8)
105
106    # Setup camera and writer
107    camera = rep.create.camera(position=(0, 1.5, 0), look_at=(0, 0, 0), name="MotionBlurCam")
108    render_product = rep.create.render_product(camera, (1280, 720))
109    basic_writer = rep.WriterRegistry.get("BasicWriter")
110    delta_time_str = "None" if custom_delta_time is None else f"{custom_delta_time:.4f}"
111    render_mode_str = f"pt_subsamples_{pt_subsamples}_spp_{pt_spp}" if use_path_tracing else "rt"
112    output_directory = os.path.join(os.getcwd(), f"_out_motion_blur_dt_{delta_time_str}_{render_mode_str}")
113    print(f"[MotionBlur] Output directory: {output_directory}")
114    basic_writer.initialize(output_dir=output_directory, rgb=True)
115    basic_writer.attach(render_product)
116
117    # Run a few updates to make sure all materials are fully loaded for capture
118    for _ in range(50):
119        simulation_app.update()
120
121    # Use the physics scene to modify the physics FPS (if needed) to guarantee motion samples at any custom delta time
122    physx_scene = None
123    for prim in stage.Traverse():
124        if prim.IsA(UsdPhysics.Scene):
125            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
126            break
127    if physx_scene is None:
128        print(f"[MotionBlur] Creating a new PhysicsScene")
129        physics_scene = UsdPhysics.Scene.Define(stage, "/PhysicsScene")
130        physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))
131
132    # Check the target physics depending on the custom delta time and the render mode
133    target_physics_fps = stage.GetTimeCodesPerSecond() if custom_delta_time is None else 1 / custom_delta_time
134    if use_path_tracing:
135        target_physics_fps *= pt_subsamples
136
137    # Check if the physics FPS needs to be increased to match the custom delta time
138    orig_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get()
139    if target_physics_fps > orig_physics_fps:
140        print(f"[MotionBlur] Changing physics FPS from {orig_physics_fps} to {target_physics_fps}")
141        physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps)
142
143    # Start the timeline for physics updates in the step function
144    timeline = omni.timeline.get_timeline_interface()
145    timeline.play()
146
147    # Capture frames
148    for i in range(num_frames):
149        print(f"[MotionBlur] \tCapturing frame {i}")
150        rep.orchestrator.step(delta_time=custom_delta_time)
151
152    # Restore the original physics FPS
153    if target_physics_fps > orig_physics_fps:
154        print(f"[MotionBlur] Restoring physics FPS from {target_physics_fps} to {orig_physics_fps}")
155        physx_scene.GetTimeStepsPerSecondAttr().Set(orig_physics_fps)
156
157    # Switch back to the raytracing render mode
158    if use_path_tracing:
159        print(f"[MotionBlur] Restoring render mode to RayTracedLighting")
160        carb.settings.get_settings().set("/rtx/rendermode", "RayTracedLighting")
161
162    # Wait until all the data is saved to disk
163    rep.orchestrator.wait_until_complete()
164
165
def run_motion_blur_examples():
    """Run the motion blur capture for several delta times, in both render modes.

    For each delta time, one RayTracedLighting run is followed by PathTracing runs covering
    every combination of the subsample and samples-per-pixel values.
    """
    delta_times = [None, 1 / 30, 1 / 60, 1 / 240]
    pt_spp_values = [32, 128]
    pt_subsample_values = [4, 16]
    for delta_time in delta_times:
        # RayTracing examples
        run_motion_blur_example(custom_delta_time=delta_time, use_path_tracing=False)
        # PathTracing examples
        for subsamples in pt_subsample_values:
            for spp in pt_spp_values:
                run_motion_blur_example(
                    custom_delta_time=delta_time,
                    use_path_tracing=True,
                    pt_subsamples=subsamples,
                    pt_spp=spp,
                )
182
183
184run_motion_blur_examples()
185
186simulation_app.close()
Motion Blur
  1import asyncio
  2import os
  3
  4import carb.settings
  5import omni.kit.app
  6import omni.replicator.core as rep
  7import omni.timeline
  8import omni.usd
  9from isaacsim.storage.native import get_assets_root_path
 10from pxr import PhysxSchema, Sdf, UsdGeom, UsdPhysics
 11
 12# Paths to the animated and physics-ready assets
 13PHYSICS_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics/003_cracker_box.usd"
 14ANIM_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned/003_cracker_box.usd"
 15
 16# -z velocities and start locations of the animated (left side) and physics (right side) assets (stage units/s)
 17ASSET_VELOCITIES = [0, 5, 10]
 18ASSET_X_MIRRORED_LOCATIONS = [(0.5, 0, 0.3), (0.3, 0, 0.3), (0.1, 0, 0.3)]
 19
 20# Used to calculate how many frames to animate the assets to maintain the same velocity as the physics assets
 21ANIMATION_DURATION = 10
 22
 23# Create a new stage with animated and physics-enabled assets with synchronized motion
 24def setup_stage():
 25    # Create new stage
 26    omni.usd.get_context().new_stage()
 27    stage = omni.usd.get_context().get_stage()
 28    timeline = omni.timeline.get_timeline_interface()
 29    timeline.set_end_time(ANIMATION_DURATION)
 30
 31    # Create lights
 32    dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
 33    dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(100.0)
 34    distant_light = stage.DefinePrim("/World/DistantLight", "DistantLight")
 35    if not distant_light.GetAttribute("xformOp:rotateXYZ"):
 36        UsdGeom.Xformable(distant_light).AddRotateXYZOp()
 37    distant_light.GetAttribute("xformOp:rotateXYZ").Set((-75, 0, 0))
 38    distant_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(2500)
 39
 40    # Setup the physics assets with gravity disabled and the requested velocity
 41    assets_root_path = get_assets_root_path()
 42    physics_asset_url = assets_root_path + PHYSICS_ASSET_URL
 43    for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
 44        prim = stage.DefinePrim(f"/World/physics_asset_{int(abs(vel))}", "Xform")
 45        prim.GetReferences().AddReference(physics_asset_url)
 46        if not prim.GetAttribute("xformOp:translate"):
 47            UsdGeom.Xformable(prim).AddTranslateOp()
 48        prim.GetAttribute("xformOp:translate").Set(loc)
 49        prim.GetAttribute("physxRigidBody:disableGravity").Set(True)
 50        prim.GetAttribute("physxRigidBody:angularDamping").Set(0.0)
 51        prim.GetAttribute("physxRigidBody:linearDamping").Set(0.0)
 52        prim.GetAttribute("physics:velocity").Set((0, 0, -vel))
 53
 54    # Setup animated assets maintaining the same velocity as the physics assets
 55    anim_asset_url = assets_root_path + ANIM_ASSET_URL
 56    for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
 57        start_loc = (-loc[0], loc[1], loc[2])
 58        prim = stage.DefinePrim(f"/World/anim_asset_{int(abs(vel))}", "Xform")
 59        prim.GetReferences().AddReference(anim_asset_url)
 60        if not prim.GetAttribute("xformOp:translate"):
 61            UsdGeom.Xformable(prim).AddTranslateOp()
 62        anim_distance = vel * ANIMATION_DURATION
 63        end_loc = (start_loc[0], start_loc[1], start_loc[2] - anim_distance)
 64        end_keyframe = timeline.get_time_codes_per_seconds() * ANIMATION_DURATION
 65        # Timesampled keyframe (animated) translation
 66        prim.GetAttribute("xformOp:translate").Set(start_loc, time=0)
 67        prim.GetAttribute("xformOp:translate").Set(end_loc, time=end_keyframe)
 68
 69
 70# Capture motion blur frames with the given delta time step and render mode
 71async def run_motion_blur_example_async(
 72    num_frames=3, custom_delta_time=None, use_path_tracing=True, pt_subsamples=8, pt_spp=64
 73):
 74    # Create a new stage with the assets
 75    setup_stage()
 76    stage = omni.usd.get_context().get_stage()
 77
 78    # Set replicator settings (capture only on request and enable motion blur)
 79    carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
 80    carb.settings.get_settings().set("/omni/replicator/captureMotionBlur", True)
 81
 82    # Set motion blur settings based on the render mode
 83    if use_path_tracing:
 84        print(f"[MotionBlur] Setting PathTracing render mode motion blur settings")
 85        carb.settings.get_settings().set("/rtx/rendermode", "PathTracing")
 86        # (int): Total number of samples for each rendered pixel, per frame.
 87        carb.settings.get_settings().set("/rtx/pathtracing/spp", pt_spp)
 88        # (int): Maximum number of samples to accumulate per pixel. When this count is reached the rendering stops until a scene or setting change is detected, restarting the rendering process. Set to 0 to remove this limit.
 89        carb.settings.get_settings().set("/rtx/pathtracing/totalSpp", pt_spp)
 90        carb.settings.get_settings().set("/rtx/pathtracing/optixDenoiser/enabled", 0)
 91        # Number of sub samples to render if in PathTracing render mode and motion blur is enabled.
 92        carb.settings.get_settings().set("/omni/replicator/pathTracedMotionBlurSubSamples", pt_subsamples)
 93    else:
 94        print(f"[MotionBlur] Setting RayTracedLighting render mode motion blur settings")
 95        carb.settings.get_settings().set("/rtx/rendermode", "RayTracedLighting")
 96        # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA
 97        carb.settings.get_settings().set("/rtx/post/aa/op", 2)
 98        # (float): The fraction of the largest screen dimension to use as the maximum motion blur diameter.
 99        carb.settings.get_settings().set("/rtx/post/motionblur/maxBlurDiameterFraction", 0.02)
100        # (float): Exposure time fraction in frames (1.0 = one frame duration) to sample.
101        carb.settings.get_settings().set("/rtx/post/motionblur/exposureFraction", 1.0)
102        # (int): Number of samples to use in the filter. A higher number improves quality at the cost of performance.
103        carb.settings.get_settings().set("/rtx/post/motionblur/numSamples", 8)
104
105    # Setup camera and writer
106    camera = rep.create.camera(position=(0, 1.5, 0), look_at=(0, 0, 0), name="MotionBlurCam")
107    render_product = rep.create.render_product(camera, (1280, 720))
108    basic_writer = rep.WriterRegistry.get("BasicWriter")
109    delta_time_str = "None" if custom_delta_time is None else f"{custom_delta_time:.4f}"
110    render_mode_str = f"pt_subsamples_{pt_subsamples}_spp_{pt_spp}" if use_path_tracing else "rt"
111    output_directory = os.path.join(os.getcwd(), f"_out_motion_blur_dt_{delta_time_str}_{render_mode_str}")
112    print(f"[MotionBlur] Output directory: {output_directory}")
113    basic_writer.initialize(output_dir=output_directory, rgb=True)
114    basic_writer.attach(render_product)
115
116    # Run a few updates to make sure all materials are fully loaded for capture
117    for _ in range(50):
118        await omni.kit.app.get_app().next_update_async()
119
120    # Use the physics scene to modify the physics FPS (if needed) to guarantee motion samples at any custom delta time
121    physx_scene = None
122    for prim in stage.Traverse():
123        if prim.IsA(UsdPhysics.Scene):
124            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
125            break
126    if physx_scene is None:
127        print(f"[MotionBlur] Creating a new PhysicsScene")
128        physics_scene = UsdPhysics.Scene.Define(stage, "/PhysicsScene")
129        physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))
130
131    # Check the target physics depending on the custom delta time and the render mode
132    target_physics_fps = stage.GetTimeCodesPerSecond() if custom_delta_time is None else 1 / custom_delta_time
133    if use_path_tracing:
134        target_physics_fps *= pt_subsamples
135
136    # Check if the physics FPS needs to be increased to match the custom delta time
137    orig_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get()
138    if target_physics_fps > orig_physics_fps:
139        print(f"[MotionBlur] Changing physics FPS from {orig_physics_fps} to {target_physics_fps}")
140        physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps)
141
142    # Start the timeline for physics updates in the step function
143    timeline = omni.timeline.get_timeline_interface()
144    timeline.play()
145
146    # Capture frames
147    for i in range(num_frames):
148        print(f"[MotionBlur] \tCapturing frame {i}")
149        await rep.orchestrator.step_async(delta_time=custom_delta_time)
150
151    # Restore the original physics FPS
152    if target_physics_fps > orig_physics_fps:
153        print(f"[MotionBlur] Restoring physics FPS from {target_physics_fps} to {orig_physics_fps}")
154        physx_scene.GetTimeStepsPerSecondAttr().Set(orig_physics_fps)
155
156    # Switch back to the raytracing render mode
157    if use_path_tracing:
158        print(f"[MotionBlur] Restoring render mode to RayTracedLighting")
159        carb.settings.get_settings().set("/rtx/rendermode", "RayTracedLighting")
160
161    # Wait until all the data is saved to disk
162    await rep.orchestrator.wait_until_complete_async()
163
164
async def run_motion_blur_examples_async():
    """Await the motion blur capture for every step duration and render mode variant.

    For each step duration (``None`` plus three fixed durations) one capture is
    made without path tracing, followed by one path-traced capture for every
    combination of motion blur sub-sample count and samples-per-pixel value.
    """
    # (sub-samples, spp) pairs, listed in the order the captures are run
    path_tracing_configs = [(sub_samples, spp) for sub_samples in (4, 16) for spp in (32, 128)]

    for step_duration in (None, 1 / 30, 1 / 60, 1 / 240):
        # RayTracing capture
        await run_motion_blur_example_async(custom_delta_time=step_duration, use_path_tracing=False)
        # PathTracing captures
        for sub_samples, spp in path_tracing_configs:
            await run_motion_blur_example_async(
                custom_delta_time=step_duration,
                use_path_tracing=True,
                pt_subsamples=sub_samples,
                pt_spp=spp,
            )


# Schedule the examples on the running event loop
asyncio.ensure_future(run_motion_blur_examples_async())

Subscribers and Events at Custom FPS#

Examples of subscribing to various events (such as stage, physics, and render/app), setting custom update rates, and adjusting various related settings. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/subscribers_and_events.py
Subscribers and Events at Custom FPS
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp({"headless": False})
  4
  5import asyncio
  6import math
  7import time
  8
  9import carb.eventdispatcher
 10import carb.events
 11import carb.settings
 12import omni.kit.app
 13import omni.physx
 14import omni.timeline
 15import omni.usd
 16from pxr import PhysxSchema, UsdPhysics
 17
 18# TIMELINE / STAGE
 19USE_CUSTOM_TIMELINE_SETTINGS = False
 20USE_FIXED_TIME_STEPPING = False
 21PLAY_EVERY_FRAME = True
 22PLAY_DELAY_COMPENSATION = 0.0
 23SUBSAMPLE_RATE = 1
 24STAGE_FPS = 30.0
 25
 26# PHYSX
 27USE_CUSTOM_PHYSX_FPS = False
 28PHYSX_FPS = 60.0
 29MIN_SIM_FPS = 30
 30
 31# Simulations can also be enabled/disabled at runtime
 32DISABLE_SIMULATIONS = False
 33
 34# APP / RENDER
 35LIMIT_APP_FPS = False
 36APP_FPS = 120
 37
 38# Duration after which to clear subscribers and print the cached events
 39SUBSCRIBER_WALL_TIME_LIMIT_SEC = 0.5
 40PRINT_EVENTS = True
 41
 42
 43def on_timeline_event(event: omni.timeline.TimelineEventType):
 44    global wall_start_time
 45    global timeline_sub
 46    global timeline_events
 47    elapsed_wall_time = time.time() - wall_start_time
 48
 49    # Cache only time advance events
 50    if event.type == omni.timeline.TimelineEventType.CURRENT_TIME_TICKED.value:
 51        event_name = omni.timeline.TimelineEventType(event.type).name
 52        event_payload = event.payload
 53        timeline_events.append((elapsed_wall_time, event_name, event_payload))
 54
 55    # Clear subscriber and print cached events
 56    if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
 57        if timeline_sub is not None:
 58            timeline_sub.unsubscribe()
 59            timeline_sub = None
 60        num_events = len(timeline_events)
 61        fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
 62        print(f"[timeline] captured {num_events} events with aprox {fps} FPS")
 63        if PRINT_EVENTS:
 64            for i, (wall_time, event_name, payload) in enumerate(timeline_events):
 65                print(f"\t[timeline][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")
 66
 67
 68def on_physics_step(dt: float):
 69    global wall_start_time
 70    global physx_events
 71    global physx_sub
 72    elapsed_wall_time = time.time() - wall_start_time
 73
 74    # Cache physics events
 75    physx_events.append((elapsed_wall_time, dt))
 76
 77    # Clear subscriber and print cached events
 78    if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
 79        # Physics unsubscription needs to be defered from the callback function
 80        # see: '[Error] [omni.physx.plugin] Subscription cannot be changed during the event call'
 81        async def clear_physx_sub_async():
 82            global physx_sub
 83            if physx_sub is not None:
 84                physx_sub.unsubscribe()
 85                physx_sub = None
 86
 87        asyncio.ensure_future(clear_physx_sub_async())
 88        num_events = len(physx_events)
 89        fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
 90        print(f"[physics] captured {num_events} events with aprox {fps} FPS")
 91        if PRINT_EVENTS:
 92            for i, (wall_time, dt) in enumerate(physx_events):
 93                print(f"\t[physics][{i}]\ttime={wall_time:.4f};\tdt={dt};")
 94
 95
 96def on_stage_render_event(event: carb.eventdispatcher.Event):
 97    global wall_start_time
 98    global stage_render_sub
 99    global stage_render_events
100    elapsed_wall_time = time.time() - wall_start_time
101
102    event_name = event.event_name
103    event_payload = event.payload
104    stage_render_events.append((elapsed_wall_time, event_name, event_payload))
105
106    if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
107        if stage_render_sub is not None:
108            stage_render_sub.reset()
109            stage_render_sub = None
110        num_events = len(stage_render_events)
111        fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
112        print(f"[stage render] captured {num_events} events with aprox {fps} FPS")
113        if PRINT_EVENTS:
114            for i, (wall_time, event_name, payload) in enumerate(stage_render_events):
115                print(f"\t[stage render][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")
116
117
def on_app_update(event: carb.eventdispatcher.Event):
    """Cache every observed app update event; after the wall-time limit, drop the observer and report.

    Args:
        event: Dispatched event; its ``event_name`` and ``payload`` are cached.
    """
    global app_sub
    elapsed_wall_time = time.time() - wall_start_time

    # Cache app update events
    app_update_events.append((elapsed_wall_time, event.event_name, event.payload))

    # Keep collecting until the wall-time limit is exceeded
    if elapsed_wall_time <= SUBSCRIBER_WALL_TIME_LIMIT_SEC:
        return

    # Clear the observer and print the cached events
    if app_sub is not None:
        app_sub.reset()
        app_sub = None
    num_events = len(app_update_events)
    fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
    print(f"[app] captured {num_events} events with aprox {fps} FPS")
    if PRINT_EVENTS:
        for i, (wall_time, event_name, payload) in enumerate(app_update_events):
            print(f"\t[app][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")
138
139
# Handles shared by the configuration steps below
stage = omni.usd.get_context().get_stage()
timeline = omni.timeline.get_timeline_interface()
settings = carb.settings.get_settings()


if USE_CUSTOM_TIMELINE_SETTINGS:
    # Fixed time stepping: ideal to keep simulation and animation synchronized.
    # Default: True in editor, False in standalone.
    # NOTE:
    # - It may limit the frame rate (see 'timeline.set_play_every_frame') such that the elapsed wall clock
    #   time matches the frame's delta time.
    # - If the app runs slower than this, animation playback may slow down (see 'CompensatePlayDelayInSecs').
    # - For performance benchmarks, turn this off or set a very high target in `timeline.set_target_framerate`
    settings.set("/app/player/useFixedTimeStepping", USE_FIXED_TIME_STEPPING)

    # Play delay compensation: makes up for frames that need more computation time than the frame's fixed
    # delta time by temporarily speeding up playback. The value is the length of these "faster" playback
    # periods, so it must be larger than the fixed frame time to take effect.
    # Default: 0.0
    # NOTE:
    # - only effective if `useFixedTimeStepping` is set to True
    # - setting a large value results in long fast playback after a huge lag spike
    settings.set("/app/player/CompensatePlayDelayInSecs", PLAY_DELAY_COMPENSATION)

    # Play every frame: if True, no frames are skipped and in every frame time advances by
    # `1 / TimeCodesPerSecond`.
    # Default: False
    # NOTE:
    # - only effective if `useFixedTimeStepping` is set to True
    # - simulation is usually faster than real-time and processing is only limited by the frame rate of the runloop
    # - useful for recording
    # - same as `carb.settings.get_settings().set("/app/player/useFastMode", PLAY_EVERY_FRAME)`
    timeline.set_play_every_frame(PLAY_EVERY_FRAME)

    # Timeline sub-stepping: how many times updates are called (update events are dispatched) each frame.
    # Default: 1
    # NOTE: same as `carb.settings.get_settings().set("/app/player/timelineSubsampleRate", SUBSAMPLE_RATE)`
    timeline.set_ticks_per_frame(SUBSAMPLE_RATE)

    # Time codes per second for the stage
    # NOTE: same as `stage.SetTimeCodesPerSecond(STAGE_FPS)` and
    # `carb.settings.get_settings().set("/app/stage/timeCodesPerSecond", STAGE_FPS)`
    timeline.set_time_codes_per_second(STAGE_FPS)


# Use (or create) a physics scene to set the physics time step
if USE_CUSTOM_PHYSX_FPS:
    scene_prim = next((prim for prim in stage.Traverse() if prim.IsA(UsdPhysics.Scene)), None)
    if scene_prim is None:
        UsdPhysics.Scene.Define(stage, "/PhysicsScene")
        scene_prim = stage.GetPrimAtPath("/PhysicsScene")
    physx_scene = PhysxSchema.PhysxSceneAPI.Apply(scene_prim)

    # Time step for the physics simulation
    # Default: 60.0
    physx_scene.GetTimeStepsPerSecondAttr().Set(PHYSX_FPS)

    # Minimum simulation frequency to prevent clamping; if the frame rate drops below this,
    # physics steps are discarded to avoid app slowdown if the overall frame rate is too low.
    # Default: 30.0
    # NOTE: Matching `minFrameRate` with `TimeStepsPerSecond` ensures a single physics step per update.
    settings.set("/persistent/simulation/minFrameRate", MIN_SIM_FPS)


# Throttle Render/UI/Main thread update rate
if LIMIT_APP_FPS:
    # Enable rate limiting of the main run loop (UI, rendering, etc.)
    # Default: False
    settings.set("/app/runLoops/main/rateLimitEnabled", LIMIT_APP_FPS)

    # FPS limit of the main run loop (UI, rendering, etc.)
    # Default: 120
    # NOTE: disabled if `/app/player/useFixedTimeStepping` is False
    settings.set("/app/runLoops/main/rateLimitFrequency", int(APP_FPS))


# Simulations can be selectively disabled (or toggled at specific times)
if DISABLE_SIMULATIONS:
    settings.set("/app/player/playSimulations", False)


# Start the timeline from time 0, without looping, ending one second after the subscriber limit
timeline.set_current_time(0)
timeline.set_end_time(SUBSCRIBER_WALL_TIME_LIMIT_SEC + 1)
timeline.set_looping(False)
timeline.play()
timeline.commit()
wall_start_time = time.time()

# Event caches filled by the callbacks for a limited duration
timeline_events = []
physx_events = []
stage_render_events = []
app_update_events = []

# Subscribe to timeline, physics step, stage render and app update events
timeline_sub = timeline.get_timeline_event_stream().create_subscription_to_pop(on_timeline_event)
physx_sub = omni.physx.get_physx_interface().subscribe_physics_step_events(on_physics_step)
dispatcher = carb.eventdispatcher.get_eventdispatcher()
stage_render_sub = dispatcher.observe_event(
    event_name=omni.usd.get_context().stage_rendering_event_name(omni.usd.StageRenderingEventType.NEW_FRAME, True),
    on_event=on_stage_render_event,
    observer_name="subscribers_and_events.on_stage_render_event",
)
app_sub = dispatcher.observe_event(
    event_name=omni.kit.app.GLOBAL_EVENT_UPDATE,
    on_event=on_app_update,
    observer_name="subscribers_and_events.on_app_update",
)

# Run the application for a while to trigger the events, with a buffer to ensure subscribers have enough
# wall-clock time
num_app_updates = int(math.ceil(SUBSCRIBER_WALL_TIME_LIMIT_SEC * STAGE_FPS * 4))
for _ in range(num_app_updates):
    simulation_app.update()

# Summary of the run
print(f"Finished running the application for {num_app_updates} updates.")
print(f"Wall time: {time.time() - wall_start_time:.4f} seconds")
print(f"Number of timeline events: {len(timeline_events)}")
print(f"Number of physics events: {len(physx_events)}")
print(f"Number of stage render events: {len(stage_render_events)}")
print(f"Number of app update events: {len(app_update_events)}")

simulation_app.close()
Subscribers and Events at Custom FPS
  1import asyncio
  2import math
  3import time
  4
  5import carb.eventdispatcher
  6import carb.events
  7import carb.settings
  8import omni.kit.app
  9import omni.physx
 10import omni.timeline
 11import omni.usd
 12from pxr import PhysxSchema, UsdPhysics
 13
 14# TIMELINE / STAGE
 15USE_CUSTOM_TIMELINE_SETTINGS = False
 16USE_FIXED_TIME_STEPPING = False
 17PLAY_EVERY_FRAME = True
 18PLAY_DELAY_COMPENSATION = 0.0
 19SUBSAMPLE_RATE = 1
 20STAGE_FPS = 30.0
 21
 22# PHYSX
 23USE_CUSTOM_PHYSX_FPS = False
 24PHYSX_FPS = 60.0
 25MIN_SIM_FPS = 30
 26
 27# Simulations can also be enabled/disabled at runtime
 28DISABLE_SIMULATIONS = False
 29
 30# APP / RENDER
 31LIMIT_APP_FPS = False
 32APP_FPS = 120
 33
 34# Duration after which to clear subscribers and print the cached events
 35SUBSCRIBER_WALL_TIME_LIMIT_SEC = 0.5
 36PRINT_EVENTS = True
 37
 38
 39async def example_async():
 40    def on_timeline_event(event: omni.timeline.TimelineEventType):
 41        nonlocal wall_start_time
 42        nonlocal timeline_sub
 43        nonlocal timeline_events
 44        elapsed_wall_time = time.time() - wall_start_time
 45
 46        # Cache only time advance events
 47        if event.type == omni.timeline.TimelineEventType.CURRENT_TIME_TICKED.value:
 48            event_name = omni.timeline.TimelineEventType(event.type).name
 49            event_payload = event.payload
 50            timeline_events.append((elapsed_wall_time, event_name, event_payload))
 51
 52        # Clear subscriber and print cached events
 53        if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
 54            if timeline_sub is not None:
 55                timeline_sub.unsubscribe()
 56                timeline_sub = None
 57            num_events = len(timeline_events)
 58            fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
 59            print(f"[timeline] captured {num_events} events with aprox {fps} FPS")
 60            if PRINT_EVENTS:
 61                for i, (wall_time, event_name, payload) in enumerate(timeline_events):
 62                    print(
 63                        f"\t[timeline][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}"
 64                    )
 65
 66    def on_physics_step(dt: float):
 67        nonlocal wall_start_time
 68        nonlocal physx_events
 69        nonlocal physx_sub
 70        elapsed_wall_time = time.time() - wall_start_time
 71
 72        # Cache physics events
 73        physx_events.append((elapsed_wall_time, dt))
 74
 75        # Clear subscriber and print cached events
 76        if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
 77            # Physics unsubscription needs to be deferred from the callback function
 78            # see: '[Error] [omni.physx.plugin] Subscription cannot be changed during the event call'
 79            async def clear_physx_sub_async():
 80                nonlocal physx_sub
 81                if physx_sub is not None:
 82                    physx_sub.unsubscribe()
 83                    physx_sub = None
 84
 85            asyncio.ensure_future(clear_physx_sub_async())
 86            num_events = len(physx_events)
 87            fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
 88            print(f"[physics] captured {num_events} events with aprox {fps} FPS")
 89            if PRINT_EVENTS:
 90                for i, (wall_time, dt) in enumerate(physx_events):
 91                    print(f"\t[physics][{i}]\ttime={wall_time:.4f};\tdt={dt};")
 92
 93    def on_stage_render_event(event: carb.eventdispatcher.Event):
 94        nonlocal wall_start_time
 95        nonlocal stage_render_sub
 96        nonlocal stage_render_events
 97        elapsed_wall_time = time.time() - wall_start_time
 98
 99        event_name = event.event_name
100        event_payload = event.payload
101        stage_render_events.append((elapsed_wall_time, event_name, event_payload))
102
103        if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
104            if stage_render_sub is not None:
105                stage_render_sub.reset
106                stage_render_sub = None
107            num_events = len(stage_render_events)
108            fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
109            print(f"[stage render] captured {num_events} events with aprox {fps} FPS")
110            if PRINT_EVENTS:
111                for i, (wall_time, event_name, payload) in enumerate(
112                    stage_render_events
113                ):
114                    print(
115                        f"\t[stage render][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}"
116                    )
117
118    def on_app_update(event: carb.eventdispatcher.Event):
119        nonlocal wall_start_time
120        nonlocal app_sub
121        nonlocal app_update_events
122        elapsed_wall_time = time.time() - wall_start_time
123
124        # Cache app update events
125        event_name = event.event_name
126        event_payload = event.payload
127        app_update_events.append((elapsed_wall_time, event_name, event_payload))
128
129        if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
130            if app_sub is not None:
131                app_sub.reset()
132                app_sub = None
133            num_events = len(app_update_events)
134            fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
135            print(f"[app] captured {num_events} events with aprox {fps} FPS")
136            if PRINT_EVENTS:
137                for i, (wall_time, event_name, payload) in enumerate(app_update_events):
138                    print(
139                        f"\t[app][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}"
140                    )
141
142    stage = omni.usd.get_context().get_stage()
143    timeline = omni.timeline.get_timeline_interface()
144
145    if USE_CUSTOM_TIMELINE_SETTINGS:
146        # Ideal to make simulation and animation synchronized.
147        # Default: True in editor, False in standalone.
148        # NOTE:
149        # - It may limit the frame rate (see 'timeline.set_play_every_frame') such that the elapsed wall clock time matches the frame's delta time.
150        # - If the app runs slower than this, animation playback may slow down (see 'CompensatePlayDelayInSecs').
151        # - For performance benchmarks, turn this off or set a very high target in `timeline.set_target_framerate`
152        carb.settings.get_settings().set(
153            "/app/player/useFixedTimeStepping", USE_FIXED_TIME_STEPPING
154        )
155
156        # This compensates for frames that require more computation time than the frame's fixed delta time, by temporarily speeding up playback.
157        # The parameter represents the length of these "faster" playback periods, which means that it must be larger than the fixed frame time to take effect.
158        # Default: 0.0
159        # NOTE:
160        # - only effective if `useFixedTimeStepping` is set to True
161        # - setting a large value results in long fast playback after a huge lag spike
162        carb.settings.get_settings().set(
163            "/app/player/CompensatePlayDelayInSecs", PLAY_DELAY_COMPENSATION
164        )
165
166        # If set to True, no frames are skipped and in every frame time advances by `1 / TimeCodesPerSecond`.
167        # Default: False
168        # NOTE:
169        # - only effective if `useFixedTimeStepping` is set to True
170        # - simulation is usually faster than real-time and processing is only limited by the frame rate of the runloop
171        # - useful for recording
172        # - same as `carb.settings.get_settings().set("/app/player/useFastMode", PLAY_EVERY_FRAME)`
173        timeline.set_play_every_frame(PLAY_EVERY_FRAME)
174
175        # Timeline sub-stepping, i.e. how many times updates are called (update events are dispatched) each frame.
176        # Default: 1
177        # NOTE: same as `carb.settings.get_settings().set("/app/player/timelineSubsampleRate", SUBSAMPLE_RATE)`
178        timeline.set_ticks_per_frame(SUBSAMPLE_RATE)
179
180        # Time codes per second for the stage
181        # NOTE: same as `stage.SetTimeCodesPerSecond(STAGE_FPS)` and `carb.settings.get_settings().set("/app/stage/timeCodesPerSecond", STAGE_FPS)`
182        timeline.set_time_codes_per_second(STAGE_FPS)
183
184    # Create a PhysX scene to set the physics time step
185    if USE_CUSTOM_PHYSX_FPS:
186        physx_scene = None
187        for prim in stage.Traverse():
188            if prim.IsA(UsdPhysics.Scene):
189                physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
190                break
191        if physx_scene is None:
192            physics_scene = UsdPhysics.Scene.Define(stage, "/PhysicsScene")
193            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(
194                stage.GetPrimAtPath("/PhysicsScene")
195            )
196
197        # Time step for the physics simulation
198        # Default: 60.0
199        physx_scene.GetTimeStepsPerSecondAttr().Set(PHYSX_FPS)
200
201        # Minimum simulation frequency to prevent clamping; if the frame rate drops below this,
202        # physics steps are discarded to avoid app slowdown if the overall frame rate is too low.
203        # Default: 30.0
204        # NOTE: Matching `minFrameRate` with `TimeStepsPerSecond` ensures a single physics step per update.
205        carb.settings.get_settings().set(
206            "/persistent/simulation/minFrameRate", MIN_SIM_FPS
207        )
208
209    # Throttle Render/UI/Main thread update rate
210    if LIMIT_APP_FPS:
211        # Enable rate limiting of the main run loop (UI, rendering, etc.)
212        # Default: False
213        carb.settings.get_settings().set(
214            "/app/runLoops/main/rateLimitEnabled", LIMIT_APP_FPS
215        )
216
217        # FPS limit of the main run loop (UI, rendering, etc.)
218        # Default: 120
219        # NOTE: disabled if `/app/player/useFixedTimeStepping` is False
220        carb.settings.get_settings().set(
221            "/app/runLoops/main/rateLimitFrequency", int(APP_FPS)
222        )
223
224    # Simulations can be selectively disabled (or toggled at specific times)
225    if DISABLE_SIMULATIONS:
226        carb.settings.get_settings().set("/app/player/playSimulations", False)
227
228    # Start the timeline
229    timeline.set_current_time(0)
230    timeline.set_end_time(SUBSCRIBER_WALL_TIME_LIMIT_SEC + 1)
231    timeline.set_looping(False)
232    timeline.play()
233    timeline.commit()
234    wall_start_time = time.time()
235
236    # Subscribe and cache various events for a limited duration
237    timeline_events = []
238    timeline_sub = timeline.get_timeline_event_stream().create_subscription_to_pop(
239        on_timeline_event
240    )
241    physx_events = []
242    physx_sub = omni.physx.get_physx_interface().subscribe_physics_step_events(
243        on_physics_step
244    )
245    stage_render_events = []
246    stage_render_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
247        event_name=omni.usd.get_context().stage_rendering_event_name(
248            omni.usd.StageRenderingEventType.NEW_FRAME, True
249        ),
250        on_event=on_stage_render_event,
251        observer_name="subscribers_and_events.on_stage_render_event",
252    )
253
254    app_update_events = []
255    app_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
256        event_name=omni.kit.app.GLOBAL_EVENT_UPDATE,
257        on_event=on_app_update,
258        observer_name="subscribers_and_events.on_app_update",
259    )
260
261    # Run the application for a while to trigger the events, with a buffer to ensure subscribers have enough wall-clock time
262    num_app_updates = int(math.ceil(SUBSCRIBER_WALL_TIME_LIMIT_SEC * STAGE_FPS * 4))
263    for _ in range(num_app_updates):
264        await omni.kit.app.get_app().next_update_async()
265
266    print(f"Finished running the application for {num_app_updates} updates.")
267    print(f"Wall time: {time.time() - wall_start_time:.4f} seconds")
268    print(f"Number of timeline events: {len(timeline_events)}")
269    print(f"Number of physics events: {len(physx_events)}")
270    print(f"Number of stage render events: {len(stage_render_events)}")
271    print(f"Number of app update events: {len(app_update_events)}")
272
273    # Cleanup
274    timeline.stop()
275    if app_sub is not None:
276        app_sub.reset()
277        app_sub = None
278    if stage_render_sub is not None:
279        stage_render_sub.reset()
280        stage_render_sub = None
281    if physx_sub is not None:
282        physx_sub.unsubscribe()
283        physx_sub = None
284    if timeline_sub is not None:
285        timeline_sub.unsubscribe()
286        timeline_sub = None
287
288
289asyncio.ensure_future(example_async())

Accessing Writer and Annotator Data at Custom FPS#

Example of how to trigger a writer and access annotator data at a custom FPS, with product rendering disabled when the data is not needed. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/custom_fps_writer_annotator.py

Note

It is currently not possible to change the timeline (stage) FPS after the replicator graph has been created, because doing so causes a graph reset. This issue is being addressed. As a workaround, make sure you set the timeline (stage) parameters before creating the replicator graph.

Accessing Writer and Annotator Data at Custom FPS
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp({"headless": False})
  4
  5import os
  6
  7import carb.settings
  8import omni.kit.app
  9import omni.replicator.core as rep
 10import omni.timeline
 11import omni.usd
 12
 13# NOTE: To avoid FPS delta misses make sure the sensor framerate is divisible by the timeline framerate
 14STAGE_FPS = 100.0
 15SENSOR_FPS = 10.0
 16SENSOR_DT = 1.0 / SENSOR_FPS
 17
 18
 19def run_custom_fps_example(duration_seconds):
 20    # Create a new stage
 21    omni.usd.get_context().new_stage()
 22
 23    # Disable capture on play (data will only be accessed at custom times)
 24    carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
 25
 26    # Make sure fixed time stepping is set (the timeline will be advanced with the same delta time)
 27    carb.settings.get_settings().set("/app/player/useFixedTimeStepping", True)
 28
 29    # Set the timeline parameters
 30    timeline = omni.timeline.get_timeline_interface()
 31    timeline.set_looping(False)
 32    timeline.set_current_time(0.0)
 33    timeline.set_end_time(10)
 34    timeline.set_time_codes_per_second(STAGE_FPS)
 35    timeline.play()
 36    timeline.commit()
 37
 38    # Create a light and a semantically annoated cube
 39    rep.create.light()
 40    rep.create.cube(semantics=[("class", "cube")])
 41
 42    # Create a render product and disable it (it will re-enabled when data is needed)
 43    rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512), name="rp")
 44    rp.hydra_texture.set_updates_enabled(False)
 45
 46    # Create a writer and an annotator as different ways to access the data
 47    out_dir_rgb = os.path.join(os.getcwd(), "_out_writer_fps_rgb")
 48    print(f"Writer data will be written to: {out_dir_rgb}")
 49    writer_rgb = rep.WriterRegistry.get("BasicWriter")
 50    writer_rgb.initialize(output_dir=out_dir_rgb, rgb=True)
 51    writer_rgb.attach(rp)
 52    annot_depth = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
 53    annot_depth.attach(rp)
 54
 55    # Run the simulation for the given number of frames and access the data at the desired framerates
 56    written_frames = 0
 57    previous_time = timeline.get_current_time()
 58    elapsed_time = 0.0
 59    loop_iteration_count = 0
 60    while timeline.get_current_time() < duration_seconds:
 61        current_time = timeline.get_current_time()
 62        delta_time = current_time - previous_time
 63        elapsed_time += delta_time
 64        print(
 65            f"[{loop_iteration_count}] current_time={current_time:.4f}; delta_time={delta_time:.4f}; elapsed_time={elapsed_time:.4f}/{SENSOR_DT:.4f};"
 66        )
 67
 68        # Check if enough time has passed to trigger the sensor
 69        if elapsed_time >= SENSOR_DT:
 70            # Reset the elapsed time with the difference to the optimal trigger time (when the timeline fps is not divisible by the sensor framerate)
 71            elapsed_time = elapsed_time - SENSOR_DT
 72
 73            # Enable render products for data access
 74            rp.hydra_texture.set_updates_enabled(True)
 75
 76            # Step needs to be called after scheduling the write
 77            rep.orchestrator.step(delta_time=0.0, pause_timeline=False)
 78
 79            # After step, the annotator data is available and in sync with the stage
 80            annot_data = annot_depth.get_data()
 81
 82            # Count the number of frames written
 83            print(f"\t Writing frame {written_frames}; annotator data shape={annot_data.shape};")
 84            written_frames += 1
 85
 86            # Disable render products to avoid unnecessary rendering
 87            rp.hydra_texture.set_updates_enabled(False)
 88
 89        previous_time = current_time
 90        # Advance the app (timeline) by one frame
 91        simulation_app.update()
 92
 93    # Make sure the writer finishes writing the data to disk
 94    rep.orchestrator.wait_until_complete()
 95
 96
 97# Run the example.
 98# NOTE: The simulation duration is calculated to ensure 'target_num_writes' are captured.
 99# It includes the time for all sensor intervals plus a small buffer of a few stage frames.
100target_num_writes = 6
101duration_for_target_writes = (target_num_writes * SENSOR_DT) + (5.0 / STAGE_FPS)
102run_custom_fps_example(duration_seconds=duration_for_target_writes)
103
104# Close the application
105simulation_app.close()
Accessing Writer and Annotator Data at Custom FPS
 1import asyncio
 2import os
 3
 4import carb.settings
 5import omni.kit.app
 6import omni.replicator.core as rep
 7import omni.timeline
 8import omni.usd
 9
10# NOTE: To avoid FPS delta misses make sure the sensor framerate is divisible by the timeline framerate
11STAGE_FPS = 100.0
12SENSOR_FPS = 10.0
13SENSOR_DT = 1.0 / SENSOR_FPS
14
15async def run_custom_fps_example_async(duration_seconds):
16    # Create a new stage
17    await omni.usd.get_context().new_stage_async()
18
19    # Disable capture on play (data will only be accessed at custom times)
20    carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)
21
22    # Make sure fixed time stepping is set (the timeline will be advanced with the same delta time)
23    carb.settings.get_settings().set("/app/player/useFixedTimeStepping", True)
24
25    # Set the timeline parameters
26    timeline = omni.timeline.get_timeline_interface()
27    timeline.set_looping(False)
28    timeline.set_current_time(0.0)
29    timeline.set_end_time(10)
30    timeline.set_time_codes_per_second(STAGE_FPS)
31    timeline.play()
32    timeline.commit()
33
34    # Create a light and a semantically annotated cube
35    rep.create.light()
36    rep.create.cube(semantics=[("class", "cube")])
37
38    # Create a render product and disable it (it will re-enabled when data is needed)
39    rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512), name="rp")
40    rp.hydra_texture.set_updates_enabled(False)
41
42    # Create a writer and an annotator as different ways to access the data
43    out_dir_rgb = os.path.join(os.getcwd(), "_out_writer_fps_rgb")
44    print(f"Writer data will be written to: {out_dir_rgb}")
45    writer_rgb = rep.WriterRegistry.get("BasicWriter")
46    writer_rgb.initialize(output_dir=out_dir_rgb, rgb=True)
47    writer_rgb.attach(rp)
48    annot_depth = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
49    annot_depth.attach(rp)
50
51    # Run the simulation for the given number of frames and access the data at the desired framerates
52    written_frames = 0
53    previous_time = timeline.get_current_time()
54    elapsed_time = 0.0
55    loop_iteration_count = 0
56    while timeline.get_current_time() < duration_seconds:
57        current_time = timeline.get_current_time()
58        delta_time = current_time - previous_time
59        elapsed_time += delta_time
60        print(
61            f"[{loop_iteration_count}] current_time={current_time:.4f}; delta_time={delta_time:.4f}; elapsed_time={elapsed_time:.4f}/{SENSOR_DT:.4f};"
62        )
63
64        # Check if enough time has passed to trigger the sensor
65        if elapsed_time >= SENSOR_DT:
66            # Reset the elapsed time with the difference to the optimal trigger time (when the timeline fps is not divisible by the sensor framerate)
67            elapsed_time = elapsed_time - SENSOR_DT
68
69            # Enable render products for data access
70            rp.hydra_texture.set_updates_enabled(True)
71
72            # Step needs to be called after scheduling the write
73            await rep.orchestrator.step_async(delta_time=0.0, pause_timeline=False)
74
75            # After step, the annotator data is available and in sync with the stage
76            annot_data = annot_depth.get_data()
77
78            # Count the number of frames written
79            print(f"\t Writing frame {written_frames}; annotator data shape={annot_data.shape};")
80            written_frames += 1
81
82            # Disable render products to avoid unnecessary rendering
83            rp.hydra_texture.set_updates_enabled(False)
84
85        previous_time = current_time
86        # Advance the app (timeline) by one frame
87        await omni.kit.app.get_app().next_update_async()
88        loop_iteration_count += 1
89
90    # Make sure the writer finishes writing the data to disk
91    await rep.orchestrator.wait_until_complete_async()
92
93# Run the example.
94# NOTE: The simulation duration is calculated to ensure 'target_num_writes' are captured.
95# It includes the time for all sensor intervals plus a small buffer of a few stage frames.
96target_num_writes = 6
97duration_for_target_writes = (target_num_writes * SENSOR_DT) + (5.0 / STAGE_FPS)
98await run_custom_fps_example_async(duration_seconds=duration_for_target_writes)

Cosmos Writer Example#

This example demonstrates how to use the CosmosWriter for synthetic data generation. The CosmosWriter generates specialized data for NVIDIA’s Cosmos platform, producing key modalities including RGB, shaded instance segmentation, instance segmentation, depth, and edge maps. These visual features help Cosmos models better distinguish object boundaries and understand 3D form, enabling higher-quality synthetic data generation. For more information about using Cosmos for Synthetic Dataset Augmentation, refer to the detailed case study. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/cosmos_writer_warehouse.py
Cosmos Writer Example
  1from isaacsim import SimulationApp
  2
  3simulation_app = SimulationApp(launch_config={"headless": False})
  4
  5import os
  6
  7import carb
  8import omni.replicator.core as rep
  9import omni.timeline
 10import omni.usd
 11from isaacsim.core.utils.stage import add_reference_to_stage
 12from isaacsim.storage.native import get_assets_root_path
 13from pxr import UsdGeom
 14
 15STAGE_URL = "/Isaac/Samples/Replicator/Stage/full_warehouse_worker_and_anim_cameras.usd"
 16CARTER_NAV_ASSET_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
 17CARTER_NAV_PATH = "/NavWorld/CarterNav"
 18CARTER_CHASSIS_PATH = f"{CARTER_NAV_PATH}/chassis_link"
 19CARTER_NAV_TARGET_PATH = f"{CARTER_NAV_PATH}/targetXform"
 20CARTER_CAMERA_PATH = f"{CARTER_NAV_PATH}/chassis_link/sensors/front_hawk/left/camera_left"
 21CARTER_NAV_POSITION = (-6, 4, 0)
 22CARTER_NAV_TARGET_POSITION = (3, 3, 0)
 23
 24
 25def advance_timeline_by_duration(duration: float, max_updates: int = 1000):
 26    timeline = omni.timeline.get_timeline_interface()
 27    current_time = timeline.get_current_time()
 28    target_time = current_time + duration
 29
 30    if timeline.get_end_time() < target_time:
 31        timeline.set_end_time(1000000)
 32
 33    if not timeline.is_playing():
 34        timeline.play()
 35
 36    print(f"Advancing timeline from {current_time:.4f}s to {target_time:.4f}s")
 37    step_count = 0
 38    while current_time < target_time:
 39        if step_count >= max_updates:
 40            print(f"Max updates reached: {step_count}, finishing timeline advance.")
 41            break
 42
 43        prev_time = current_time
 44        simulation_app.update()
 45        current_time = timeline.get_current_time()
 46        step_count += 1
 47
 48        if step_count % 10 == 0:
 49            print(f"\tStep {step_count}, {current_time:.4f}s/{target_time:.4f}s")
 50
 51        if current_time <= prev_time:
 52            print(f"Warning: Timeline did not advance at update {step_count} (time: {current_time:.4f}s).")
 53    print(f"Finished advancing timeline to {timeline.get_end_time():.4f}s in {step_count} steps")
 54
 55
 56def run_sdg_pipeline(camera_path, num_frames, capture_interval, use_instance_id=True, segmentation_mapping=None):
 57    rp = rep.create.render_product(camera_path, (1280, 720))
 58    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
 59    backend = rep.backends.get("DiskBackend")
 60    out_dir = os.path.join(os.getcwd(), f"_out_cosmos")
 61    print(f"output_directory: {out_dir}")
 62    backend.initialize(output_dir=out_dir)
 63    cosmos_writer.initialize(
 64        backend=backend, use_instance_id=use_instance_id, segmentation_mapping=segmentation_mapping
 65    )
 66    cosmos_writer.attach(rp)
 67
 68    # Make sure the timeline is playing
 69    timeline = omni.timeline.get_timeline_interface()
 70    if not timeline.is_playing():
 71        timeline.play()
 72
 73    print(f"Starting SDG pipeline. Capturing {num_frames} frames, every {capture_interval} simulation step(s).")
 74    frames_captured_count = 0
 75    simulation_step_index = 0
 76    while frames_captured_count < num_frames:
 77        print(f"Simulation step {simulation_step_index}")
 78        if simulation_step_index % capture_interval == 0:
 79            print(f"\t Capturing frame {frames_captured_count + 1}/{num_frames}")
 80            rep.orchestrator.step(pause_timeline=False)
 81            frames_captured_count += 1
 82        else:
 83            simulation_app.update()
 84        simulation_step_index += 1
 85
 86    print("Waiting to finish processing and writing the data")
 87    rep.orchestrator.wait_until_complete()
 88    print(f"Finished SDG pipeline. Captured {frames_captured_count} frames")
 89    cosmos_writer.detach()
 90    rp.destroy()
 91    timeline.pause()
 92
 93
 94def run_example(num_frames, capture_interval=1, start_delay=None, use_instance_id=True, segmentation_mapping=None):
 95    assets_root_path = get_assets_root_path()
 96    stage_path = assets_root_path + STAGE_URL
 97    print(f"Opening stage: '{stage_path}'")
 98    omni.usd.get_context().open_stage(stage_path)
 99    stage = omni.usd.get_context().get_stage()
100
101    # Enable script nodes
102    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)
103
104    # Disable capture on play on the new stage, data is captured manually using the step function
105    rep.orchestrator.set_capture_on_play(False)
106
107    # Load carter nova asset with its navigation graph
108    carter_url_path = assets_root_path + CARTER_NAV_ASSET_URL
109    print(f"Loading carter nova asset: '{carter_url_path}' at prim path: '{CARTER_NAV_PATH}'")
110    carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)
111    if not carter_nav_prim.GetAttribute("xformOp:translate"):
112        UsdGeom.Xformable(carter_nav_prim).AddTranslateOp()
113    carter_nav_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_POSITION)
114
115    # Set the navigation target position
116    carter_navigation_target_prim = stage.GetPrimAtPath(CARTER_NAV_TARGET_PATH)
117    if not carter_navigation_target_prim.IsValid():
118        print(f"Carter navigation target prim not found at path: {CARTER_NAV_TARGET_PATH}, exiting")
119        return
120    if not carter_navigation_target_prim.GetAttribute("xformOp:translate"):
121        UsdGeom.Xformable(carter_navigation_target_prim).AddTranslateOp()
122    carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_TARGET_POSITION)
123
124    # Use the carter nova front hawk camera for capturing data
125    camera_prim = stage.GetPrimAtPath(CARTER_CAMERA_PATH)
126    if not camera_prim.IsValid():
127        print(f"Camera prim not found at path: {CARTER_CAMERA_PATH}, exiting")
128        return
129
130    # Advance the timeline with the start delay if provided
131    if start_delay is not None and start_delay > 0:
132        advance_timeline_by_duration(start_delay)
133
134    # Run the SDG pipeline
135    run_sdg_pipeline(camera_prim.GetPath(), num_frames, capture_interval, use_instance_id, segmentation_mapping)
136
137
138NUM_FRAMES = 120
139CAPTURE_INTERVAL = 4
140START_DELAY = 1.0
141run_example(num_frames=NUM_FRAMES, capture_interval=CAPTURE_INTERVAL, start_delay=START_DELAY, use_instance_id=True)
Cosmos Writer Example
import asyncio
import os

import carb
import omni.kit.app
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from isaacsim.core.utils.stage import add_reference_to_stage
from isaacsim.storage.native import get_assets_root_path_async
from pxr import UsdGeom

# Asset locations, appended to the assets root path
STAGE_URL = "/Isaac/Samples/Replicator/Stage/full_warehouse_worker_and_anim_cameras.usd"
CARTER_NAV_ASSET_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"

# Prim paths of the referenced Carter asset and the prims used from it
CARTER_NAV_PATH = "/NavWorld/CarterNav"
CARTER_CHASSIS_PATH = CARTER_NAV_PATH + "/chassis_link"
CARTER_NAV_TARGET_PATH = CARTER_NAV_PATH + "/targetXform"
CARTER_CAMERA_PATH = CARTER_CHASSIS_PATH + "/sensors/front_hawk/left/camera_left"

# Translations applied to the Carter prim and to its navigation target prim
CARTER_NAV_POSITION = (-6, 4, 0)
CARTER_NAV_TARGET_POSITION = (3, 3, 0)


async def advance_timeline_by_duration(duration: float, max_updates: int = 1000):
    """Wait on app updates until the timeline has advanced by `duration` seconds.

    Starts timeline playback if it is not already playing.

    Args:
        duration: Timeline time (in seconds) to advance from the current time.
        max_updates: Upper bound on app updates; the loop stops once it is reached.
    """
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration

    # Push the end time far out when the target lies beyond it
    if timeline.get_end_time() < target_time:
        timeline.set_end_time(1000000)

    if not timeline.is_playing():
        timeline.play()

    print(f"Advancing timeline from {current_time:.4f}s to {target_time:.4f}s")
    step_count = 0
    while current_time < target_time:
        if step_count >= max_updates:
            print(f"Max updates reached: {step_count}, finishing timeline advance.")
            break

        prev_time = current_time
        await omni.kit.app.get_app().next_update_async()
        current_time = timeline.get_current_time()
        step_count += 1

        if step_count % 10 == 0:
            print(f"\tStep {step_count}, {current_time:.4f}s/{target_time:.4f}s")

        if current_time <= prev_time:
            print(f"Warning: Timeline did not advance at update {step_count} (time: {current_time:.4f}s).")
    # Report the time actually reached (the end time may have been moved far beyond the target)
    print(f"Finished advancing timeline to {current_time:.4f}s in {step_count} steps")


async def run_sdg_pipeline(camera_path, num_frames, capture_interval, use_instance_id=True, segmentation_mapping=None):
    """Attach a CosmosWriter to the camera and capture `num_frames` frames.

    Args:
        camera_path: Path of the camera prim used for the render product.
        num_frames: Number of frames to capture.
        capture_interval: A frame is captured on every simulation step whose index is a multiple of this value.
        use_instance_id: Forwarded to the CosmosWriter initialization.
        segmentation_mapping: Forwarded to the CosmosWriter initialization.
    """
    # Render product, writer and disk backend setup
    render_product = rep.create.render_product(camera_path, (1280, 720))
    writer = rep.WriterRegistry.get("CosmosWriter")
    disk_backend = rep.backends.get("DiskBackend")
    output_dir = os.path.join(os.getcwd(), "_out_cosmos")
    print(f"output_directory: {output_dir}")
    disk_backend.initialize(output_dir=output_dir)
    writer.initialize(
        backend=disk_backend,
        use_instance_id=use_instance_id,
        segmentation_mapping=segmentation_mapping,
    )
    writer.attach(render_product)

    # Start playback if the timeline is not already running
    timeline = omni.timeline.get_timeline_interface()
    if not timeline.is_playing():
        timeline.play()

    print(f"Starting SDG pipeline. Capturing {num_frames} frames, every {capture_interval} simulation step(s).")
    captured = 0
    step_index = 0
    while captured < num_frames:
        print(f"Simulation step {step_index}")
        is_capture_step = step_index % capture_interval == 0
        if not is_capture_step:
            # Non-capture steps only wait for the next app update
            await omni.kit.app.get_app().next_update_async()
        else:
            print(f"\t Capturing frame {captured + 1}/{num_frames}")
            await rep.orchestrator.step_async(pause_timeline=False)
            captured += 1
        step_index += 1

    # Flush pending writes before tearing down the writer and the render product
    print("Waiting to finish processing and writing the data")
    await rep.orchestrator.wait_until_complete_async()
    print(f"Finished SDG pipeline. Captured {captured} frames")
    writer.detach()
    render_product.destroy()
    timeline.pause()


async def run_example_async(
    num_frames, capture_interval=1, start_delay=None, use_instance_id=True, segmentation_mapping=None
):
    """Open the warehouse stage, add the Carter navigation asset, and run the SDG pipeline.

    Returns early (after printing a message) when the assets root, the stage, or a
    required prim is not available.

    Args:
        num_frames: Number of frames to capture.
        capture_interval: Simulation steps between captures.
        start_delay: Optional timeline time (in seconds) to advance before capturing.
        use_instance_id: Forwarded to `run_sdg_pipeline`.
        segmentation_mapping: Forwarded to `run_sdg_pipeline`.
    """
    assets_root_path = await get_assets_root_path_async()
    # Guard against a missing assets root; concatenating None with the URL would raise a TypeError
    if assets_root_path is None:
        print("Could not find the assets root path, exiting")
        return
    stage_path = assets_root_path + STAGE_URL
    print(f"Opening stage: '{stage_path}'")
    # Stop if the stage could not be opened instead of populating whatever stage is currently loaded
    opened, error = await omni.usd.get_context().open_stage_async(stage_path)
    if not opened:
        print(f"Failed to open stage: '{stage_path}' ({error}), exiting")
        return
    stage = omni.usd.get_context().get_stage()

    # Enable script nodes
    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Disable capture on play on the new stage, data is captured manually using the step function
    rep.orchestrator.set_capture_on_play(False)

    # Load carter nova asset with its navigation graph
    carter_url_path = assets_root_path + CARTER_NAV_ASSET_URL
    print(f"Loading carter nova asset: '{carter_url_path}' at prim path: '{CARTER_NAV_PATH}'")
    carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)
    # Add a translate op only when the prim does not already expose one
    if not carter_nav_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_nav_prim).AddTranslateOp()
    carter_nav_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_POSITION)

    # Set the navigation target position
    carter_navigation_target_prim = stage.GetPrimAtPath(CARTER_NAV_TARGET_PATH)
    if not carter_navigation_target_prim.IsValid():
        print(f"Carter navigation target prim not found at path: {CARTER_NAV_TARGET_PATH}, exiting")
        return
    if not carter_navigation_target_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_navigation_target_prim).AddTranslateOp()
    carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_TARGET_POSITION)

    # Use the carter nova front hawk camera for capturing data
    camera_prim = stage.GetPrimAtPath(CARTER_CAMERA_PATH)
    if not camera_prim.IsValid():
        print(f"Camera prim not found at path: {CARTER_CAMERA_PATH}, exiting")
        return

    # Advance the timeline with the start delay if provided
    if start_delay is not None and start_delay > 0:
        await advance_timeline_by_duration(start_delay)

    # Run the SDG pipeline
    await run_sdg_pipeline(camera_prim.GetPath(), num_frames, capture_interval, use_instance_id, segmentation_mapping)


# Example parameters: number of frames to capture, simulation steps between captures,
# and timeline time (in seconds) to advance before capturing starts
NUM_FRAMES = 120
CAPTURE_INTERVAL = 4
START_DELAY = 1.0
# Schedule the example coroutine on the event loop instead of blocking on it
asyncio.ensure_future(
    run_example_async(num_frames=NUM_FRAMES, capture_interval=CAPTURE_INTERVAL, start_delay=START_DELAY, use_instance_id=True)
)