Useful Snippets
Various examples of Isaac Sim Replicator snippets that can be run as Standalone Applications or from the UI using the Script Editor.
Annotator and Custom Writer Data from Multiple Cameras
Example on how to access data from multiple cameras in a scene using annotators or custom writers. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):
./python.sh standalone_examples/api/isaacsim.replicator.examples/multi_camera.py
Annotator and Custom Writer Data from Multiple Cameras
1from isaacsim import SimulationApp
2
3simulation_app = SimulationApp(launch_config={"headless": False})
4
5import os
6import omni.usd
7import omni.kit
8import omni.replicator.core as rep
9from omni.replicator.core import AnnotatorRegistry, Writer
10from PIL import Image
11from pxr import UsdGeom, Sdf
12
13NUM_FRAMES = 5
14
15# Save rgb image to file
# Write an RGBA annotator frame to disk as a PNG (".png" is appended here).
def save_rgb(rgb_data, file_name):
    Image.fromarray(rgb_data, "RGBA").save(f"{file_name}.png")
19
20
21# Randomize cube color every frame using a replicator randomizer
# Replicator randomizer graph: assign a uniformly random RGB color to every
# prim whose path matches "Cube". Returns the graph node so it can be
# registered and triggered by name.
def cube_color_randomizer():
    matched_cubes = rep.get.prims(path_pattern="Cube")
    with matched_cubes:
        rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
    return matched_cubes.node
27
28
29# Access data through a custom replicator writer
# Custom replicator writer that saves the rgb annotator data of every
# attached render product to disk as PNG files.
class MyWriter(Writer):
    def __init__(self, rgb: bool = True):
        # Frame counter used to build unique output file names
        self._frame_id = 0
        if rgb:
            self.annotators.append(AnnotatorRegistry.get_annotator("rgb"))
        # Create writer output directory
        self.file_path = os.path.join(os.getcwd(), "_out_mc_writer", "")
        print(f"Writing writer data to {self.file_path}")
        os.makedirs(os.path.dirname(self.file_path), exist_ok=True)

    def write(self, data):
        # Annotator keys are suffixed with the render product name (e.g.
        # "rgb-Camera1") only when multiple render products are attached.
        # Fall back to a default name for the plain "rgb" key so a single
        # attached render product no longer raises NameError.
        for annotator in data.keys():
            annotator_split = annotator.split("-")
            render_product_name = annotator_split[-1] if len(annotator_split) > 1 else "rp"
            if annotator.startswith("rgb"):
                save_rgb(data[annotator], f"{self.file_path}/{render_product_name}_frame_{self._frame_id}")
        self._frame_id += 1
49
50
rep.WriterRegistry.register(MyWriter)

# Create a new stage with a dome light
omni.usd.get_context().new_stage()
stage = omni.usd.get_context().get_stage()
dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(900.0)

# Create cube
cube_prim = stage.DefinePrim("/World/Cube", "Cube")
UsdGeom.Xformable(cube_prim).AddTranslateOp().Set((0.0, 5.0, 1.0))

# Register cube color randomizer to trigger on every frame
rep.randomizer.register(cube_color_randomizer)
with rep.trigger.on_frame():
    rep.randomizer.cube_color_randomizer()

# Create cameras
camera_prim1 = stage.DefinePrim("/World/Camera1", "Camera")
UsdGeom.Xformable(camera_prim1).AddTranslateOp().Set((0.0, 10.0, 20.0))
UsdGeom.Xformable(camera_prim1).AddRotateXYZOp().Set((-15.0, 0.0, 0.0))

camera_prim2 = stage.DefinePrim("/World/Camera2", "Camera")
UsdGeom.Xformable(camera_prim2).AddTranslateOp().Set((-10.0, 15.0, 15.0))
UsdGeom.Xformable(camera_prim2).AddRotateXYZOp().Set((-45.0, 0.0, 45.0))

# Create render products
rp1 = rep.create.render_product(str(camera_prim1.GetPrimPath()), resolution=(320, 320))
rp2 = rep.create.render_product(str(camera_prim2.GetPrimPath()), resolution=(640, 640))
rp3 = rep.create.render_product("/OmniverseKit_Persp", (1024, 1024))

# Access the data through a custom writer
writer = rep.WriterRegistry.get("MyWriter")
writer.initialize(rgb=True)
writer.attach([rp1, rp2, rp3])

# Access the data through annotators
rgb_annotators = []
for rp in [rp1, rp2, rp3]:
    rgb = rep.AnnotatorRegistry.get_annotator("rgb")
    rgb.attach(rp)
    rgb_annotators.append(rgb)

# Create annotator output directory (renamed from `dir` to avoid shadowing the builtin)
file_path = os.path.join(os.getcwd(), "_out_mc_annot", "")
print(f"Writing annotator data to {file_path}")
out_dir = os.path.dirname(file_path)
os.makedirs(out_dir, exist_ok=True)

# Data will be captured manually using step
rep.orchestrator.set_capture_on_play(False)

for i in range(NUM_FRAMES):
    # The step function provides new data to the annotators, triggers the randomizers and the writer
    rep.orchestrator.step(rt_subframes=4)
    for j, rgb_annot in enumerate(rgb_annotators):
        save_rgb(rgb_annot.get_data(), f"{out_dir}/rp{j}_step_{i}")

simulation_app.close()
Annotator and Custom Writer Data from Multiple Cameras
1import asyncio
2import os
3import omni.usd
4import omni.kit
5import omni.replicator.core as rep
6from omni.replicator.core import AnnotatorRegistry, Writer
7from PIL import Image
8from pxr import UsdGeom, Sdf
9
10NUM_FRAMES = 5
11
12# Save rgb image to file
# Write an RGBA annotator frame to disk as a PNG (".png" is appended here).
def save_rgb(rgb_data, file_name):
    Image.fromarray(rgb_data, "RGBA").save(f"{file_name}.png")
16
17
18# Randomize cube color every frame using a replicator randomizer
# Replicator randomizer graph: assign a uniformly random RGB color to every
# prim whose path matches "Cube". Returns the graph node so it can be
# registered and triggered by name.
def cube_color_randomizer():
    matched_cubes = rep.get.prims(path_pattern="Cube")
    with matched_cubes:
        rep.randomizer.color(colors=rep.distribution.uniform((0, 0, 0), (1, 1, 1)))
    return matched_cubes.node
24
25
26# Access data through a custom replicator writer
# Custom replicator writer that saves the rgb annotator data of every
# attached render product to disk as PNG files.
class MyWriter(Writer):
    def __init__(self, rgb: bool = True):
        # Frame counter used to build unique output file names
        self._frame_id = 0
        if rgb:
            self.annotators.append(AnnotatorRegistry.get_annotator("rgb"))
        # Create writer output directory
        self.file_path = os.path.join(os.getcwd(), "_out_mc_writer", "")
        print(f"Writing writer data to {self.file_path}")
        os.makedirs(os.path.dirname(self.file_path), exist_ok=True)

    def write(self, data):
        # Annotator keys are suffixed with the render product name (e.g.
        # "rgb-Camera1") only when multiple render products are attached.
        # Fall back to a default name for the plain "rgb" key so a single
        # attached render product no longer raises NameError.
        for annotator in data.keys():
            annotator_split = annotator.split("-")
            render_product_name = annotator_split[-1] if len(annotator_split) > 1 else "rp"
            if annotator.startswith("rgb"):
                save_rgb(data[annotator], f"{self.file_path}/{render_product_name}_frame_{self._frame_id}")
        self._frame_id += 1
46
47
rep.WriterRegistry.register(MyWriter)

# Create a new stage with a dome light
omni.usd.get_context().new_stage()
stage = omni.usd.get_context().get_stage()
dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(900.0)

# Create cube
cube_prim = stage.DefinePrim("/World/Cube", "Cube")
UsdGeom.Xformable(cube_prim).AddTranslateOp().Set((0.0, 5.0, 1.0))

# Register cube color randomizer to trigger on every frame
rep.randomizer.register(cube_color_randomizer)
with rep.trigger.on_frame():
    rep.randomizer.cube_color_randomizer()

# Create cameras
camera_prim1 = stage.DefinePrim("/World/Camera1", "Camera")
UsdGeom.Xformable(camera_prim1).AddTranslateOp().Set((0.0, 10.0, 20.0))
UsdGeom.Xformable(camera_prim1).AddRotateXYZOp().Set((-15.0, 0.0, 0.0))

camera_prim2 = stage.DefinePrim("/World/Camera2", "Camera")
UsdGeom.Xformable(camera_prim2).AddTranslateOp().Set((-10.0, 15.0, 15.0))
UsdGeom.Xformable(camera_prim2).AddRotateXYZOp().Set((-45.0, 0.0, 45.0))

# Create render products
rp1 = rep.create.render_product(str(camera_prim1.GetPrimPath()), resolution=(320, 320))
rp2 = rep.create.render_product(str(camera_prim2.GetPrimPath()), resolution=(640, 640))
rp3 = rep.create.render_product("/OmniverseKit_Persp", (1024, 1024))

# Access the data through a custom writer
writer = rep.WriterRegistry.get("MyWriter")
writer.initialize(rgb=True)
writer.attach([rp1, rp2, rp3])

# Access the data through annotators
rgb_annotators = []
for rp in [rp1, rp2, rp3]:
    rgb = rep.AnnotatorRegistry.get_annotator("rgb")
    rgb.attach(rp)
    rgb_annotators.append(rgb)

# Create annotator output directory (renamed from `dir` to avoid shadowing the builtin)
file_path = os.path.join(os.getcwd(), "_out_mc_annot", "")
print(f"Writing annotator data to {file_path}")
out_dir = os.path.dirname(file_path)
os.makedirs(out_dir, exist_ok=True)

# Data will be captured manually using step
rep.orchestrator.set_capture_on_play(False)


async def run_example_async():
    for i in range(NUM_FRAMES):
        # The step function provides new data to the annotators, triggers the randomizers and the writer
        await rep.orchestrator.step_async(rt_subframes=4)
        for j, rgb_annot in enumerate(rgb_annotators):
            save_rgb(rgb_annot.get_data(), f"{out_dir}/rp{j}_step_{i}")


asyncio.ensure_future(run_example_async())
Synthetic Data Access at Specific Simulation Timepoints
Example on how to access synthetic data (rgb, semantic segmentation) from multiple cameras in a simulation scene at specific events using annotators or writers. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):
./python.sh standalone_examples/api/isaacsim.replicator.examples/simulation_get_data.py
Synthetic Data Access at Specific Simulation Timepoints
1from isaacsim import SimulationApp
2
3simulation_app = SimulationApp(launch_config={"renderer": "RayTracedLighting", "headless": False})
4
5import json
6import os
7
8import carb.settings
9import numpy as np
10import omni
11import omni.replicator.core as rep
12from isaacsim.core.api import World
13from isaacsim.core.api.objects import DynamicCuboid
14from isaacsim.core.utils.semantics import add_update_semantics
15from PIL import Image
16
17
18# Util function to save rgb annotator data
# Save rgb annotator output (RGBA buffer) as a PNG at file_path + ".png".
def write_rgb_data(rgb_data, file_path):
    Image.fromarray(rgb_data, "RGBA").save(file_path + ".png")
22
23
24# Util function to save semantic segmentation annotator data
# Save semantic segmentation annotator output: the id->label mapping as JSON
# and the colorized segmentation buffer as a PNG, both at file_path + suffix.
def write_sem_data(sem_data, file_path):
    with open(file_path + ".json", "w") as f:
        json.dump(sem_data["info"]["idToLabels"], f)
    # Reinterpret the packed colorized buffer as per-channel uint8 RGBA
    raw = sem_data["data"]
    rgba = np.frombuffer(raw, dtype=np.uint8).reshape(*raw.shape, -1)
    Image.fromarray(rgba, "RGBA").save(file_path + ".png")
32
33
# Create a new stage with the default ground plane
omni.usd.get_context().new_stage()

# Setup the simulation world
world = World()
world.scene.add_default_ground_plane()
world.reset()

# Disable capture on play so frames are only written on explicit step() requests
carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)

# Camera and render product to collect the data from
cam = rep.create.camera(position=(5, 5, 5), look_at=(0, 0, 0))
rp = rep.create.render_product(cam, (512, 512))

# Output directory for the captured data
out_dir = os.path.join(os.getcwd(), "_out_sim_event")
os.makedirs(out_dir, exist_ok=True)
print(f"Outputting data to {out_dir}..")

# Writer-based access to the data
writer = rep.WriterRegistry.get("BasicWriter")
writer.initialize(
    output_dir=f"{out_dir}/writer", rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True
)
writer.attach(rp)

# Run a preview to ensure the replicator graph is initialized
rep.orchestrator.preview()

# Annotator-based access to the data
rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
rgb_annot.attach(rp)
sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True})
sem_annot.attach(rp)

# Spawn cubes one at a time and capture data once each stops moving
for i in range(5):
    cuboid = world.scene.add(DynamicCuboid(prim_path=f"/World/Cuboid_{i}", name=f"Cuboid_{i}", position=(0, 0, 10 + i)))
    add_update_semantics(cuboid.prim, "Cuboid")

    for s in range(500):
        world.step(render=False)
        # NOTE(review): the threshold may also be met right after spawn,
        # before the cube accelerates under gravity — confirm intended behavior
        if np.linalg.norm(cuboid.get_linear_velocity()) < 0.1:
            print(f"Cube_{i} stopped moving after {s} simulation steps, writing data..")
            # Trigger the writer and update the annotators with new data
            rep.orchestrator.step(rt_subframes=4, delta_time=0.0, pause_timeline=False)
            write_rgb_data(rgb_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_rgb")
            write_sem_data(sem_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_sem")
            break

simulation_app.close()
Synthetic Data Access at Specific Simulation Timepoints
1import asyncio
2import json
3import os
4
5import carb.settings
6import numpy as np
7import omni
8import omni.replicator.core as rep
9from isaacsim.core.api import World
10from isaacsim.core.api.objects import DynamicCuboid
11from isaacsim.core.utils.semantics import add_update_semantics
12from PIL import Image
13
14
15# Util function to save rgb annotator data
# Save rgb annotator output (RGBA buffer) as a PNG at file_path + ".png".
def write_rgb_data(rgb_data, file_path):
    Image.fromarray(rgb_data, "RGBA").save(file_path + ".png")
19
20
21# Util function to save semantic segmentation annotator data
# Save semantic segmentation annotator output: the id->label mapping as JSON
# and the colorized segmentation buffer as a PNG, both at file_path + suffix.
def write_sem_data(sem_data, file_path):
    with open(file_path + ".json", "w") as f:
        json.dump(sem_data["info"]["idToLabels"], f)
    # Reinterpret the packed colorized buffer as per-channel uint8 RGBA
    raw = sem_data["data"]
    rgba = np.frombuffer(raw, dtype=np.uint8).reshape(*raw.shape, -1)
    Image.fromarray(rgba, "RGBA").save(file_path + ".png")
29
30
# Create a new stage with the default ground plane
omni.usd.get_context().new_stage()

# Setup the simulation world
world = World()
world.scene.add_default_ground_plane()


# Disable capture on play so frames are only written on explicit step_async() requests
carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)

# Camera and render product to collect the data from
cam = rep.create.camera(position=(5, 5, 5), look_at=(0, 0, 0))
rp = rep.create.render_product(cam, (512, 512))

# Output directory for the captured data
out_dir = os.path.join(os.getcwd(), "_out_sim_event")
os.makedirs(out_dir, exist_ok=True)
print(f"Outputting data to {out_dir}..")

# Writer-based access to the data
writer = rep.WriterRegistry.get("BasicWriter")
writer.initialize(
    output_dir=f"{out_dir}/writer", rgb=True, semantic_segmentation=True, colorize_semantic_segmentation=True
)
writer.attach(rp)

# Run a preview to ensure the replicator graph is initialized
rep.orchestrator.preview()

# Annotator-based access to the data
rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
rgb_annot.attach(rp)
sem_annot = rep.AnnotatorRegistry.get_annotator("semantic_segmentation", init_params={"colorize": True})
sem_annot.attach(rp)


async def run_example_async():
    await world.initialize_simulation_context_async()
    await world.reset_async()

    # Spawn cubes one at a time and capture data once each stops moving
    for i in range(5):
        cuboid = world.scene.add(
            DynamicCuboid(prim_path=f"/World/Cuboid_{i}", name=f"Cuboid_{i}", position=(0, 0, 10 + i))
        )
        add_update_semantics(cuboid.prim, "Cuboid")

        for s in range(500):
            await omni.kit.app.get_app().next_update_async()
            if np.linalg.norm(cuboid.get_linear_velocity()) < 0.1:
                print(f"Cube_{i} stopped moving after {s} simulation steps, writing data..")
                # Trigger the writer and update the annotators with new data
                await rep.orchestrator.step_async(rt_subframes=4, delta_time=0.0, pause_timeline=False)
                write_rgb_data(rgb_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_rgb")
                write_sem_data(sem_annot.get_data(), f"{out_dir}/Cube_{i}_step_{s}_sem")
                break


asyncio.ensure_future(run_example_async())
Custom Event Randomization and Writing
The following example showcases the use of custom events to trigger randomizations and data writing at various times throughout the simulation. The standalone example can also be run directly (on Windows use python.bat instead of python.sh):
./python.sh standalone_examples/api/isaacsim.replicator.examples/custom_event_and_write.py
Custom Event Randomization and Writing
1from isaacsim import SimulationApp
2
3simulation_app = SimulationApp(launch_config={"headless": False})
4
5import os
6
7import omni.replicator.core as rep
8import omni.usd
9
# Fresh stage with a distant light
omni.usd.get_context().new_stage()
distance_light = rep.create.light(rotation=(315, 0, 0), intensity=4000, light_type="distant")

# Two cubes whose rotations are randomized through separate custom events
large_cube = rep.create.cube(scale=1.25, position=(1, 1, 0))
small_cube = rep.create.cube(scale=0.75, position=(-1, -1, 0))
large_cube_prim = large_cube.get_output_prims()["prims"][0]
small_cube_prim = small_cube.get_output_prims()["prims"][0]

# Viewport render product feeding a BasicWriter (rgb only)
rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512))
writer = rep.WriterRegistry.get("BasicWriter")
out_dir = os.path.join(os.getcwd(), "_out_custom_event")
print(f"Writing data to {out_dir}")
writer.initialize(output_dir=out_dir, rgb=True)
writer.attach(rp)

# Each randomizer graph fires only when its named custom event is sent
with rep.trigger.on_custom_event(event_name="randomize_large_cube"):
    with large_cube:
        rep.randomizer.rotation()

with rep.trigger.on_custom_event(event_name="randomize_small_cube"):
    with small_cube:
        rep.randomizer.rotation()
def run_example():
    """Send custom events to randomize the cubes, capturing one frame after each change."""
    # Placeholder-free strings no longer carry a redundant f-prefix
    print("Randomizing small cube")
    rep.utils.send_og_event(event_name="randomize_small_cube")
    print("Capturing frame")
    rep.orchestrator.step(rt_subframes=8)

    print("Moving small cube")
    small_cube_prim.GetAttribute("xformOp:translate").Set((-2, -2, 0))
    print("Capturing frame")
    rep.orchestrator.step(rt_subframes=8)

    print("Randomizing large cube")
    rep.utils.send_og_event(event_name="randomize_large_cube")
    print("Capturing frame")
    rep.orchestrator.step(rt_subframes=8)

    print("Moving large cube")
    large_cube_prim.GetAttribute("xformOp:translate").Set((2, 2, 0))
    print("Capturing frame")
    rep.orchestrator.step(rt_subframes=8)

    # Wait until all the data is saved to disk
    rep.orchestrator.wait_until_complete()


run_example()

simulation_app.close()
Custom Event Randomization and Writing
1import asyncio
2import os
3
4import omni.replicator.core as rep
5import omni.usd
6
# Fresh stage with a distant light
omni.usd.get_context().new_stage()
distance_light = rep.create.light(rotation=(315, 0, 0), intensity=4000, light_type="distant")

# Two cubes whose rotations are randomized through separate custom events
large_cube = rep.create.cube(scale=1.25, position=(1, 1, 0))
small_cube = rep.create.cube(scale=0.75, position=(-1, -1, 0))
large_cube_prim = large_cube.get_output_prims()["prims"][0]
small_cube_prim = small_cube.get_output_prims()["prims"][0]

# Viewport render product feeding a BasicWriter (rgb only)
rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512))
writer = rep.WriterRegistry.get("BasicWriter")
out_dir = os.path.join(os.getcwd(), "_out_custom_event")
print(f"Writing data to {out_dir}")
writer.initialize(output_dir=out_dir, rgb=True)
writer.attach(rp)

# Each randomizer graph fires only when its named custom event is sent
with rep.trigger.on_custom_event(event_name="randomize_large_cube"):
    with large_cube:
        rep.randomizer.rotation()

with rep.trigger.on_custom_event(event_name="randomize_small_cube"):
    with small_cube:
        rep.randomizer.rotation()
30
async def run_example_async():
    """Send custom events to randomize the cubes, capturing one frame after each change."""
    # Placeholder-free strings no longer carry a redundant f-prefix
    print("Randomizing small cube")
    rep.utils.send_og_event(event_name="randomize_small_cube")
    print("Capturing frame")
    await rep.orchestrator.step_async(rt_subframes=8)

    print("Moving small cube")
    small_cube_prim.GetAttribute("xformOp:translate").Set((-2, -2, 0))
    print("Capturing frame")
    await rep.orchestrator.step_async(rt_subframes=8)

    print("Randomizing large cube")
    rep.utils.send_og_event(event_name="randomize_large_cube")
    print("Capturing frame")
    await rep.orchestrator.step_async(rt_subframes=8)

    print("Moving large cube")
    large_cube_prim.GetAttribute("xformOp:translate").Set((2, 2, 0))
    print("Capturing frame")
    await rep.orchestrator.step_async(rt_subframes=8)

    # Wait until all the data is saved to disk
    await rep.orchestrator.wait_until_complete_async()


asyncio.ensure_future(run_example_async())
Motion Blur
This example demonstrates how to capture motion blur data using RTX Real-Time and RTX Interactive (Path Tracing) rendering modes. For the RTX Real-Time mode, details on the motion blur parameters can be found in the Omniverse RTX renderer documentation. For the RTX Interactive (Path Tracing) mode, motion blur is achieved by rendering multiple subframes (controlled by the `/omni/replicator/pathTracedMotionBlurSubSamples` setting) and combining them to create the effect. The example uses animated and physics-enabled assets with synchronized motion. Keyframe animated assets can be advanced at any custom delta time due to their interpolated motion, whereas physics-enabled assets require a custom physics FPS to ensure motion samples at any custom delta time. The example showcases how to compute the target physics FPS, change it if needed, and restore the original physics FPS after capturing the motion blur.
The standalone example can also be run directly (on Windows use python.bat instead of python.sh):
./python.sh standalone_examples/api/isaacsim.replicator.examples/motion_blur.py
Motion Blur
1from isaacsim import SimulationApp
2
3simulation_app = SimulationApp({"headless": False})
4
5import os
6
7import carb.settings
8import omni.kit.app
9import omni.replicator.core as rep
10import omni.timeline
11import omni.usd
12from isaacsim.storage.native import get_assets_root_path
13from pxr import PhysxSchema, Sdf, UsdGeom, UsdPhysics
14
15# Paths to the animated and physics-ready assets
16PHYSICS_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics/003_cracker_box.usd"
17ANIM_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned/003_cracker_box.usd"
18
19# -z velocities and start locations of the animated (left side) and physics (right side) assets (stage units/s)
20ASSET_VELOCITIES = [0, 5, 10]
21ASSET_X_MIRRORED_LOCATIONS = [(0.5, 0, 0.3), (0.3, 0, 0.3), (0.1, 0, 0.3)]
22
23# Used to calculate how many frames to animate the assets to maintain the same velocity as the physics assets
24ANIMATION_DURATION = 10
25
26# Create a new stage with animated and physics-enabled assets with synchronized motion
# Build a stage containing physics-driven assets (right side) and keyframe-
# animated assets (left side) whose motion is synchronized: the animated
# keyframes are computed so both sides travel at the same -z velocities.
def setup_stage():
    # Create new stage
    omni.usd.get_context().new_stage()
    stage = omni.usd.get_context().get_stage()
    timeline = omni.timeline.get_timeline_interface()
    timeline.set_end_time(ANIMATION_DURATION)

    # Lighting: dome for ambient plus a tilted distant light
    dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
    dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(100.0)
    distant_light = stage.DefinePrim("/World/DistantLight", "DistantLight")
    if not distant_light.GetAttribute("xformOp:rotateXYZ"):
        UsdGeom.Xformable(distant_light).AddRotateXYZOp()
    distant_light.GetAttribute("xformOp:rotateXYZ").Set((-75, 0, 0))
    distant_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(2500)

    assets_root_path = get_assets_root_path()

    # Physics assets: gravity and damping disabled, constant -z velocity
    physics_asset_url = assets_root_path + PHYSICS_ASSET_URL
    for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
        prim = stage.DefinePrim(f"/World/physics_asset_{int(abs(vel))}", "Xform")
        prim.GetReferences().AddReference(physics_asset_url)
        if not prim.GetAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        prim.GetAttribute("xformOp:translate").Set(loc)
        prim.GetAttribute("physxRigidBody:disableGravity").Set(True)
        prim.GetAttribute("physxRigidBody:angularDamping").Set(0.0)
        prim.GetAttribute("physxRigidBody:linearDamping").Set(0.0)
        prim.GetAttribute("physics:velocity").Set((0, 0, -vel))

    # Animated assets: two keyframes spanning ANIMATION_DURATION reproduce the
    # same average velocity as the physics counterparts (x-mirrored start)
    anim_asset_url = assets_root_path + ANIM_ASSET_URL
    for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
        prim = stage.DefinePrim(f"/World/anim_asset_{int(abs(vel))}", "Xform")
        prim.GetReferences().AddReference(anim_asset_url)
        if not prim.GetAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        start_loc = (-loc[0], loc[1], loc[2])
        end_loc = (start_loc[0], start_loc[1], start_loc[2] - vel * ANIMATION_DURATION)
        end_keyframe = timeline.get_time_codes_per_seconds() * ANIMATION_DURATION
        # Timesampled keyframe (animated) translation
        prim.GetAttribute("xformOp:translate").Set(start_loc, time=0)
        prim.GetAttribute("xformOp:translate").Set(end_loc, time=end_keyframe)
71
72
73# Capture motion blur frames with the given delta time step and render mode
# Capture motion blur frames with the given delta time step and render mode
def run_motion_blur_example(num_frames=3, custom_delta_time=None, use_path_tracing=True, pt_subsamples=8, pt_spp=64):
    """Capture `num_frames` motion-blurred frames of the synchronized-motion stage.

    Args:
        num_frames: Number of frames to capture.
        custom_delta_time: Timeline advance (seconds) per capture; None uses the stage default.
        use_path_tracing: True for PathTracing (subframe-accumulation blur),
            False for RayTracedLighting (post-process blur).
        pt_subsamples: PathTracing motion blur sub-samples per frame.
        pt_spp: PathTracing samples per pixel.
    """
    # Create a new stage with the assets
    setup_stage()
    stage = omni.usd.get_context().get_stage()

    settings = carb.settings.get_settings()  # hoisted: reused many times below

    # Set replicator settings (capture only on request and enable motion blur)
    settings.set("/omni/replicator/captureOnPlay", False)
    settings.set("/omni/replicator/captureMotionBlur", True)

    # Set motion blur settings based on the render mode
    if use_path_tracing:
        print("[MotionBlur] Setting PathTracing render mode motion blur settings")
        settings.set("/rtx/rendermode", "PathTracing")
        # (int): Total number of samples for each rendered pixel, per frame.
        settings.set("/rtx/pathtracing/spp", pt_spp)
        # (int): Maximum samples to accumulate per pixel; 0 removes the limit.
        settings.set("/rtx/pathtracing/totalSpp", pt_spp)
        settings.set("/rtx/pathtracing/optixDenoiser/enabled", 0)
        # Number of sub samples to render if in PathTracing render mode and motion blur is enabled.
        settings.set("/omni/replicator/pathTracedMotionBlurSubSamples", pt_subsamples)
    else:
        print("[MotionBlur] Setting RayTracedLighting render mode motion blur settings")
        settings.set("/rtx/rendermode", "RayTracedLighting")
        # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4: RTXAA
        settings.set("/rtx/post/aa/op", 2)
        # (float): Fraction of the largest screen dimension used as max motion blur diameter.
        settings.set("/rtx/post/motionblur/maxBlurDiameterFraction", 0.02)
        # (float): Exposure time fraction in frames (1.0 = one frame duration) to sample.
        settings.set("/rtx/post/motionblur/exposureFraction", 1.0)
        # (int): Filter sample count; higher improves quality at a performance cost.
        settings.set("/rtx/post/motionblur/numSamples", 8)

    # Setup camera and writer
    camera = rep.create.camera(position=(0, 1.5, 0), look_at=(0, 0, 0), name="MotionBlurCam")
    render_product = rep.create.render_product(camera, (1280, 720))
    basic_writer = rep.WriterRegistry.get("BasicWriter")
    delta_time_str = "None" if custom_delta_time is None else f"{custom_delta_time:.4f}"
    render_mode_str = f"pt_subsamples_{pt_subsamples}_spp_{pt_spp}" if use_path_tracing else "rt"
    output_directory = os.path.join(os.getcwd(), f"_out_motion_blur_dt_{delta_time_str}_{render_mode_str}")
    print(f"[MotionBlur] Output directory: {output_directory}")
    basic_writer.initialize(output_dir=output_directory, rgb=True)
    basic_writer.attach(render_product)

    # Run a few updates to make sure all materials are fully loaded for capture
    for _ in range(50):
        simulation_app.update()

    # Find (or create) the physics scene so its FPS can be raised, guaranteeing
    # motion samples at any custom delta time
    physx_scene = None
    for prim in stage.Traverse():
        if prim.IsA(UsdPhysics.Scene):
            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
            break
    if physx_scene is None:
        print("[MotionBlur] Creating a new PhysicsScene")
        # The Define() return value was previously stored in an unused local
        UsdPhysics.Scene.Define(stage, "/PhysicsScene")
        physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))

    # Check the target physics FPS depending on the custom delta time and render mode
    target_physics_fps = stage.GetTimeCodesPerSecond() if custom_delta_time is None else 1 / custom_delta_time
    if use_path_tracing:
        # Each PathTracing subframe needs its own physics sample
        target_physics_fps *= pt_subsamples

    # Check if the physics FPS needs to be increased to match the custom delta time
    orig_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get()
    if target_physics_fps > orig_physics_fps:
        print(f"[MotionBlur] Changing physics FPS from {orig_physics_fps} to {target_physics_fps}")
        physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps)

    # Start the timeline for physics updates in the step function
    timeline = omni.timeline.get_timeline_interface()
    timeline.play()

    # Capture frames
    for i in range(num_frames):
        print(f"[MotionBlur] \tCapturing frame {i}")
        rep.orchestrator.step(delta_time=custom_delta_time)

    # Restore the original physics FPS
    if target_physics_fps > orig_physics_fps:
        print(f"[MotionBlur] Restoring physics FPS from {target_physics_fps} to {orig_physics_fps}")
        physx_scene.GetTimeStepsPerSecondAttr().Set(orig_physics_fps)

    # Switch back to the raytracing render mode
    if use_path_tracing:
        print("[MotionBlur] Restoring render mode to RayTracedLighting")
        settings.set("/rtx/rendermode", "RayTracedLighting")

    # Wait until all the data is saved to disk
    rep.orchestrator.wait_until_complete()
164
165
def run_motion_blur_examples():
    # Sweep capture delta times; for each, run one RayTracing capture and a
    # grid of PathTracing captures over (sub-sample, spp) combinations
    step_durations = [None, 1 / 30, 1 / 60, 1 / 240]
    pt_sub_sample_values = [4, 16]
    pt_spp_values = [32, 128]
    for dt in step_durations:
        # RayTracing example
        run_motion_blur_example(custom_delta_time=dt, use_path_tracing=False)
        # PathTracing examples
        for sub_samples in pt_sub_sample_values:
            for spp in pt_spp_values:
                run_motion_blur_example(
                    custom_delta_time=dt,
                    use_path_tracing=True,
                    pt_subsamples=sub_samples,
                    pt_spp=spp,
                )


run_motion_blur_examples()

simulation_app.close()
Motion Blur
1import asyncio
2import os
3
4import carb.settings
5import omni.kit.app
6import omni.replicator.core as rep
7import omni.timeline
8import omni.usd
9from isaacsim.storage.native import get_assets_root_path
10from pxr import PhysxSchema, Sdf, UsdGeom, UsdPhysics
11
12# Paths to the animated and physics-ready assets
13PHYSICS_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned_Physics/003_cracker_box.usd"
14ANIM_ASSET_URL = "/Isaac/Props/YCB/Axis_Aligned/003_cracker_box.usd"
15
16# -z velocities and start locations of the animated (left side) and physics (right side) assets (stage units/s)
17ASSET_VELOCITIES = [0, 5, 10]
18ASSET_X_MIRRORED_LOCATIONS = [(0.5, 0, 0.3), (0.3, 0, 0.3), (0.1, 0, 0.3)]
19
20# Used to calculate how many frames to animate the assets to maintain the same velocity as the physics assets
21ANIMATION_DURATION = 10
22
23# Create a new stage with animated and physics-enabled assets with synchronized motion
# Build a stage containing physics-driven assets (right side) and keyframe-
# animated assets (left side) whose motion is synchronized: the animated
# keyframes are computed so both sides travel at the same -z velocities.
def setup_stage():
    # Create new stage
    omni.usd.get_context().new_stage()
    stage = omni.usd.get_context().get_stage()
    timeline = omni.timeline.get_timeline_interface()
    timeline.set_end_time(ANIMATION_DURATION)

    # Lighting: dome for ambient plus a tilted distant light
    dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
    dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(100.0)
    distant_light = stage.DefinePrim("/World/DistantLight", "DistantLight")
    if not distant_light.GetAttribute("xformOp:rotateXYZ"):
        UsdGeom.Xformable(distant_light).AddRotateXYZOp()
    distant_light.GetAttribute("xformOp:rotateXYZ").Set((-75, 0, 0))
    distant_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(2500)

    assets_root_path = get_assets_root_path()

    # Physics assets: gravity and damping disabled, constant -z velocity
    physics_asset_url = assets_root_path + PHYSICS_ASSET_URL
    for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
        prim = stage.DefinePrim(f"/World/physics_asset_{int(abs(vel))}", "Xform")
        prim.GetReferences().AddReference(physics_asset_url)
        if not prim.GetAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        prim.GetAttribute("xformOp:translate").Set(loc)
        prim.GetAttribute("physxRigidBody:disableGravity").Set(True)
        prim.GetAttribute("physxRigidBody:angularDamping").Set(0.0)
        prim.GetAttribute("physxRigidBody:linearDamping").Set(0.0)
        prim.GetAttribute("physics:velocity").Set((0, 0, -vel))

    # Animated assets: two keyframes spanning ANIMATION_DURATION reproduce the
    # same average velocity as the physics counterparts (x-mirrored start)
    anim_asset_url = assets_root_path + ANIM_ASSET_URL
    for loc, vel in zip(ASSET_X_MIRRORED_LOCATIONS, ASSET_VELOCITIES):
        prim = stage.DefinePrim(f"/World/anim_asset_{int(abs(vel))}", "Xform")
        prim.GetReferences().AddReference(anim_asset_url)
        if not prim.GetAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        start_loc = (-loc[0], loc[1], loc[2])
        end_loc = (start_loc[0], start_loc[1], start_loc[2] - vel * ANIMATION_DURATION)
        end_keyframe = timeline.get_time_codes_per_seconds() * ANIMATION_DURATION
        # Timesampled keyframe (animated) translation
        prim.GetAttribute("xformOp:translate").Set(start_loc, time=0)
        prim.GetAttribute("xformOp:translate").Set(end_loc, time=end_keyframe)
68
69
70# Capture motion blur frames with the given delta time step and render mode
# Capture motion blur frames with the given delta time step and render mode
async def run_motion_blur_example_async(
    num_frames=3, custom_delta_time=None, use_path_tracing=True, pt_subsamples=8, pt_spp=64
):
    """Capture motion-blurred frames of the stage built by `setup_stage`.

    Args:
        num_frames: Number of frames to capture.
        custom_delta_time: Delta time (seconds) per captured frame; None uses the default step.
        use_path_tracing: If True use PathTracing, otherwise RayTracedLighting.
        pt_subsamples: Motion blur sub-samples per frame (PathTracing only).
        pt_spp: Samples per pixel (PathTracing only).
    """
    # Create a new stage with the assets
    setup_stage()
    stage = omni.usd.get_context().get_stage()

    # Fetch the settings interface once instead of on every set() call
    settings = carb.settings.get_settings()

    # Set replicator settings (capture only on request and enable motion blur)
    settings.set("/omni/replicator/captureOnPlay", False)
    settings.set("/omni/replicator/captureMotionBlur", True)

    # Set motion blur settings based on the render mode
    if use_path_tracing:
        print("[MotionBlur] Setting PathTracing render mode motion blur settings")
        settings.set("/rtx/rendermode", "PathTracing")
        # (int): Total number of samples for each rendered pixel, per frame.
        settings.set("/rtx/pathtracing/spp", pt_spp)
        # (int): Maximum number of samples to accumulate per pixel. When this count is reached the
        # rendering stops until a scene or setting change is detected, restarting the rendering
        # process. Set to 0 to remove this limit.
        settings.set("/rtx/pathtracing/totalSpp", pt_spp)
        settings.set("/rtx/pathtracing/optixDenoiser/enabled", 0)
        # Number of sub samples to render if in PathTracing render mode and motion blur is enabled.
        settings.set("/omni/replicator/pathTracedMotionBlurSubSamples", pt_subsamples)
    else:
        print("[MotionBlur] Setting RayTracedLighting render mode motion blur settings")
        settings.set("/rtx/rendermode", "RayTracedLighting")
        # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA
        settings.set("/rtx/post/aa/op", 2)
        # (float): The fraction of the largest screen dimension to use as the maximum motion blur diameter.
        settings.set("/rtx/post/motionblur/maxBlurDiameterFraction", 0.02)
        # (float): Exposure time fraction in frames (1.0 = one frame duration) to sample.
        settings.set("/rtx/post/motionblur/exposureFraction", 1.0)
        # (int): Number of samples to use in the filter. A higher number improves quality at the cost of performance.
        settings.set("/rtx/post/motionblur/numSamples", 8)

    # Setup camera and writer
    camera = rep.create.camera(position=(0, 1.5, 0), look_at=(0, 0, 0), name="MotionBlurCam")
    render_product = rep.create.render_product(camera, (1280, 720))
    basic_writer = rep.WriterRegistry.get("BasicWriter")
    delta_time_str = "None" if custom_delta_time is None else f"{custom_delta_time:.4f}"
    render_mode_str = f"pt_subsamples_{pt_subsamples}_spp_{pt_spp}" if use_path_tracing else "rt"
    output_directory = os.path.join(os.getcwd(), f"_out_motion_blur_dt_{delta_time_str}_{render_mode_str}")
    print(f"[MotionBlur] Output directory: {output_directory}")
    basic_writer.initialize(output_dir=output_directory, rgb=True)
    basic_writer.attach(render_product)

    # Run a few updates to make sure all materials are fully loaded for capture
    for _ in range(50):
        await omni.kit.app.get_app().next_update_async()

    # Use the physics scene to modify the physics FPS (if needed) to guarantee motion samples at any custom delta time
    physx_scene = None
    for prim in stage.Traverse():
        if prim.IsA(UsdPhysics.Scene):
            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
            break
    if physx_scene is None:
        print("[MotionBlur] Creating a new PhysicsScene")
        # Define() is called for its side effect of authoring the prim; no binding needed
        UsdPhysics.Scene.Define(stage, "/PhysicsScene")
        physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))

    # Check the target physics depending on the custom delta time and the render mode
    target_physics_fps = stage.GetTimeCodesPerSecond() if custom_delta_time is None else 1 / custom_delta_time
    if use_path_tracing:
        target_physics_fps *= pt_subsamples

    # Check if the physics FPS needs to be increased to match the custom delta time
    orig_physics_fps = physx_scene.GetTimeStepsPerSecondAttr().Get()
    if target_physics_fps > orig_physics_fps:
        print(f"[MotionBlur] Changing physics FPS from {orig_physics_fps} to {target_physics_fps}")
        physx_scene.GetTimeStepsPerSecondAttr().Set(target_physics_fps)

    # Start the timeline for physics updates in the step function
    timeline = omni.timeline.get_timeline_interface()
    timeline.play()

    # Capture frames
    for i in range(num_frames):
        print(f"[MotionBlur] \tCapturing frame {i}")
        await rep.orchestrator.step_async(delta_time=custom_delta_time)

    # Restore the original physics FPS
    if target_physics_fps > orig_physics_fps:
        print(f"[MotionBlur] Restoring physics FPS from {target_physics_fps} to {orig_physics_fps}")
        physx_scene.GetTimeStepsPerSecondAttr().Set(orig_physics_fps)

    # Switch back to the raytracing render mode
    if use_path_tracing:
        print("[MotionBlur] Restoring render mode to RayTracedLighting")
        settings.set("/rtx/rendermode", "RayTracedLighting")

    # Wait until all the data is saved to disk
    await rep.orchestrator.wait_until_complete_async()
163
164
async def run_motion_blur_examples_async():
    """Run the motion blur capture over every delta-time / render-mode combination."""
    step_durations = [None, 1 / 30, 1 / 60, 1 / 240]
    path_tracing_sub_samples = [4, 16]
    path_tracing_spps = [32, 128]
    for delta_time in step_durations:
        # RayTracing examples
        await run_motion_blur_example_async(custom_delta_time=delta_time, use_path_tracing=False)
        # PathTracing examples, over each (sub-samples, spp) combination
        for sub_samples in path_tracing_sub_samples:
            for spp in path_tracing_spps:
                await run_motion_blur_example_async(
                    custom_delta_time=delta_time,
                    use_path_tracing=True,
                    pt_subsamples=sub_samples,
                    pt_spp=spp,
                )
181
182
183asyncio.ensure_future(run_motion_blur_examples_async())
Subscribers and Events at Custom FPS#
Examples of subscribing to various events (such as stage, physics, and render/app), setting custom update rates, and adjusting various related settings. The standalone example can also be run directly (on Windows use python.bat
instead of python.sh
):
./python.sh standalone_examples/api/isaacsim.replicator.examples/subscribers_and_events.py
Subscribers and Events at Custom FPS
from isaacsim import SimulationApp

# Launch the Isaac Sim application (use {"headless": True} to run without UI)
simulation_app = SimulationApp({"headless": False})

import asyncio
import math
import time

import carb.eventdispatcher
import carb.events
import carb.settings
import omni.kit.app
import omni.physx
import omni.timeline
import omni.usd
from pxr import PhysxSchema, UsdPhysics

# TIMELINE / STAGE
USE_CUSTOM_TIMELINE_SETTINGS = False
USE_FIXED_TIME_STEPPING = False
PLAY_EVERY_FRAME = True
PLAY_DELAY_COMPENSATION = 0.0  # seconds
SUBSAMPLE_RATE = 1  # timeline ticks (update events) per frame
STAGE_FPS = 30.0  # stage time codes per second

# PHYSX
USE_CUSTOM_PHYSX_FPS = False
PHYSX_FPS = 60.0  # physics time steps per second
MIN_SIM_FPS = 30  # minimum simulation frequency before physics steps are discarded

# Simulations can also be enabled/disabled at runtime
DISABLE_SIMULATIONS = False

# APP / RENDER
LIMIT_APP_FPS = False
APP_FPS = 120  # main run loop FPS limit (only used when LIMIT_APP_FPS is True)

# Duration after which to clear subscribers and print the cached events
SUBSCRIBER_WALL_TIME_LIMIT_SEC = 0.5
PRINT_EVENTS = True
41
42
def on_timeline_event(event: omni.timeline.TimelineEventType):
    """Cache timeline time-tick events; unsubscribe and report once the wall-time limit elapses."""
    global wall_start_time
    global timeline_sub
    global timeline_events
    elapsed_wall_time = time.time() - wall_start_time

    # Cache only time advance events
    if event.type == omni.timeline.TimelineEventType.CURRENT_TIME_TICKED.value:
        event_name = omni.timeline.TimelineEventType(event.type).name
        event_payload = event.payload
        timeline_events.append((elapsed_wall_time, event_name, event_payload))

    # Clear subscriber and print cached events
    if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
        if timeline_sub is not None:
            timeline_sub.unsubscribe()
            timeline_sub = None
        num_events = len(timeline_events)
        fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
        # Fixed typo in the reported message ("aprox" -> "approx")
        print(f"[timeline] captured {num_events} events with approx {fps} FPS")
        if PRINT_EVENTS:
            for i, (wall_time, event_name, payload) in enumerate(timeline_events):
                print(f"\t[timeline][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")
66
67
def on_physics_step(dt: float):
    """Cache physics step events; schedule deferred unsubscription once the wall-time limit elapses."""
    global wall_start_time
    global physx_events
    global physx_sub
    elapsed_wall_time = time.time() - wall_start_time

    # Cache physics events
    physx_events.append((elapsed_wall_time, dt))

    # Clear subscriber and print cached events
    if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
        # Physics unsubscription needs to be deferred from the callback function
        # see: '[Error] [omni.physx.plugin] Subscription cannot be changed during the event call'
        async def clear_physx_sub_async():
            global physx_sub
            if physx_sub is not None:
                physx_sub.unsubscribe()
                physx_sub = None

        asyncio.ensure_future(clear_physx_sub_async())
        num_events = len(physx_events)
        fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
        # Fixed typo in the reported message ("aprox" -> "approx")
        print(f"[physics] captured {num_events} events with approx {fps} FPS")
        if PRINT_EVENTS:
            # Loop variable renamed so it no longer shadows the 'dt' parameter
            for i, (wall_time, step_dt) in enumerate(physx_events):
                print(f"\t[physics][{i}]\ttime={wall_time:.4f};\tdt={step_dt};")
94
95
def on_stage_render_event(event: carb.eventdispatcher.Event):
    """Cache stage render (new frame) events; reset the observer once the wall-time limit elapses."""
    global wall_start_time
    global stage_render_sub
    global stage_render_events
    elapsed_wall_time = time.time() - wall_start_time

    event_name = event.event_name
    event_payload = event.payload
    stage_render_events.append((elapsed_wall_time, event_name, event_payload))

    if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
        if stage_render_sub is not None:
            stage_render_sub.reset()
            stage_render_sub = None
        num_events = len(stage_render_events)
        fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
        # Fixed typo in the reported message ("aprox" -> "approx")
        print(f"[stage render] captured {num_events} events with approx {fps} FPS")
        if PRINT_EVENTS:
            for i, (wall_time, event_name, payload) in enumerate(stage_render_events):
                print(f"\t[stage render][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")
116
117
def on_app_update(event: carb.eventdispatcher.Event):
    """Cache app update events; reset the observer once the wall-time limit elapses."""
    global wall_start_time
    global app_sub
    global app_update_events
    elapsed_wall_time = time.time() - wall_start_time

    event_name = event.event_name
    event_payload = event.payload
    app_update_events.append((elapsed_wall_time, event_name, event_payload))

    if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
        if app_sub is not None:
            app_sub.reset()
            app_sub = None
        num_events = len(app_update_events)
        fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
        # Fixed typo in the reported message ("aprox" -> "approx")
        print(f"[app] captured {num_events} events with approx {fps} FPS")
        if PRINT_EVENTS:
            for i, (wall_time, event_name, payload) in enumerate(app_update_events):
                print(f"\t[app][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")
138
139
# Grab the current stage and timeline interfaces used by the configuration below
stage = omni.usd.get_context().get_stage()
timeline = omni.timeline.get_timeline_interface()


if USE_CUSTOM_TIMELINE_SETTINGS:
    # Ideal to make simulation and animation synchronized.
    # Default: True in editor, False in standalone.
    # NOTE:
    # - It may limit the frame rate (see 'timeline.set_play_every_frame') such that the elapsed wall clock time matches the frame's delta time.
    # - If the app runs slower than this, animation playback may slow down (see 'CompensatePlayDelayInSecs').
    # - For performance benchmarks, turn this off or set a very high target in `timeline.set_target_framerate`
    carb.settings.get_settings().set("/app/player/useFixedTimeStepping", USE_FIXED_TIME_STEPPING)

    # This compensates for frames that require more computation time than the frame's fixed delta time, by temporarily speeding up playback.
    # The parameter represents the length of these "faster" playback periods, which means that it must be larger than the fixed frame time to take effect.
    # Default: 0.0
    # NOTE:
    # - only effective if `useFixedTimeStepping` is set to True
    # - setting a large value results in long fast playback after a huge lag spike
    carb.settings.get_settings().set("/app/player/CompensatePlayDelayInSecs", PLAY_DELAY_COMPENSATION)

    # If set to True, no frames are skipped and in every frame time advances by `1 / TimeCodesPerSecond`.
    # Default: False
    # NOTE:
    # - only effective if `useFixedTimeStepping` is set to True
    # - simulation is usually faster than real-time and processing is only limited by the frame rate of the runloop
    # - useful for recording
    # - same as `carb.settings.get_settings().set("/app/player/useFastMode", PLAY_EVERY_FRAME)`
    timeline.set_play_every_frame(PLAY_EVERY_FRAME)

    # Timeline sub-stepping, i.e. how many times updates are called (update events are dispatched) each frame.
    # Default: 1
    # NOTE: same as `carb.settings.get_settings().set("/app/player/timelineSubsampleRate", SUBSAMPLE_RATE)`
    timeline.set_ticks_per_frame(SUBSAMPLE_RATE)

    # Time codes per second for the stage
    # NOTE: same as `stage.SetTimeCodesPerSecond(STAGE_FPS)` and `carb.settings.get_settings().set("/app/stage/timeCodesPerSecond", STAGE_FPS)`
    timeline.set_time_codes_per_second(STAGE_FPS)


# Create a PhysX scene to set the physics time step
if USE_CUSTOM_PHYSX_FPS:
    # Reuse an existing physics scene from the stage if one is found
    physx_scene = None
    for prim in stage.Traverse():
        if prim.IsA(UsdPhysics.Scene):
            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
            break
    if physx_scene is None:
        # No scene found: author one and apply the PhysX API schema to it
        physics_scene = UsdPhysics.Scene.Define(stage, "/PhysicsScene")
        physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))

    # Time step for the physics simulation
    # Default: 60.0
    physx_scene.GetTimeStepsPerSecondAttr().Set(PHYSX_FPS)

    # Minimum simulation frequency to prevent clamping; if the frame rate drops below this,
    # physics steps are discarded to avoid app slowdown if the overall frame rate is too low.
    # Default: 30.0
    # NOTE: Matching `minFrameRate` with `TimeStepsPerSecond` ensures a single physics step per update.
    carb.settings.get_settings().set("/persistent/simulation/minFrameRate", MIN_SIM_FPS)


# Throttle Render/UI/Main thread update rate
if LIMIT_APP_FPS:
    # Enable rate limiting of the main run loop (UI, rendering, etc.)
    # Default: False
    carb.settings.get_settings().set("/app/runLoops/main/rateLimitEnabled", LIMIT_APP_FPS)

    # FPS limit of the main run loop (UI, rendering, etc.)
    # Default: 120
    # NOTE: disabled if `/app/player/useFixedTimeStepping` is False
    carb.settings.get_settings().set("/app/runLoops/main/rateLimitFrequency", int(APP_FPS))


# Simulations can be selectively disabled (or toggled at specific times)
if DISABLE_SIMULATIONS:
    carb.settings.get_settings().set("/app/player/playSimulations", False)


# Start the timeline (non-looping, long enough to cover the subscriber wall-time window)
timeline.set_current_time(0)
timeline.set_end_time(SUBSCRIBER_WALL_TIME_LIMIT_SEC + 1)
timeline.set_looping(False)
timeline.play()
timeline.commit()
wall_start_time = time.time()

# Subscribe and cache various events for a limited duration;
# the callbacks above unsubscribe themselves after SUBSCRIBER_WALL_TIME_LIMIT_SEC
timeline_events = []
timeline_sub = timeline.get_timeline_event_stream().create_subscription_to_pop(on_timeline_event)
physx_events = []
physx_sub = omni.physx.get_physx_interface().subscribe_physics_step_events(on_physics_step)
stage_render_events = []
stage_render_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
    event_name=omni.usd.get_context().stage_rendering_event_name(omni.usd.StageRenderingEventType.NEW_FRAME, True),
    on_event=on_stage_render_event,
    observer_name="subscribers_and_events.on_stage_render_event",
)
app_update_events = []
app_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
    event_name=omni.kit.app.GLOBAL_EVENT_UPDATE,
    on_event=on_app_update,
    observer_name="subscribers_and_events.on_app_update",
)

# Run the application for a while to trigger the events, with a buffer to ensure subscribers have enough wall-clock time
num_app_updates = int(math.ceil(SUBSCRIBER_WALL_TIME_LIMIT_SEC * STAGE_FPS * 4))
for _ in range(num_app_updates):
    simulation_app.update()

# Summary of the captured events
print(f"Finished running the application for {num_app_updates} updates.")
print(f"Wall time: {time.time() - wall_start_time:.4f} seconds")
print(f"Number of timeline events: {len(timeline_events)}")
print(f"Number of physics events: {len(physx_events)}")
print(f"Number of stage render events: {len(stage_render_events)}")
print(f"Number of app update events: {len(app_update_events)}")

simulation_app.close()
Subscribers and Events at Custom FPS
1import asyncio
2import math
3import time
4
5import carb.eventdispatcher
6import carb.events
7import carb.settings
8import omni.kit.app
9import omni.physx
10import omni.timeline
11import omni.usd
12from pxr import PhysxSchema, UsdPhysics
13
14# TIMELINE / STAGE
15USE_CUSTOM_TIMELINE_SETTINGS = False
16USE_FIXED_TIME_STEPPING = False
17PLAY_EVERY_FRAME = True
18PLAY_DELAY_COMPENSATION = 0.0
19SUBSAMPLE_RATE = 1
20STAGE_FPS = 30.0
21
22# PHYSX
23USE_CUSTOM_PHYSX_FPS = False
24PHYSX_FPS = 60.0
25MIN_SIM_FPS = 30
26
27# Simulations can also be enabled/disabled at runtime
28DISABLE_SIMULATIONS = False
29
30# APP / RENDER
31LIMIT_APP_FPS = False
32APP_FPS = 120
33
34# Duration after which to clear subscribers and print the cached events
35SUBSCRIBER_WALL_TIME_LIMIT_SEC = 0.5
36PRINT_EVENTS = True
37
38
async def example_async():
    """Subscribe to timeline, physics, stage-render, and app-update events, cache
    them for SUBSCRIBER_WALL_TIME_LIMIT_SEC of wall-clock time, print per-stream
    statistics, then clean up every remaining subscription.
    """

    def on_timeline_event(event: omni.timeline.TimelineEventType):
        """Cache timeline time-tick events; unsubscribe and report after the wall-time limit."""
        nonlocal wall_start_time
        nonlocal timeline_sub
        nonlocal timeline_events
        elapsed_wall_time = time.time() - wall_start_time

        # Cache only time advance events
        if event.type == omni.timeline.TimelineEventType.CURRENT_TIME_TICKED.value:
            event_name = omni.timeline.TimelineEventType(event.type).name
            event_payload = event.payload
            timeline_events.append((elapsed_wall_time, event_name, event_payload))

        # Clear subscriber and print cached events
        if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
            if timeline_sub is not None:
                timeline_sub.unsubscribe()
                timeline_sub = None
            num_events = len(timeline_events)
            fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
            # Fixed typo in the reported message ("aprox" -> "approx")
            print(f"[timeline] captured {num_events} events with approx {fps} FPS")
            if PRINT_EVENTS:
                for i, (wall_time, event_name, payload) in enumerate(timeline_events):
                    print(f"\t[timeline][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")

    def on_physics_step(dt: float):
        """Cache physics step events; schedule deferred unsubscription after the wall-time limit."""
        nonlocal wall_start_time
        nonlocal physx_events
        nonlocal physx_sub
        elapsed_wall_time = time.time() - wall_start_time

        # Cache physics events
        physx_events.append((elapsed_wall_time, dt))

        # Clear subscriber and print cached events
        if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
            # Physics unsubscription needs to be deferred from the callback function
            # see: '[Error] [omni.physx.plugin] Subscription cannot be changed during the event call'
            async def clear_physx_sub_async():
                nonlocal physx_sub
                if physx_sub is not None:
                    physx_sub.unsubscribe()
                    physx_sub = None

            asyncio.ensure_future(clear_physx_sub_async())
            num_events = len(physx_events)
            fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
            print(f"[physics] captured {num_events} events with approx {fps} FPS")
            if PRINT_EVENTS:
                # Loop variable renamed so it no longer shadows the 'dt' parameter
                for i, (wall_time, step_dt) in enumerate(physx_events):
                    print(f"\t[physics][{i}]\ttime={wall_time:.4f};\tdt={step_dt};")

    def on_stage_render_event(event: carb.eventdispatcher.Event):
        """Cache stage render (new frame) events; reset the observer after the wall-time limit."""
        nonlocal wall_start_time
        nonlocal stage_render_sub
        nonlocal stage_render_events
        elapsed_wall_time = time.time() - wall_start_time

        event_name = event.event_name
        event_payload = event.payload
        stage_render_events.append((elapsed_wall_time, event_name, event_payload))

        if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
            if stage_render_sub is not None:
                # BUGFIX: the original accessed 'stage_render_sub.reset' without
                # calling it, so the observer was never actually reset
                stage_render_sub.reset()
                stage_render_sub = None
            num_events = len(stage_render_events)
            fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
            print(f"[stage render] captured {num_events} events with approx {fps} FPS")
            if PRINT_EVENTS:
                for i, (wall_time, event_name, payload) in enumerate(stage_render_events):
                    print(f"\t[stage render][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")

    def on_app_update(event: carb.eventdispatcher.Event):
        """Cache app update events; reset the observer after the wall-time limit."""
        nonlocal wall_start_time
        nonlocal app_sub
        nonlocal app_update_events
        elapsed_wall_time = time.time() - wall_start_time

        # Cache app update events
        event_name = event.event_name
        event_payload = event.payload
        app_update_events.append((elapsed_wall_time, event_name, event_payload))

        if elapsed_wall_time > SUBSCRIBER_WALL_TIME_LIMIT_SEC:
            if app_sub is not None:
                app_sub.reset()
                app_sub = None
            num_events = len(app_update_events)
            fps = num_events / SUBSCRIBER_WALL_TIME_LIMIT_SEC
            print(f"[app] captured {num_events} events with approx {fps} FPS")
            if PRINT_EVENTS:
                for i, (wall_time, event_name, payload) in enumerate(app_update_events):
                    print(f"\t[app][{i}]\ttime={wall_time:.4f};\tevent={event_name};\tpayload={payload}")

    stage = omni.usd.get_context().get_stage()
    timeline = omni.timeline.get_timeline_interface()

    if USE_CUSTOM_TIMELINE_SETTINGS:
        # Ideal to make simulation and animation synchronized.
        # Default: True in editor, False in standalone.
        # NOTE:
        # - It may limit the frame rate (see 'timeline.set_play_every_frame') such that the elapsed wall clock time matches the frame's delta time.
        # - If the app runs slower than this, animation playback may slow down (see 'CompensatePlayDelayInSecs').
        # - For performance benchmarks, turn this off or set a very high target in `timeline.set_target_framerate`
        carb.settings.get_settings().set("/app/player/useFixedTimeStepping", USE_FIXED_TIME_STEPPING)

        # This compensates for frames that require more computation time than the frame's fixed delta time, by temporarily speeding up playback.
        # The parameter represents the length of these "faster" playback periods, which means that it must be larger than the fixed frame time to take effect.
        # Default: 0.0
        # NOTE:
        # - only effective if `useFixedTimeStepping` is set to True
        # - setting a large value results in long fast playback after a huge lag spike
        carb.settings.get_settings().set("/app/player/CompensatePlayDelayInSecs", PLAY_DELAY_COMPENSATION)

        # If set to True, no frames are skipped and in every frame time advances by `1 / TimeCodesPerSecond`.
        # Default: False
        # NOTE:
        # - only effective if `useFixedTimeStepping` is set to True
        # - simulation is usually faster than real-time and processing is only limited by the frame rate of the runloop
        # - useful for recording
        # - same as `carb.settings.get_settings().set("/app/player/useFastMode", PLAY_EVERY_FRAME)`
        timeline.set_play_every_frame(PLAY_EVERY_FRAME)

        # Timeline sub-stepping, i.e. how many times updates are called (update events are dispatched) each frame.
        # Default: 1
        # NOTE: same as `carb.settings.get_settings().set("/app/player/timelineSubsampleRate", SUBSAMPLE_RATE)`
        timeline.set_ticks_per_frame(SUBSAMPLE_RATE)

        # Time codes per second for the stage
        # NOTE: same as `stage.SetTimeCodesPerSecond(STAGE_FPS)` and `carb.settings.get_settings().set("/app/stage/timeCodesPerSecond", STAGE_FPS)`
        timeline.set_time_codes_per_second(STAGE_FPS)

    # Create a PhysX scene to set the physics time step
    if USE_CUSTOM_PHYSX_FPS:
        physx_scene = None
        for prim in stage.Traverse():
            if prim.IsA(UsdPhysics.Scene):
                physx_scene = PhysxSchema.PhysxSceneAPI.Apply(prim)
                break
        if physx_scene is None:
            # Define() is called for its side effect of authoring the prim;
            # the previously unused 'physics_scene' binding was removed
            UsdPhysics.Scene.Define(stage, "/PhysicsScene")
            physx_scene = PhysxSchema.PhysxSceneAPI.Apply(stage.GetPrimAtPath("/PhysicsScene"))

        # Time step for the physics simulation
        # Default: 60.0
        physx_scene.GetTimeStepsPerSecondAttr().Set(PHYSX_FPS)

        # Minimum simulation frequency to prevent clamping; if the frame rate drops below this,
        # physics steps are discarded to avoid app slowdown if the overall frame rate is too low.
        # Default: 30.0
        # NOTE: Matching `minFrameRate` with `TimeStepsPerSecond` ensures a single physics step per update.
        carb.settings.get_settings().set("/persistent/simulation/minFrameRate", MIN_SIM_FPS)

    # Throttle Render/UI/Main thread update rate
    if LIMIT_APP_FPS:
        # Enable rate limiting of the main run loop (UI, rendering, etc.)
        # Default: False
        carb.settings.get_settings().set("/app/runLoops/main/rateLimitEnabled", LIMIT_APP_FPS)

        # FPS limit of the main run loop (UI, rendering, etc.)
        # Default: 120
        # NOTE: disabled if `/app/player/useFixedTimeStepping` is False
        carb.settings.get_settings().set("/app/runLoops/main/rateLimitFrequency", int(APP_FPS))

    # Simulations can be selectively disabled (or toggled at specific times)
    if DISABLE_SIMULATIONS:
        carb.settings.get_settings().set("/app/player/playSimulations", False)

    # Start the timeline
    timeline.set_current_time(0)
    timeline.set_end_time(SUBSCRIBER_WALL_TIME_LIMIT_SEC + 1)
    timeline.set_looping(False)
    timeline.play()
    timeline.commit()
    wall_start_time = time.time()

    # Subscribe and cache various events for a limited duration
    timeline_events = []
    timeline_sub = timeline.get_timeline_event_stream().create_subscription_to_pop(on_timeline_event)
    physx_events = []
    physx_sub = omni.physx.get_physx_interface().subscribe_physics_step_events(on_physics_step)
    stage_render_events = []
    stage_render_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
        event_name=omni.usd.get_context().stage_rendering_event_name(omni.usd.StageRenderingEventType.NEW_FRAME, True),
        on_event=on_stage_render_event,
        observer_name="subscribers_and_events.on_stage_render_event",
    )
    app_update_events = []
    app_sub = carb.eventdispatcher.get_eventdispatcher().observe_event(
        event_name=omni.kit.app.GLOBAL_EVENT_UPDATE,
        on_event=on_app_update,
        observer_name="subscribers_and_events.on_app_update",
    )

    # Run the application for a while to trigger the events, with a buffer to ensure subscribers have enough wall-clock time
    num_app_updates = int(math.ceil(SUBSCRIBER_WALL_TIME_LIMIT_SEC * STAGE_FPS * 4))
    for _ in range(num_app_updates):
        await omni.kit.app.get_app().next_update_async()

    print(f"Finished running the application for {num_app_updates} updates.")
    print(f"Wall time: {time.time() - wall_start_time:.4f} seconds")
    print(f"Number of timeline events: {len(timeline_events)}")
    print(f"Number of physics events: {len(physx_events)}")
    print(f"Number of stage render events: {len(stage_render_events)}")
    print(f"Number of app update events: {len(app_update_events)}")

    # Cleanup: stop playback and release any subscriptions that did not
    # already unsubscribe themselves inside the callbacks
    timeline.stop()
    if app_sub is not None:
        app_sub.reset()
        app_sub = None
    if stage_render_sub is not None:
        stage_render_sub.reset()
        stage_render_sub = None
    if physx_sub is not None:
        physx_sub.unsubscribe()
        physx_sub = None
    if timeline_sub is not None:
        timeline_sub.unsubscribe()
        timeline_sub = None
287
288
289asyncio.ensure_future(example_async())
Accessing Writer and Annotator Data at Custom FPS#
Example of how to trigger a writer and access annotator data at a custom FPS, with product rendering disabled when the data is not needed. The standalone example can also be run directly (on Windows use python.bat
instead of python.sh
):
./python.sh standalone_examples/api/isaacsim.replicator.examples/custom_fps_writer_annotator.py
Note
It is currently not possible to change timeline (stage) FPS after the replicator graph creation as it causes a graph reset. This issue is being addressed. As a workaround make sure you are setting the timeline (stage) parameters before creating the replicator graph.
Accessing Writer and Annotator Data at Custom FPS
from isaacsim import SimulationApp

# Launch the Isaac Sim application (use {"headless": True} to run without UI)
simulation_app = SimulationApp({"headless": False})

import os

import carb.settings
import omni.kit.app
import omni.replicator.core as rep
import omni.timeline
import omni.usd

# NOTE: To avoid FPS delta misses make sure the sensor framerate is divisible by the timeline framerate
# (NOTE(review): the elapsed-time reset logic below suggests the intended condition is that the
# timeline (stage) FPS is divisible by the sensor FPS, e.g. 100 / 10 — confirm)
STAGE_FPS = 100.0
SENSOR_FPS = 10.0
SENSOR_DT = 1.0 / SENSOR_FPS  # seconds between sensor captures
17
18
def run_custom_fps_example(duration_seconds: float):
    """Advance the timeline at STAGE_FPS and capture writer/annotator data at SENSOR_FPS.

    The render product is kept disabled between captures to avoid unnecessary
    rendering, and is only re-enabled for each sensor trigger.

    Args:
        duration_seconds: Timeline time (seconds) after which the loop stops.
    """
    # Create a new stage
    omni.usd.get_context().new_stage()

    # Disable capture on play (data will only be accessed at custom times)
    carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)

    # Make sure fixed time stepping is set (the timeline will be advanced with the same delta time)
    carb.settings.get_settings().set("/app/player/useFixedTimeStepping", True)

    # Set the timeline parameters
    timeline = omni.timeline.get_timeline_interface()
    timeline.set_looping(False)
    timeline.set_current_time(0.0)
    timeline.set_end_time(10)
    timeline.set_time_codes_per_second(STAGE_FPS)
    timeline.play()
    timeline.commit()

    # Create a light and a semantically annotated cube
    rep.create.light()
    rep.create.cube(semantics=[("class", "cube")])

    # Create a render product and disable it (it will be re-enabled when data is needed)
    rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512), name="rp")
    rp.hydra_texture.set_updates_enabled(False)

    # Create a writer and an annotator as different ways to access the data
    out_dir_rgb = os.path.join(os.getcwd(), "_out_writer_fps_rgb")
    print(f"Writer data will be written to: {out_dir_rgb}")
    writer_rgb = rep.WriterRegistry.get("BasicWriter")
    writer_rgb.initialize(output_dir=out_dir_rgb, rgb=True)
    writer_rgb.attach(rp)
    annot_depth = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
    annot_depth.attach(rp)

    # Run the simulation for the given number of frames and access the data at the desired framerates
    written_frames = 0
    previous_time = timeline.get_current_time()
    elapsed_time = 0.0
    loop_iteration_count = 0
    while timeline.get_current_time() < duration_seconds:
        current_time = timeline.get_current_time()
        delta_time = current_time - previous_time
        elapsed_time += delta_time
        print(
            f"[{loop_iteration_count}] current_time={current_time:.4f}; delta_time={delta_time:.4f}; elapsed_time={elapsed_time:.4f}/{SENSOR_DT:.4f};"
        )

        # Check if enough time has passed to trigger the sensor
        if elapsed_time >= SENSOR_DT:
            # Reset the elapsed time with the difference to the optimal trigger time (when the timeline fps is not divisible by the sensor framerate)
            elapsed_time = elapsed_time - SENSOR_DT

            # Enable render products for data access
            rp.hydra_texture.set_updates_enabled(True)

            # Step needs to be called after scheduling the write
            rep.orchestrator.step(delta_time=0.0, pause_timeline=False)

            # After step, the annotator data is available and in sync with the stage
            annot_data = annot_depth.get_data()

            # Count the number of frames written
            print(f"\t Writing frame {written_frames}; annotator data shape={annot_data.shape};")
            written_frames += 1

            # Disable render products to avoid unnecessary rendering
            rp.hydra_texture.set_updates_enabled(False)

        previous_time = current_time
        # BUGFIX: the iteration counter was printed every loop but never advanced
        loop_iteration_count += 1
        # Advance the app (timeline) by one frame
        simulation_app.update()

    # Make sure the writer finishes writing the data to disk
    rep.orchestrator.wait_until_complete()
95
96
# Run the example for long enough to capture 'target_num_writes' sensor frames:
# the sum of all sensor intervals plus a small safety buffer of a few stage frames.
target_num_writes = 6
buffer_seconds = 5.0 / STAGE_FPS
duration_for_target_writes = target_num_writes * SENSOR_DT + buffer_seconds
run_custom_fps_example(duration_seconds=duration_for_target_writes)

# Close the application
simulation_app.close()
Accessing Writer and Annotator Data at Custom FPS
1import asyncio
2import os
3
4import carb.settings
5import omni.kit.app
6import omni.replicator.core as rep
7import omni.timeline
8import omni.usd
9
10# NOTE: To avoid FPS delta misses make sure the sensor framerate is divisible by the timeline framerate
11STAGE_FPS = 100.0
12SENSOR_FPS = 10.0
13SENSOR_DT = 1.0 / SENSOR_FPS
14
async def run_custom_fps_example_async(duration_seconds):
    """Capture writer and annotator data at SENSOR_FPS while the timeline runs at STAGE_FPS."""
    # Start from a fresh stage
    await omni.usd.get_context().new_stage_async()

    # Captures are triggered manually, so turn off the automatic capture-on-play behavior
    carb.settings.get_settings().set("/omni/replicator/captureOnPlay", False)

    # Fixed time stepping guarantees a constant delta time per app update
    carb.settings.get_settings().set("/app/player/useFixedTimeStepping", True)

    # Configure and start the timeline
    timeline = omni.timeline.get_timeline_interface()
    timeline.set_looping(False)
    timeline.set_current_time(0.0)
    timeline.set_end_time(10)
    timeline.set_time_codes_per_second(STAGE_FPS)
    timeline.play()
    timeline.commit()

    # Minimal scene: a light plus a semantically annotated cube
    rep.create.light()
    rep.create.cube(semantics=[("class", "cube")])

    # The render product starts disabled; it is only enabled while sensor data is needed
    rp = rep.create.render_product("/OmniverseKit_Persp", (512, 512), name="rp")
    rp.hydra_texture.set_updates_enabled(False)

    # Two ways of accessing the data: a writer (to disk) and an annotator (in memory)
    out_dir_rgb = os.path.join(os.getcwd(), "_out_writer_fps_rgb")
    print(f"Writer data will be written to: {out_dir_rgb}")
    writer_rgb = rep.WriterRegistry.get("BasicWriter")
    writer_rgb.initialize(output_dir=out_dir_rgb, rgb=True)
    writer_rgb.attach(rp)
    annot_depth = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
    annot_depth.attach(rp)

    # Advance the timeline frame by frame, triggering the sensor every SENSOR_DT seconds
    written_frames = 0
    prev_time = timeline.get_current_time()
    time_since_trigger = 0.0
    iteration = 0
    while True:
        now = timeline.get_current_time()
        if now >= duration_seconds:
            break
        time_since_trigger += now - prev_time
        print(
            f"[{iteration}] current_time={now:.4f}; delta_time={now - prev_time:.4f}; elapsed_time={time_since_trigger:.4f}/{SENSOR_DT:.4f};"
        )

        # Trigger the sensor once a full sensor interval has accumulated
        if time_since_trigger >= SENSOR_DT:
            # Carry over the overshoot so triggers stay aligned when the stage FPS
            # is not divisible by the sensor framerate
            time_since_trigger -= SENSOR_DT

            # Rendering is only needed while capturing
            rp.hydra_texture.set_updates_enabled(True)

            # step_async performs the capture; it must run after scheduling the write
            await rep.orchestrator.step_async(delta_time=0.0, pause_timeline=False)

            # The annotator data is now available and in sync with the stage
            annot_data = annot_depth.get_data()
            print(f"\t Writing frame {written_frames}; annotator data shape={annot_data.shape};")
            written_frames += 1

            # Turn rendering back off until the next trigger
            rp.hydra_texture.set_updates_enabled(False)

        prev_time = now
        # Advance the app (and with it the timeline) by one frame
        await omni.kit.app.get_app().next_update_async()
        iteration += 1

    # Block until the writer has flushed all data to disk
    await rep.orchestrator.wait_until_complete_async()
92
93# Run the example.
94# NOTE: The simulation duration is calculated to ensure 'target_num_writes' are captured.
95# It includes the time for all sensor intervals plus a small buffer of a few stage frames.
96target_num_writes = 6
97duration_for_target_writes = (target_num_writes * SENSOR_DT) + (5.0 / STAGE_FPS)
98await run_custom_fps_example_async(duration_seconds=duration_for_target_writes)
Cosmos Writer Example#
This example demonstrates how to use the CosmosWriter
for synthetic data generation. The CosmosWriter generates specialized data for NVIDIA’s Cosmos platform, producing key modalities including RGB, shaded instance segmentation, instance segmentation, depth, and edge maps. These visual features help Cosmos models better distinguish object boundaries and understand 3D form, enabling higher-quality synthetic data generation. For more information about using Cosmos for Synthetic Dataset Augmentation, refer to the detailed case study. The standalone example can also be run directly (on Windows use python.bat
instead of python.sh
):
./python.sh standalone_examples/api/isaacsim.replicator.examples/cosmos_writer_warehouse.py
Cosmos Writer Example
1from isaacsim import SimulationApp
2
3simulation_app = SimulationApp(launch_config={"headless": False})
4
5import os
6
7import carb
8import omni.replicator.core as rep
9import omni.timeline
10import omni.usd
11from isaacsim.core.utils.stage import add_reference_to_stage
12from isaacsim.storage.native import get_assets_root_path
13from pxr import UsdGeom
14
15STAGE_URL = "/Isaac/Samples/Replicator/Stage/full_warehouse_worker_and_anim_cameras.usd"
16CARTER_NAV_ASSET_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
17CARTER_NAV_PATH = "/NavWorld/CarterNav"
18CARTER_CHASSIS_PATH = f"{CARTER_NAV_PATH}/chassis_link"
19CARTER_NAV_TARGET_PATH = f"{CARTER_NAV_PATH}/targetXform"
20CARTER_CAMERA_PATH = f"{CARTER_NAV_PATH}/chassis_link/sensors/front_hawk/left/camera_left"
21CARTER_NAV_POSITION = (-6, 4, 0)
22CARTER_NAV_TARGET_POSITION = (3, 3, 0)
23
24
def advance_timeline_by_duration(duration: float, max_updates: int = 1000):
    """Advance the playing timeline by `duration` seconds via repeated app updates.

    Args:
        duration: Amount of timeline time (in seconds) to advance.
        max_updates: Safety cap on app updates to avoid an endless loop if the timeline stalls.
    """
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration

    # Extend the end time so the timeline cannot stop before reaching the target
    if timeline.get_end_time() < target_time:
        timeline.set_end_time(1000000)

    if not timeline.is_playing():
        timeline.play()

    print(f"Advancing timeline from {current_time:.4f}s to {target_time:.4f}s")
    step_count = 0
    while current_time < target_time:
        if step_count >= max_updates:
            print(f"Max updates reached: {step_count}, finishing timeline advance.")
            break

        prev_time = current_time
        simulation_app.update()
        current_time = timeline.get_current_time()
        step_count += 1

        if step_count % 10 == 0:
            print(f"\tStep {step_count}, {current_time:.4f}s/{target_time:.4f}s")

        if current_time <= prev_time:
            print(f"Warning: Timeline did not advance at update {step_count} (time: {current_time:.4f}s).")
    # FIX: report the time actually reached; get_end_time() would print the
    # (possibly just extended to 1000000) timeline end, not the advance result
    print(f"Finished advancing timeline to {current_time:.4f}s in {step_count} steps")
54
55
def run_sdg_pipeline(camera_path, num_frames, capture_interval, use_instance_id=True, segmentation_mapping=None):
    """Capture `num_frames` CosmosWriter frames from `camera_path`, one every `capture_interval` sim steps.

    Args:
        camera_path: Prim path of the camera to render from.
        num_frames: Number of frames to capture before stopping.
        capture_interval: Capture on every Nth simulation step; other steps only advance the app.
        use_instance_id: Forwarded to the CosmosWriter (instance-id vs. semantic segmentation).
        segmentation_mapping: Optional mapping forwarded to the CosmosWriter.
    """
    rp = rep.create.render_product(camera_path, (1280, 720))
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    backend = rep.backends.get("DiskBackend")
    # FIX: plain string literal, the previous f-string had no placeholders
    out_dir = os.path.join(os.getcwd(), "_out_cosmos")
    print(f"output_directory: {out_dir}")
    backend.initialize(output_dir=out_dir)
    cosmos_writer.initialize(
        backend=backend, use_instance_id=use_instance_id, segmentation_mapping=segmentation_mapping
    )
    cosmos_writer.attach(rp)

    # Make sure the timeline is playing
    timeline = omni.timeline.get_timeline_interface()
    if not timeline.is_playing():
        timeline.play()

    print(f"Starting SDG pipeline. Capturing {num_frames} frames, every {capture_interval} simulation step(s).")
    frames_captured_count = 0
    simulation_step_index = 0
    while frames_captured_count < num_frames:
        print(f"Simulation step {simulation_step_index}")
        if simulation_step_index % capture_interval == 0:
            # Capture step: orchestrator.step advances the app and triggers the writer
            print(f"\t Capturing frame {frames_captured_count + 1}/{num_frames}")
            rep.orchestrator.step(pause_timeline=False)
            frames_captured_count += 1
        else:
            # Non-capture step: only advance the simulation
            simulation_app.update()
        simulation_step_index += 1

    print("Waiting to finish processing and writing the data")
    rep.orchestrator.wait_until_complete()
    print(f"Finished SDG pipeline. Captured {frames_captured_count} frames")
    # Clean up: release the writer and render product, stop the simulation
    cosmos_writer.detach()
    rp.destroy()
    timeline.pause()
92
93
def run_example(num_frames, capture_interval=1, start_delay=None, use_instance_id=True, segmentation_mapping=None):
    """Open the warehouse stage, place the Carter robot and its target, then run the SDG pipeline."""

    def _set_translation(prim, position):
        # Add a translate op if the prim does not have one yet, then set its value
        if not prim.GetAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        prim.GetAttribute("xformOp:translate").Set(position)

    assets_root_path = get_assets_root_path()
    stage_path = assets_root_path + STAGE_URL
    print(f"Opening stage: '{stage_path}'")
    omni.usd.get_context().open_stage(stage_path)
    stage = omni.usd.get_context().get_stage()

    # Script nodes are required by the navigation graph
    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Frames are captured manually via the step function, not automatically on play
    rep.orchestrator.set_capture_on_play(False)

    # Reference the carter nova asset (including its navigation graph) into the stage
    carter_url_path = assets_root_path + CARTER_NAV_ASSET_URL
    print(f"Loading carter nova asset: '{carter_url_path}' at prim path: '{CARTER_NAV_PATH}'")
    carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)
    _set_translation(carter_nav_prim, CARTER_NAV_POSITION)

    # Point the navigation graph at its target position
    carter_navigation_target_prim = stage.GetPrimAtPath(CARTER_NAV_TARGET_PATH)
    if not carter_navigation_target_prim.IsValid():
        print(f"Carter navigation target prim not found at path: {CARTER_NAV_TARGET_PATH}, exiting")
        return
    _set_translation(carter_navigation_target_prim, CARTER_NAV_TARGET_POSITION)

    # Capture data from the carter nova front hawk camera
    camera_prim = stage.GetPrimAtPath(CARTER_CAMERA_PATH)
    if not camera_prim.IsValid():
        print(f"Camera prim not found at path: {CARTER_CAMERA_PATH}, exiting")
        return

    # Optionally let the simulation settle before capturing
    if start_delay is not None and start_delay > 0:
        advance_timeline_by_duration(start_delay)

    run_sdg_pipeline(camera_prim.GetPath(), num_frames, capture_interval, use_instance_id, segmentation_mapping)
136
137
# Example configuration: capture 120 frames, one every 4 simulation steps,
# after letting the simulation run for 1 second.
NUM_FRAMES = 120
CAPTURE_INTERVAL = 4
START_DELAY = 1.0
run_example(num_frames=NUM_FRAMES, capture_interval=CAPTURE_INTERVAL, start_delay=START_DELAY, use_instance_id=True)
Cosmos Writer Example
import asyncio
import os

import carb
import omni.kit.app
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from isaacsim.core.utils.stage import add_reference_to_stage
from isaacsim.storage.native import get_assets_root_path_async
from pxr import UsdGeom
STAGE_URL = "/Isaac/Samples/Replicator/Stage/full_warehouse_worker_and_anim_cameras.usd"
CARTER_NAV_ASSET_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
CARTER_NAV_PATH = "/NavWorld/CarterNav"
CARTER_CHASSIS_PATH = f"{CARTER_NAV_PATH}/chassis_link"
CARTER_NAV_TARGET_PATH = f"{CARTER_NAV_PATH}/targetXform"
CARTER_CAMERA_PATH = f"{CARTER_NAV_PATH}/chassis_link/sensors/front_hawk/left/camera_left"
CARTER_NAV_POSITION = (-6, 4, 0)
CARTER_NAV_TARGET_POSITION = (3, 3, 0)
async def advance_timeline_by_duration(duration: float, max_updates: int = 1000):
    """Advance the playing timeline by `duration` seconds via async app updates.

    Args:
        duration: Amount of timeline time (in seconds) to advance.
        max_updates: Safety cap on app updates to avoid an endless loop if the timeline stalls.
    """
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration

    # Extend the end time so the timeline cannot stop before reaching the target
    if timeline.get_end_time() < target_time:
        timeline.set_end_time(1000000)

    if not timeline.is_playing():
        timeline.play()

    print(f"Advancing timeline from {current_time:.4f}s to {target_time:.4f}s")
    step_count = 0
    while current_time < target_time:
        if step_count >= max_updates:
            print(f"Max updates reached: {step_count}, finishing timeline advance.")
            break

        prev_time = current_time
        await omni.kit.app.get_app().next_update_async()
        current_time = timeline.get_current_time()
        step_count += 1

        if step_count % 10 == 0:
            print(f"\tStep {step_count}, {current_time:.4f}s/{target_time:.4f}s")

        if current_time <= prev_time:
            print(f"Warning: Timeline did not advance at update {step_count} (time: {current_time:.4f}s).")
    # FIX: report the time actually reached; get_end_time() would print the
    # (possibly just extended to 1000000) timeline end, not the advance result
    print(f"Finished advancing timeline to {current_time:.4f}s in {step_count} steps")
async def run_sdg_pipeline(camera_path, num_frames, capture_interval, use_instance_id=True, segmentation_mapping=None):
    """Capture `num_frames` CosmosWriter frames from `camera_path`, one every `capture_interval` sim steps.

    Args:
        camera_path: Prim path of the camera to render from.
        num_frames: Number of frames to capture before stopping.
        capture_interval: Capture on every Nth simulation step; other steps only advance the app.
        use_instance_id: Forwarded to the CosmosWriter (instance-id vs. semantic segmentation).
        segmentation_mapping: Optional mapping forwarded to the CosmosWriter.
    """
    rp = rep.create.render_product(camera_path, (1280, 720))
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    backend = rep.backends.get("DiskBackend")
    # FIX: plain string literal, the previous f-string had no placeholders
    out_dir = os.path.join(os.getcwd(), "_out_cosmos")
    print(f"output_directory: {out_dir}")
    backend.initialize(output_dir=out_dir)
    cosmos_writer.initialize(
        backend=backend, use_instance_id=use_instance_id, segmentation_mapping=segmentation_mapping
    )
    cosmos_writer.attach(rp)

    # Make sure the timeline is playing
    timeline = omni.timeline.get_timeline_interface()
    if not timeline.is_playing():
        timeline.play()

    print(f"Starting SDG pipeline. Capturing {num_frames} frames, every {capture_interval} simulation step(s).")
    frames_captured_count = 0
    simulation_step_index = 0
    while frames_captured_count < num_frames:
        print(f"Simulation step {simulation_step_index}")
        if simulation_step_index % capture_interval == 0:
            # Capture step: step_async advances the app and triggers the writer
            print(f"\t Capturing frame {frames_captured_count + 1}/{num_frames}")
            await rep.orchestrator.step_async(pause_timeline=False)
            frames_captured_count += 1
        else:
            # Non-capture step: only advance the simulation
            await omni.kit.app.get_app().next_update_async()
        simulation_step_index += 1

    print("Waiting to finish processing and writing the data")
    await rep.orchestrator.wait_until_complete_async()
    print(f"Finished SDG pipeline. Captured {frames_captured_count} frames")
    # Clean up: release the writer and render product, stop the simulation
    cosmos_writer.detach()
    rp.destroy()
    timeline.pause()
async def run_example_async(
    num_frames, capture_interval=1, start_delay=None, use_instance_id=True, segmentation_mapping=None
):
    """Open the warehouse stage, place the Carter robot and its target, then run the SDG pipeline."""

    def _set_translation(prim, position):
        # Add a translate op if the prim does not have one yet, then set its value
        if not prim.GetAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        prim.GetAttribute("xformOp:translate").Set(position)

    assets_root_path = await get_assets_root_path_async()
    stage_path = assets_root_path + STAGE_URL
    print(f"Opening stage: '{stage_path}'")
    await omni.usd.get_context().open_stage_async(stage_path)
    stage = omni.usd.get_context().get_stage()

    # Script nodes are required by the navigation graph
    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Frames are captured manually via the step function, not automatically on play
    rep.orchestrator.set_capture_on_play(False)

    # Reference the carter nova asset (including its navigation graph) into the stage
    carter_url_path = assets_root_path + CARTER_NAV_ASSET_URL
    print(f"Loading carter nova asset: '{carter_url_path}' at prim path: '{CARTER_NAV_PATH}'")
    carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)
    _set_translation(carter_nav_prim, CARTER_NAV_POSITION)

    # Point the navigation graph at its target position
    carter_navigation_target_prim = stage.GetPrimAtPath(CARTER_NAV_TARGET_PATH)
    if not carter_navigation_target_prim.IsValid():
        print(f"Carter navigation target prim not found at path: {CARTER_NAV_TARGET_PATH}, exiting")
        return
    _set_translation(carter_navigation_target_prim, CARTER_NAV_TARGET_POSITION)

    # Capture data from the carter nova front hawk camera
    camera_prim = stage.GetPrimAtPath(CARTER_CAMERA_PATH)
    if not camera_prim.IsValid():
        print(f"Camera prim not found at path: {CARTER_CAMERA_PATH}, exiting")
        return

    # Optionally let the simulation settle before capturing
    if start_delay is not None and start_delay > 0:
        await advance_timeline_by_duration(start_delay)

    await run_sdg_pipeline(camera_prim.GetPath(), num_frames, capture_interval, use_instance_id, segmentation_mapping)
# Example configuration: capture 120 frames, one every 4 simulation steps,
# after letting the simulation run for 1 second.
NUM_FRAMES = 120
CAPTURE_INTERVAL = 4
START_DELAY = 1.0
asyncio.ensure_future(
    run_example_async(
        num_frames=NUM_FRAMES,
        capture_interval=CAPTURE_INTERVAL,
        start_delay=START_DELAY,
        use_instance_id=True,
    )
)