Cosmos Synthetic Data Generation#
This tutorial demonstrates generating multi-modal synthetic data for NVIDIA Cosmos using the CosmosWriter
in Isaac Sim. The writer captures synchronized RGB, depth, segmentation, and edge data from a robot navigating a warehouse environment.
The generated data serves as ground truth input for Cosmos Transfer, which transforms low-resolution control signals into high-quality visual simulations through its Multi-ControlNet architecture.

Prerequisites#
Familiarity with the omni.replicator extension and its writers
Basic understanding of Isaac Sim’s SDG Getting Started Scripts
Running simulations as Standalone Applications or via the Script Editor.
What the CosmosWriter Generates#
The writer outputs five synchronized modalities from the robot’s camera:
RGB - Color imagery (vis control)
Depth - Distance-to-camera for spatial understanding
Segmentation - Instance masks for object tracking
Shaded Segmentation - Instance masks with realistic shading
Edges - Canny edge detection for boundaries
These modalities correspond to Cosmos Transfer’s control branches:
vis: Uses RGB imagery with bilateral blurring
edge: Applies Canny edge detection (tunable thresholds)
depth: Depth maps for 3D structure understanding
seg: Segmentation masks for object identification
Each control branch can be weighted (0.0-1.0) to balance adherence vs. creative freedom in the generated output.
Implementation#
This example demonstrates a Nova Carter robot autonomously navigating through a warehouse environment. As the robot moves from its starting position to a target location, the CosmosWriter captures synchronized multi-modal data (RGB, depth, segmentation, shaded segmentation, and edges) from the robot’s front camera. The captured data is organized into clips, with each clip containing a sequence of frames that can be used as input for Cosmos Transfer.
The example can be run as a standalone application using the following command in the terminal (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/cosmos_writer_warehouse.py
from isaacsim import SimulationApp

simulation_app = SimulationApp(launch_config={"headless": False})

import os

import carb
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from isaacsim.core.utils.stage import add_reference_to_stage
from isaacsim.storage.native import get_assets_root_path
from pxr import UsdGeom

# Capture parameters
START_DELAY = 0.1  # Timeline duration delay before capturing the first clip
NUM_CLIPS = 2  # Number of video clips to capture with the CosmosWriter
NUM_FRAMES_PER_CLIP = 10  # Number of frames for each clip
CAPTURE_INTERVAL = 2  # Capture interval between frames (capture every N simulation steps)

# Stage and asset paths
STAGE_URL = "/Isaac/Samples/Replicator/Stage/full_warehouse_worker_and_anim_cameras.usd"
CARTER_NAV_ASSET_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
CARTER_NAV_PATH = "/NavWorld/CarterNav"
CARTER_NAV_TARGET_PATH = f"{CARTER_NAV_PATH}/targetXform"
CARTER_CAMERA_PATH = f"{CARTER_NAV_PATH}/chassis_link/sensors/front_hawk/left/camera_left"
CARTER_NAV_POSITION = (-6, 4, 0)
CARTER_NAV_TARGET_POSITION = (3, 3, 0)


def advance_timeline_by_duration(duration: float, max_updates: int = 1000):
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration
    # Make sure the timeline can advance past the target time
    if timeline.get_end_time() < target_time:
        timeline.set_end_time(1000000)
    if not timeline.is_playing():
        timeline.play()
    print(f"Advancing timeline from {current_time:.4f}s to {target_time:.4f}s")
    step_count = 0
    while current_time < target_time:
        if step_count >= max_updates:
            print(f"Max updates reached: {step_count}, finishing timeline advance.")
            break
        prev_time = current_time
        simulation_app.update()
        current_time = timeline.get_current_time()
        step_count += 1
        if step_count % 10 == 0:
            print(f"\tStep {step_count}, {current_time:.4f}s/{target_time:.4f}s")
        if current_time <= prev_time:
            print(f"Warning: Timeline did not advance at update {step_count} (time: {current_time:.4f}s).")
    print(f"Finished advancing timeline to {current_time:.4f}s in {step_count} steps")


def run_sdg_pipeline(
    camera_path, num_clips, num_frames_per_clip, capture_interval, use_instance_id=True, segmentation_mapping=None
):
    rp = rep.create.render_product(camera_path, (1280, 720))
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    backend = rep.backends.get("DiskBackend")
    out_dir = os.path.join(os.getcwd(), "_out_cosmos_warehouse")
    print(f"output_directory: {out_dir}")
    backend.initialize(output_dir=out_dir)
    cosmos_writer.initialize(
        backend=backend, use_instance_id=use_instance_id, segmentation_mapping=segmentation_mapping
    )
    cosmos_writer.attach(rp)

    # Make sure the timeline is playing
    timeline = omni.timeline.get_timeline_interface()
    if not timeline.is_playing():
        timeline.play()

    print(
        f"Starting SDG pipeline. Capturing {num_clips} clips with {num_frames_per_clip} frames each, every {capture_interval} simulation step(s)."
    )
    for clip_index in range(num_clips):
        print(f"Starting clip {clip_index + 1}/{num_clips}")
        frames_captured_count = 0
        simulation_step_index = 0
        while frames_captured_count < num_frames_per_clip:
            print(f"Simulation step {simulation_step_index}")
            if simulation_step_index % capture_interval == 0:
                print(f"\t Capturing frame {frames_captured_count + 1}/{num_frames_per_clip} for clip {clip_index + 1}")
                rep.orchestrator.step(pause_timeline=False)
                frames_captured_count += 1
            else:
                simulation_app.update()
            simulation_step_index += 1
        print(f"Finished clip {clip_index + 1}/{num_clips}. Captured {frames_captured_count} frames")
        # Move to next clip if not the last clip
        if clip_index < num_clips - 1:
            print("Moving to next clip...")
            cosmos_writer.next_clip()

    print("Waiting to finish processing and writing the data")
    rep.orchestrator.wait_until_complete()
    print(f"Finished SDG pipeline. Captured {num_clips} clips with {num_frames_per_clip} frames each")
    cosmos_writer.detach()
    rp.destroy()
    timeline.pause()


def run_example(
    num_clips,
    num_frames_per_clip,
    capture_interval,
    start_delay=0.0,
    use_instance_id=True,
    segmentation_mapping=None,
):
    assets_root_path = get_assets_root_path()
    stage_path = assets_root_path + STAGE_URL
    print(f"Opening stage: '{stage_path}'")
    omni.usd.get_context().open_stage(stage_path)
    stage = omni.usd.get_context().get_stage()

    # Enable script nodes
    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Disable capture on play on the new stage, data is captured manually using the step function
    rep.orchestrator.set_capture_on_play(False)

    # Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto))
    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)

    # Load the Nova Carter asset with its navigation graph
    carter_url_path = assets_root_path + CARTER_NAV_ASSET_URL
    print(f"Loading Nova Carter asset: '{carter_url_path}' at prim path: '{CARTER_NAV_PATH}'")
    carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)
    if not carter_nav_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_nav_prim).AddTranslateOp()
    carter_nav_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_POSITION)

    # Set the navigation target position
    carter_navigation_target_prim = stage.GetPrimAtPath(CARTER_NAV_TARGET_PATH)
    if not carter_navigation_target_prim.IsValid():
        print(f"Carter navigation target prim not found at path: {CARTER_NAV_TARGET_PATH}, exiting")
        return
    if not carter_navigation_target_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_navigation_target_prim).AddTranslateOp()
    carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_TARGET_POSITION)

    # Use the Nova Carter front hawk camera for capturing data
    camera_prim = stage.GetPrimAtPath(CARTER_CAMERA_PATH)
    if not camera_prim.IsValid():
        print(f"Camera prim not found at path: {CARTER_CAMERA_PATH}, exiting")
        return

    # Advance the timeline with the start delay if provided
    if start_delay is not None and start_delay > 0:
        advance_timeline_by_duration(start_delay)

    # Run the SDG pipeline
    run_sdg_pipeline(
        camera_prim.GetPath(), num_clips, num_frames_per_clip, capture_interval, use_instance_id, segmentation_mapping
    )


# Setup the environment and run the example
run_example(
    num_clips=NUM_CLIPS,
    num_frames_per_clip=NUM_FRAMES_PER_CLIP,
    capture_interval=CAPTURE_INTERVAL,
    start_delay=START_DELAY,
    use_instance_id=True,
)

simulation_app.close()
The same example can also be run asynchronously, for instance from the Script Editor:

import asyncio
import os

import carb
import omni.kit.app
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from isaacsim.core.utils.stage import add_reference_to_stage
from isaacsim.storage.native import get_assets_root_path_async
from pxr import UsdGeom

# Capture parameters
START_DELAY = 0.1  # Timeline duration delay before capturing the first clip
NUM_CLIPS = 3  # Number of video clips to capture with the CosmosWriter
NUM_FRAMES_PER_CLIP = 120  # Number of frames for each clip
CAPTURE_INTERVAL = 2  # Capture interval between frames (capture every N simulation steps)

# Stage and asset paths
STAGE_URL = "/Isaac/Samples/Replicator/Stage/full_warehouse_worker_and_anim_cameras.usd"
CARTER_NAV_ASSET_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
CARTER_NAV_PATH = "/NavWorld/CarterNav"
CARTER_NAV_TARGET_PATH = f"{CARTER_NAV_PATH}/targetXform"
CARTER_CAMERA_PATH = f"{CARTER_NAV_PATH}/chassis_link/sensors/front_hawk/left/camera_left"
CARTER_NAV_POSITION = (-6, 4, 0)
CARTER_NAV_TARGET_POSITION = (3, 3, 0)


async def advance_timeline_by_duration_async(duration: float, max_updates: int = 1000):
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration
    # Make sure the timeline can advance past the target time
    if timeline.get_end_time() < target_time:
        timeline.set_end_time(1000000)
    if not timeline.is_playing():
        timeline.play()
    print(f"Advancing timeline from {current_time:.4f}s to {target_time:.4f}s")
    step_count = 0
    while current_time < target_time:
        if step_count >= max_updates:
            print(f"Max updates reached: {step_count}, finishing timeline advance.")
            break
        prev_time = current_time
        await omni.kit.app.get_app().next_update_async()
        current_time = timeline.get_current_time()
        step_count += 1
        if step_count % 10 == 0:
            print(f"\tStep {step_count}, {current_time:.4f}s/{target_time:.4f}s")
        if current_time <= prev_time:
            print(f"Warning: Timeline did not advance at update {step_count} (time: {current_time:.4f}s).")
    print(f"Finished advancing timeline to {current_time:.4f}s in {step_count} steps")


async def run_sdg_pipeline_async(
    camera_path,
    num_clips,
    num_frames_per_clip,
    capture_interval,
    use_instance_id=True,
    segmentation_mapping=None,
):
    rp = rep.create.render_product(camera_path, (1280, 720))
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    backend = rep.backends.get("DiskBackend")
    out_dir = os.path.join(os.getcwd(), "_out_cosmos_warehouse")
    print(f"output_directory: {out_dir}")
    backend.initialize(output_dir=out_dir)
    cosmos_writer.initialize(
        backend=backend, use_instance_id=use_instance_id, segmentation_mapping=segmentation_mapping
    )
    cosmos_writer.attach(rp)

    # Make sure the timeline is playing
    timeline = omni.timeline.get_timeline_interface()
    if not timeline.is_playing():
        timeline.play()

    print(
        f"Starting SDG pipeline. Capturing {num_clips} clips with {num_frames_per_clip} frames each, every {capture_interval} simulation step(s)."
    )
    for clip_index in range(num_clips):
        print(f"Starting clip {clip_index + 1}/{num_clips}")
        frames_captured_count = 0
        simulation_step_index = 0
        while frames_captured_count < num_frames_per_clip:
            print(f"Simulation step {simulation_step_index}")
            if simulation_step_index % capture_interval == 0:
                print(
                    f"\t Capturing frame {frames_captured_count + 1}/{num_frames_per_clip} for clip {clip_index + 1}"
                )
                await rep.orchestrator.step_async(pause_timeline=False)
                frames_captured_count += 1
            else:
                await omni.kit.app.get_app().next_update_async()
            simulation_step_index += 1
        print(f"Finished clip {clip_index + 1}/{num_clips}. Captured {frames_captured_count} frames")
        # Move to next clip if not the last clip
        if clip_index < num_clips - 1:
            print("Moving to next clip...")
            cosmos_writer.next_clip()

    print("Waiting to finish processing and writing the data")
    await rep.orchestrator.wait_until_complete_async()
    print(f"Finished SDG pipeline. Captured {num_clips} clips with {num_frames_per_clip} frames each")
    cosmos_writer.detach()
    rp.destroy()
    timeline.pause()


async def run_example_async(
    num_clips,
    num_frames_per_clip,
    capture_interval,
    start_delay=0.0,
    use_instance_id=True,
    segmentation_mapping=None,
):
    assets_root_path = await get_assets_root_path_async()
    stage_path = assets_root_path + STAGE_URL
    print(f"Opening stage: '{stage_path}'")
    omni.usd.get_context().open_stage(stage_path)
    stage = omni.usd.get_context().get_stage()

    # Enable script nodes
    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Disable capture on play on the new stage, data is captured manually using the step function
    rep.orchestrator.set_capture_on_play(False)

    # Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto))
    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)

    # Load the Nova Carter asset with its navigation graph
    carter_url_path = assets_root_path + CARTER_NAV_ASSET_URL
    print(f"Loading Nova Carter asset: '{carter_url_path}' at prim path: '{CARTER_NAV_PATH}'")
    carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)
    if not carter_nav_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_nav_prim).AddTranslateOp()
    carter_nav_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_POSITION)

    # Set the navigation target position
    carter_navigation_target_prim = stage.GetPrimAtPath(CARTER_NAV_TARGET_PATH)
    if not carter_navigation_target_prim.IsValid():
        print(f"Carter navigation target prim not found at path: {CARTER_NAV_TARGET_PATH}, exiting")
        return
    if not carter_navigation_target_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_navigation_target_prim).AddTranslateOp()
    carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_TARGET_POSITION)

    # Use the Nova Carter front hawk camera for capturing data
    camera_prim = stage.GetPrimAtPath(CARTER_CAMERA_PATH)
    if not camera_prim.IsValid():
        print(f"Camera prim not found at path: {CARTER_CAMERA_PATH}, exiting")
        return

    # Advance the timeline with the start delay if provided
    if start_delay is not None and start_delay > 0:
        await advance_timeline_by_duration_async(start_delay)

    # Run the SDG pipeline
    await run_sdg_pipeline_async(
        camera_prim.GetPath(),
        num_clips,
        num_frames_per_clip,
        capture_interval,
        use_instance_id,
        segmentation_mapping,
    )


# Setup the environment and run the example
asyncio.ensure_future(
    run_example_async(
        num_clips=NUM_CLIPS,
        num_frames_per_clip=NUM_FRAMES_PER_CLIP,
        capture_interval=CAPTURE_INTERVAL,
        start_delay=START_DELAY,
        use_instance_id=True,
    )
)
The following sections explain how the warehouse navigation example works and how the CosmosWriter captures multi-modal data during robot movement.
Script Overview
The script simulates a Nova Carter robot navigating through a warehouse while capturing synchronized multi-modal data from its front camera. The robot moves from a starting position to a target location, and the CosmosWriter generates ground truth data for Cosmos Transfer.
Main Execution Flow
# Load warehouse environment
stage_path = assets_root_path + STAGE_URL
omni.usd.get_context().open_stage(stage_path)
# Add Carter Nova robot with navigation
carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)
carter_nav_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_POSITION)
# Set navigation target
carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_TARGET_POSITION)
# Run SDG pipeline
run_sdg_pipeline(camera_path, num_clips, num_frames_per_clip, capture_interval)
Key Configuration Parameters
Capture Parameters
- NUM_CLIPS = 2: Generate 2 separate video clips
- NUM_FRAMES_PER_CLIP = 10: Each clip contains 10 frames
- CAPTURE_INTERVAL = 2: Capture every 2nd simulation step
- START_DELAY = 0.1: Delay (in seconds) before the first clip is captured
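For intuition, these parameters can be related to clip length and capture rate. A minimal sketch of the arithmetic, assuming a fixed 60 Hz simulation update rate (an assumption; the actual rate depends on the stage and renderer settings):

SIM_HZ = 60  # assumed simulation steps per second (not fixed by the tutorial)

def clip_timing(num_frames_per_clip: int, capture_interval: int, sim_hz: float = SIM_HZ):
    capture_fps = sim_hz / capture_interval             # frames captured per second
    sim_steps = num_frames_per_clip * capture_interval  # simulation steps spanned by one clip
    duration_s = sim_steps / sim_hz                     # simulated time covered by one clip
    return capture_fps, duration_s

fps, duration = clip_timing(num_frames_per_clip=10, capture_interval=2)
print(f"~{fps:.0f} captured frames/s, each clip spans ~{duration:.2f}s of simulation")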
Data Capture Pipeline
The run_sdg_pipeline function orchestrates the entire capture process:
SDG Pipeline Implementation
def run_sdg_pipeline(camera_path, num_clips, num_frames_per_clip, capture_interval, use_instance_id=True):
    # Create render product from robot's camera
    rp = rep.create.render_product(camera_path, (1280, 720))

    # Initialize CosmosWriter
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    backend = rep.backends.get("DiskBackend")
    backend.initialize(output_dir="_out_cosmos_warehouse")
    cosmos_writer.initialize(backend=backend, use_instance_id=use_instance_id)
    cosmos_writer.attach(rp)

    # Capture multiple clips
    for clip_index in range(num_clips):
        # Capture frames for current clip
        frames_captured_count = 0
        simulation_step_index = 0
        while frames_captured_count < num_frames_per_clip:
            if simulation_step_index % capture_interval == 0:
                rep.orchestrator.step(pause_timeline=False)
                frames_captured_count += 1
            else:
                simulation_app.update()
            simulation_step_index += 1
        # Move to next clip
        if clip_index < num_clips - 1:
            cosmos_writer.next_clip()
Key aspects:
- The render product is created from the robot’s front camera at 1280x720 resolution
- pause_timeline=False allows the robot to continue moving during capture
- The simulation advances between captures to show navigation progress
CosmosWriter Configuration
Writer Modes and Parameters
The CosmosWriter supports two segmentation modes:
Instance ID Mode (default):
cosmos_writer.initialize(
    backend=backend,
    use_instance_id=True,       # Automatic object tracking
    segmentation_mapping=None,  # No semantic labels needed
)
Semantic Segmentation Mode:
segmentation_mapping = {
    "floor": [255, 0, 0, 255],
    "rack": [0, 255, 0, 255],
}
cosmos_writer.initialize(
    backend=backend,
    segmentation_mapping=segmentation_mapping,  # Overrides instance ID
)
Timeline Management
The script uses a helper function to advance the timeline before starting capture:
Timeline Advancement
def advance_timeline_by_duration(duration: float, max_updates: int = 1000):
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration
    while current_time < target_time:
        simulation_app.update()
        current_time = timeline.get_current_time()

This ensures the scene is fully initialized and the robot begins moving before data capture starts.
Output Structure#
The CosmosWriter generates organized multi-modal data optimized for Cosmos Transfer. Each clip represents a continuous sequence of frames captured during robot navigation:
_out_cosmos_warehouse/
    clip_0000/                  # First clip sequence
        rgb/                    # Standard color images
            rgb_0000.png, rgb_0001.png, ...
        depth/                  # Colorized depth visualization
            depth_0000.png, depth_0001.png, ...
        segmentation/           # Instance/semantic masks
            segmentation_0000.png, segmentation_0001.png, ...
        shaded_seg/             # Segmentation with realistic shading
            shaded_seg_0000.png, shaded_seg_0001.png, ...
        edges/                  # Canny edge detection results
            edges_0000.png, edges_0001.png, ...
        rgb.mp4                 # Combined RGB video
        depth.mp4               # Combined depth video
        segmentation.mp4        # Combined segmentation video
        shaded_seg.mp4          # Combined shaded segmentation video
        edges.mp4               # Combined edges video
    clip_0001/                  # Next clip sequence
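To verify that a run produced the expected layout, a short script can walk the output directory and count the frames per modality. This is a convenience sketch based only on the folder structure above, not part of the tutorial’s pipeline:

import os

out_dir = "_out_cosmos_warehouse"
modalities = ["rgb", "depth", "segmentation", "shaded_seg", "edges"]

for clip_name in sorted(os.listdir(out_dir)):
    clip_dir = os.path.join(out_dir, clip_name)
    if not os.path.isdir(clip_dir):
        continue
    # Count the written frames for each modality subfolder
    counts = {}
    for modality in modalities:
        frame_dir = os.path.join(clip_dir, modality)
        counts[modality] = len(os.listdir(frame_dir)) if os.path.isdir(frame_dir) else 0
    # List the combined per-modality videos
    videos = sorted(f for f in os.listdir(clip_dir) if f.endswith(".mp4"))
    print(f"{clip_name}: frames per modality {counts}, videos: {videos}")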
Advanced Usage#
Custom Segmentation Colors:
Map specific semantic labels to custom colors when you need consistent class identification across datasets. Use this when training models that require specific object classes to maintain the same color/ID across all training data, ensuring Cosmos Transfer preserves class relationships.
segmentation_mapping = {
    "floor": [255, 0, 0, 255],  # Red
    "wall": [0, 255, 0, 255],   # Green
    "rack": [0, 0, 255, 255],   # Blue
}

# Note: This overrides instance ID mode and requires semantic annotations
cosmos_writer.initialize(
    backend=backend,
    segmentation_mapping=segmentation_mapping,
)
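Semantic mode only works if the prims in the stage carry matching semantic labels (e.g., floor, rack). A minimal labeling sketch, assuming the add_update_semantics helper from isaacsim.core.utils.semantics and hypothetical prim paths (adjust both to your stage and Isaac Sim version):

import omni.usd
from isaacsim.core.utils.semantics import add_update_semantics  # assumed helper location

stage = omni.usd.get_context().get_stage()

# Hypothetical prim paths; replace with the prims in your own stage
labels = {
    "/Root/Warehouse/Floor": "floor",
    "/Root/Warehouse/RackA": "rack",
}
for prim_path, label in labels.items():
    prim = stage.GetPrimAtPath(prim_path)
    if prim.IsValid():
        # Adds (or updates) a class-type semantic label used by the segmentation annotator
        add_update_semantics(prim, label)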
Edge Detection Tuning:
Adjust the Canny edge detection thresholds used in the hysteresis step when generating edge maps. The Canny algorithm uses two thresholds:
Low threshold: Gradient magnitudes below this value are discarded; values between the two thresholds are kept only if they connect to a strong edge
High threshold: Gradient magnitudes above this value are always kept as strong edges
Lower thresholds detect more edges (including noise), while higher thresholds produce cleaner output with only strong edges. Values typically range from 10 to 200.
cosmos_writer.initialize(
    backend=backend,
    use_instance_id=True,
    canny_threshold_low=10,    # Low threshold for hysteresis
    canny_threshold_high=100,  # High threshold for hysteresis
)
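To pick thresholds before committing to a long capture run, their effect can be previewed offline on an already-captured RGB frame. A quick sketch using OpenCV’s own Canny implementation (an extra dependency; it may not match the writer’s edge output exactly):

import cv2

# Load a captured RGB frame as grayscale and preview a few threshold pairs
img = cv2.imread("_out_cosmos_warehouse/clip_0000/rgb/rgb_0000.png", cv2.IMREAD_GRAYSCALE)
for low, high in [(10, 100), (50, 150), (100, 200)]:
    edges = cv2.Canny(img, low, high)
    cv2.imwrite(f"edges_preview_{low}_{high}.png", edges)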
Using Data with Cosmos Transfer#
The generated data can be used with Cosmos Transfer to create high-quality visual simulations. Here’s how the modalities map to Transfer’s control branches:
Basic Single Control Example:
{
    "prompt": "A modern warehouse with autonomous robots...",
    "input_video_path": "_out_cosmos_warehouse/clip_0000/rgb.mp4",
    "edge": {
        "control_weight": 1.0
    }
}
Multi-Modal Control Example:
{
    "prompt": "High-quality warehouse simulation...",
    "input_video_path": "_out_cosmos_warehouse/clip_0000/rgb.mp4",
    "vis": {"control_weight": 0.25},
    "edge": {"control_weight": 0.25},
    "depth": {
        "input_control": "_out_cosmos_warehouse/clip_0000/depth.mp4",
        "control_weight": 0.25
    },
    "seg": {
        "input_control": "_out_cosmos_warehouse/clip_0000/segmentation.mp4",
        "control_weight": 0.25
    }
}
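Since the spec is plain JSON, it can also be generated per clip from the writer’s output directory. A minimal sketch; the transfer_spec.json file name and the uniform 0.25 weights are arbitrary choices, not part of the tutorial:

import json
import os

clip_dir = "_out_cosmos_warehouse/clip_0000"

# Assemble a multi-modal control spec pointing at the captured videos
spec = {
    "prompt": "High-quality warehouse simulation...",
    "input_video_path": os.path.join(clip_dir, "rgb.mp4"),
    "vis": {"control_weight": 0.25},
    "edge": {"control_weight": 0.25},
    "depth": {"input_control": os.path.join(clip_dir, "depth.mp4"), "control_weight": 0.25},
    "seg": {"input_control": os.path.join(clip_dir, "segmentation.mp4"), "control_weight": 0.25},
}

with open("transfer_spec.json", "w") as f:
    json.dump(spec, f, indent=4)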
Key Considerations:
Control Weights: Values 0.0-1.0 control adherence (higher = stricter following, lower = more creative freedom)
Automatic Normalization: If the control weights sum to more than 1.0, they are normalized automatically (for example, four branches weighted 0.5 each sum to 2.0 and are scaled down proportionally)
Prompting: Focus on single scenes with rich descriptions; avoid camera control instructions
Safety: Human faces are automatically blurred by Cosmos Guardrail
For advanced features like spatiotemporal control maps and prompt upsampling, refer to the Cosmos Transfer documentation.
Summary#
This tutorial demonstrated using the CosmosWriter to generate synchronized multi-modal data from a robot navigating a warehouse. The output provides ground truth for Cosmos Transfer to create high-quality visual simulations for physical AI applications.