Cosmos Synthetic Data Generation#

This tutorial demonstrates generating multi-modal synthetic data for NVIDIA Cosmos using the CosmosWriter in Isaac Sim. The writer captures synchronized RGB, depth, segmentation, and edge data from a robot navigating a warehouse environment.

The generated data serves as ground truth input for Cosmos Transfer, which transforms low-resolution control signals into high-quality visual simulations through its Multi-ControlNet architecture.

Multi-modal data captured from robot perspective: RGB, depth, segmentation, shaded segmentation, and edge maps

Prerequisites#

What the CosmosWriter Generates#

The writer outputs five synchronized modalities from the robot’s camera:

  • RGB - Color imagery (vis control)

  • Depth - Distance-to-camera for spatial understanding

  • Segmentation - Instance masks for object tracking

  • Shaded Segmentation - Instance masks with realistic shading

  • Edges - Canny edge detection for boundaries

These modalities correspond to Cosmos Transfer’s control branches:

  • vis: Uses RGB imagery with bilateral blurring

  • edge: Applies Canny edge detection (tunable thresholds)

  • depth: Depth maps for 3D structure understanding

  • seg: Segmentation masks for object identification

Each control branch can be weighted (0.0-1.0) to balance adherence vs. creative freedom in the generated output.

Implementation#

This example demonstrates a Carter Nova robot autonomously navigating through a warehouse environment. As the robot moves from its starting position to a target location, the CosmosWriter captures synchronized multi-modal data (RGB, depth, segmentation, shaded segmentation, and edges) from the robot’s front camera. The captured data is organized into clips, with each clip containing a sequence of frames that can be used as input for Cosmos Transfer.

The example can be run as a standalone application using the following command in the terminal (on Windows, use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/cosmos_writer_warehouse.py

from isaacsim import SimulationApp

simulation_app = SimulationApp(launch_config={"headless": False})

import os

import carb
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from isaacsim.core.utils.stage import add_reference_to_stage
from isaacsim.storage.native import get_assets_root_path
from pxr import UsdGeom

# Capture parameters
START_DELAY = 0.1  # Timeline duration delay before capturing the first clip
NUM_CLIPS = 2  # Number of video clips to capture with the CosmosWriter
NUM_FRAMES_PER_CLIP = 10  # Number of frames for each clip
CAPTURE_INTERVAL = 2  # Capture interval between frames (capture every N simulation steps)

# Stage and asset paths
STAGE_URL = "/Isaac/Samples/Replicator/Stage/full_warehouse_worker_and_anim_cameras.usd"
CARTER_NAV_ASSET_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
CARTER_NAV_PATH = "/NavWorld/CarterNav"
CARTER_NAV_TARGET_PATH = f"{CARTER_NAV_PATH}/targetXform"
CARTER_CAMERA_PATH = f"{CARTER_NAV_PATH}/chassis_link/sensors/front_hawk/left/camera_left"
CARTER_NAV_POSITION = (-6, 4, 0)
CARTER_NAV_TARGET_POSITION = (3, 3, 0)


def advance_timeline_by_duration(duration: float, max_updates: int = 1000):
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration

    if timeline.get_end_time() < target_time:
        timeline.set_end_time(1000000)

    if not timeline.is_playing():
        timeline.play()

    print(f"Advancing timeline from {current_time:.4f}s to {target_time:.4f}s")
    step_count = 0
    while current_time < target_time:
        if step_count >= max_updates:
            print(f"Max updates reached: {step_count}, finishing timeline advance.")
            break

        prev_time = current_time
        simulation_app.update()
        current_time = timeline.get_current_time()
        step_count += 1

        if step_count % 10 == 0:
            print(f"\tStep {step_count}, {current_time:.4f}s/{target_time:.4f}s")

        if current_time <= prev_time:
            print(f"Warning: Timeline did not advance at update {step_count} (time: {current_time:.4f}s).")
    print(f"Finished advancing timeline to {current_time:.4f}s in {step_count} steps")


def run_sdg_pipeline(
    camera_path, num_clips, num_frames_per_clip, capture_interval, use_instance_id=True, segmentation_mapping=None
):
    rp = rep.create.render_product(camera_path, (1280, 720))
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    backend = rep.backends.get("DiskBackend")
    out_dir = os.path.join(os.getcwd(), "_out_cosmos_warehouse")
    print(f"output_directory: {out_dir}")
    backend.initialize(output_dir=out_dir)
    cosmos_writer.initialize(
        backend=backend, use_instance_id=use_instance_id, segmentation_mapping=segmentation_mapping
    )
    cosmos_writer.attach(rp)

    # Make sure the timeline is playing
    timeline = omni.timeline.get_timeline_interface()
    if not timeline.is_playing():
        timeline.play()

    print(
        f"Starting SDG pipeline. Capturing {num_clips} clips with {num_frames_per_clip} frames each, every {capture_interval} simulation step(s)."
    )

    for clip_index in range(num_clips):
        print(f"Starting clip {clip_index + 1}/{num_clips}")

        frames_captured_count = 0
        simulation_step_index = 0
        while frames_captured_count < num_frames_per_clip:
            print(f"Simulation step {simulation_step_index}")
            if simulation_step_index % capture_interval == 0:
                print(f"\t Capturing frame {frames_captured_count + 1}/{num_frames_per_clip} for clip {clip_index + 1}")
                rep.orchestrator.step(pause_timeline=False)
                frames_captured_count += 1
            else:
                simulation_app.update()
            simulation_step_index += 1

        print(f"Finished clip {clip_index + 1}/{num_clips}. Captured {frames_captured_count} frames")

        # Move to next clip if not the last clip
        if clip_index < num_clips - 1:
            print("Moving to next clip...")
            cosmos_writer.next_clip()

    print("Waiting to finish processing and writing the data")
    rep.orchestrator.wait_until_complete()
    print(f"Finished SDG pipeline. Captured {num_clips} clips with {num_frames_per_clip} frames each")
    cosmos_writer.detach()
    rp.destroy()
    timeline.pause()


def run_example(
    num_clips,
    num_frames_per_clip,
    capture_interval,
    start_delay=0.0,
    use_instance_id=True,
    segmentation_mapping=None,
):
    assets_root_path = get_assets_root_path()
    stage_path = assets_root_path + STAGE_URL
    print(f"Opening stage: '{stage_path}'")
    omni.usd.get_context().open_stage(stage_path)
    stage = omni.usd.get_context().get_stage()

    # Enable script nodes
    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Disable capture on play on the new stage, data is captured manually using the step function
    rep.orchestrator.set_capture_on_play(False)

    # Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto))
    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)

    # Load carter nova asset with its navigation graph
    carter_url_path = assets_root_path + CARTER_NAV_ASSET_URL
    print(f"Loading carter nova asset: '{carter_url_path}' at prim path: '{CARTER_NAV_PATH}'")
    carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)

    if not carter_nav_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_nav_prim).AddTranslateOp()
    carter_nav_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_POSITION)

    # Set the navigation target position
    carter_navigation_target_prim = stage.GetPrimAtPath(CARTER_NAV_TARGET_PATH)
    if not carter_navigation_target_prim.IsValid():
        print(f"Carter navigation target prim not found at path: {CARTER_NAV_TARGET_PATH}, exiting")
        return
    if not carter_navigation_target_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_navigation_target_prim).AddTranslateOp()
    carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_TARGET_POSITION)

    # Use the carter nova front hawk camera for capturing data
    camera_prim = stage.GetPrimAtPath(CARTER_CAMERA_PATH)
    if not camera_prim.IsValid():
        print(f"Camera prim not found at path: {CARTER_CAMERA_PATH}, exiting")
        return

    # Advance the timeline with the start delay if provided
    if start_delay is not None and start_delay > 0:
        advance_timeline_by_duration(start_delay)

    # Run the SDG pipeline
    run_sdg_pipeline(
        camera_prim.GetPath(), num_clips, num_frames_per_clip, capture_interval, use_instance_id, segmentation_mapping
    )


# Setup the environment and run the example
run_example(
    num_clips=NUM_CLIPS,
    num_frames_per_clip=NUM_FRAMES_PER_CLIP,
    capture_interval=CAPTURE_INTERVAL,
    start_delay=START_DELAY,
    use_instance_id=True,
)

simulation_app.close()

# Asynchronous variant of the same example for an already running Isaac Sim instance (for example, the Script Editor)
import asyncio
import os

import carb
import omni.kit.app
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from isaacsim.core.utils.stage import add_reference_to_stage
from isaacsim.storage.native import get_assets_root_path_async
from pxr import UsdGeom

# Capture parameters
START_DELAY = 0.1  # Timeline duration delay before capturing the first clip
NUM_CLIPS = 3  # Number of video clips to capture with the CosmosWriter
NUM_FRAMES_PER_CLIP = 120  # Number of frames for each clip
CAPTURE_INTERVAL = 2  # Capture interval between frames (capture every N simulation steps)

# Stage and asset paths
STAGE_URL = "/Isaac/Samples/Replicator/Stage/full_warehouse_worker_and_anim_cameras.usd"
CARTER_NAV_ASSET_URL = "/Isaac/Samples/Replicator/OmniGraph/nova_carter_nav_only.usd"
CARTER_NAV_PATH = "/NavWorld/CarterNav"
CARTER_NAV_TARGET_PATH = f"{CARTER_NAV_PATH}/targetXform"
CARTER_CAMERA_PATH = f"{CARTER_NAV_PATH}/chassis_link/sensors/front_hawk/left/camera_left"
CARTER_NAV_POSITION = (-6, 4, 0)
CARTER_NAV_TARGET_POSITION = (3, 3, 0)

async def advance_timeline_by_duration_async(duration: float, max_updates: int = 1000):
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration

    if timeline.get_end_time() < target_time:
        timeline.set_end_time(1000000)

    if not timeline.is_playing():
        timeline.play()

    print(f"Advancing timeline from {current_time:.4f}s to {target_time:.4f}s")
    step_count = 0
    while current_time < target_time:
        if step_count >= max_updates:
            print(f"Max updates reached: {step_count}, finishing timeline advance.")
            break

        prev_time = current_time
        await omni.kit.app.get_app().next_update_async()
        current_time = timeline.get_current_time()
        step_count += 1

        if step_count % 10 == 0:
            print(f"\tStep {step_count}, {current_time:.4f}s/{target_time:.4f}s")

        if current_time <= prev_time:
            print(f"Warning: Timeline did not advance at update {step_count} (time: {current_time:.4f}s).")
    print(f"Finished advancing timeline to {current_time:.4f}s in {step_count} steps")

async def run_sdg_pipeline_async(
    camera_path,
    num_clips,
    num_frames_per_clip,
    capture_interval,
    use_instance_id=True,
    segmentation_mapping=None,
):
    rp = rep.create.render_product(camera_path, (1280, 720))
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    backend = rep.backends.get("DiskBackend")
    out_dir = os.path.join(os.getcwd(), "_out_cosmos_warehouse")
    print(f"output_directory: {out_dir}")
    backend.initialize(output_dir=out_dir)
    cosmos_writer.initialize(
        backend=backend, use_instance_id=use_instance_id, segmentation_mapping=segmentation_mapping
    )
    cosmos_writer.attach(rp)

    # Make sure the timeline is playing
    timeline = omni.timeline.get_timeline_interface()
    if not timeline.is_playing():
        timeline.play()

    print(
        f"Starting SDG pipeline. Capturing {num_clips} clips with {num_frames_per_clip} frames each, every {capture_interval} simulation step(s)."
    )

    for clip_index in range(num_clips):
        print(f"Starting clip {clip_index + 1}/{num_clips}")

        frames_captured_count = 0
        simulation_step_index = 0
        while frames_captured_count < num_frames_per_clip:
            print(f"Simulation step {simulation_step_index}")
            if simulation_step_index % capture_interval == 0:
                print(
                    f"\t Capturing frame {frames_captured_count + 1}/{num_frames_per_clip} for clip {clip_index + 1}"
                )
                await rep.orchestrator.step_async(pause_timeline=False)
                frames_captured_count += 1
            else:
                await omni.kit.app.get_app().next_update_async()
            simulation_step_index += 1

        print(f"Finished clip {clip_index + 1}/{num_clips}. Captured {frames_captured_count} frames")

        # Move to next clip if not the last clip
        if clip_index < num_clips - 1:
            print("Moving to next clip...")
            cosmos_writer.next_clip()

    print("Waiting to finish processing and writing the data")
    await rep.orchestrator.wait_until_complete_async()
    print(f"Finished SDG pipeline. Captured {num_clips} clips with {num_frames_per_clip} frames each")
    cosmos_writer.detach()
    rp.destroy()
    timeline.pause()

async def run_example_async(
    num_clips,
    num_frames_per_clip,
    capture_interval,
    start_delay=0.0,
    use_instance_id=True,
    segmentation_mapping=None,
):
    assets_root_path = await get_assets_root_path_async()
    stage_path = assets_root_path + STAGE_URL
    print(f"Opening stage: '{stage_path}'")
    omni.usd.get_context().open_stage(stage_path)
    stage = omni.usd.get_context().get_stage()

    # Enable script nodes
    carb.settings.get_settings().set_bool("/app/omni.graph.scriptnode/opt_in", True)

    # Disable capture on play on the new stage, data is captured manually using the step function
    rep.orchestrator.set_capture_on_play(False)

    # Set DLSS to Quality mode (2) for best SDG results (Options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto))
    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)

    # Load carter nova asset with its navigation graph
    carter_url_path = assets_root_path + CARTER_NAV_ASSET_URL
    print(f"Loading carter nova asset: '{carter_url_path}' at prim path: '{CARTER_NAV_PATH}'")
    carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)

    if not carter_nav_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_nav_prim).AddTranslateOp()
    carter_nav_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_POSITION)

    # Set the navigation target position
    carter_navigation_target_prim = stage.GetPrimAtPath(CARTER_NAV_TARGET_PATH)
    if not carter_navigation_target_prim.IsValid():
        print(f"Carter navigation target prim not found at path: {CARTER_NAV_TARGET_PATH}, exiting")
        return
    if not carter_navigation_target_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(carter_navigation_target_prim).AddTranslateOp()
    carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_TARGET_POSITION)

    # Use the carter nova front hawk camera for capturing data
    camera_prim = stage.GetPrimAtPath(CARTER_CAMERA_PATH)
    if not camera_prim.IsValid():
        print(f"Camera prim not found at path: {CARTER_CAMERA_PATH}, exiting")
        return

    # Advance the timeline with the start delay if provided
    if start_delay is not None and start_delay > 0:
        await advance_timeline_by_duration_async(start_delay)

    # Run the SDG pipeline
    await run_sdg_pipeline_async(
        camera_prim.GetPath(),
        num_clips,
        num_frames_per_clip,
        capture_interval,
        use_instance_id,
        segmentation_mapping,
    )

# Setup the environment and run the example
asyncio.ensure_future(run_example_async(
    num_clips=NUM_CLIPS,
    num_frames_per_clip=NUM_FRAMES_PER_CLIP,
    capture_interval=CAPTURE_INTERVAL,
    start_delay=START_DELAY,
    use_instance_id=True,
))

The following explanation covers how the warehouse navigation example works and how the CosmosWriter captures multi-modal data during robot movement.

Script Overview

The script simulates a Carter Nova robot navigating through a warehouse while capturing synchronized multi-modal data from its front camera. The robot moves from a starting position to a target location, and the CosmosWriter generates ground truth data for Cosmos Transfer.

Main Execution Flow
# Load warehouse environment
stage_path = assets_root_path + STAGE_URL
omni.usd.get_context().open_stage(stage_path)

# Add Carter Nova robot with navigation
carter_nav_prim = add_reference_to_stage(usd_path=carter_url_path, prim_path=CARTER_NAV_PATH)
carter_nav_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_POSITION)

# Set navigation target
carter_navigation_target_prim.GetAttribute("xformOp:translate").Set(CARTER_NAV_TARGET_POSITION)

# Run SDG pipeline
run_sdg_pipeline(camera_path, num_clips, num_frames_per_clip, capture_interval)

Key Configuration Parameters

Capture Parameters
  • NUM_CLIPS = 2: Generate 2 separate video clips

  • NUM_FRAMES_PER_CLIP = 10: Each clip contains 10 frames

  • CAPTURE_INTERVAL = 2: Capture every 2nd simulation step

  • START_DELAY = 0.1: Advance the timeline by 0.1 seconds before capturing the first clip
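To see how NUM_FRAMES_PER_CLIP and CAPTURE_INTERVAL interact, the capture loop can be mirrored in plain Python without Isaac Sim. This is an illustrative sketch only: `capture_schedule` is a hypothetical helper, not part of the example script or the Replicator API, and it counts loop iterations rather than rendered frames.

```python
def capture_schedule(num_frames_per_clip: int, capture_interval: int) -> list[int]:
    """Return the per-clip loop step indices at which a frame is captured.

    Mirrors the while-loop of the example: a frame is captured whenever the
    step index is a multiple of capture_interval; all other steps only
    advance the simulation.
    """
    if capture_interval < 1:
        raise ValueError("capture_interval must be >= 1")
    captured_steps = []
    step_index = 0
    while len(captured_steps) < num_frames_per_clip:
        if step_index % capture_interval == 0:
            captured_steps.append(step_index)
        step_index += 1
    return captured_steps


steps = capture_schedule(num_frames_per_clip=10, capture_interval=2)
print("Capture steps:", steps)
print("Loop iterations per clip:", steps[-1] + 1)
```

With the values listed above, one clip spans 19 loop iterations: 10 capture steps interleaved with 9 plain update steps.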

Data Capture Pipeline

The run_sdg_pipeline function orchestrates the entire capture process:

SDG Pipeline Implementation
def run_sdg_pipeline(camera_path, num_clips, num_frames_per_clip, capture_interval, use_instance_id=True):
    # Create render product from robot's camera
    rp = rep.create.render_product(camera_path, (1280, 720))

    # Initialize CosmosWriter
    cosmos_writer = rep.WriterRegistry.get("CosmosWriter")
    backend = rep.backends.get("DiskBackend")
    backend.initialize(output_dir="_out_cosmos_warehouse")
    cosmos_writer.initialize(backend=backend, use_instance_id=use_instance_id)
    cosmos_writer.attach(rp)

    # Capture multiple clips
    for clip_index in range(num_clips):
        # Capture frames for current clip
        frames_captured_count = 0
        simulation_step_index = 0
        while frames_captured_count < num_frames_per_clip:
            if simulation_step_index % capture_interval == 0:
                # Capture a frame without pausing the timeline
                rep.orchestrator.step(pause_timeline=False)
                frames_captured_count += 1
            else:
                # Advance the simulation without capturing
                simulation_app.update()
            simulation_step_index += 1

        # Move to next clip
        if clip_index < num_clips - 1:
            cosmos_writer.next_clip()

Key aspects:

  • The render product is created from the robot’s front camera at 1280x720 resolution

  • pause_timeline=False allows the robot to continue moving during capture

  • The simulation advances between captures to show navigation progress

CosmosWriter Configuration

Writer Modes and Parameters

The CosmosWriter supports two segmentation modes:

  1. Instance ID Mode (default):

    cosmos_writer.initialize(
        backend=backend,
        use_instance_id=True,      # Automatic object tracking
        segmentation_mapping=None  # No semantic labels needed
    )
    
  2. Semantic Segmentation Mode:

    segmentation_mapping = {
        "floor": [255, 0, 0, 255],
        "rack": [0, 255, 0, 255]
    }
    cosmos_writer.initialize(
        backend=backend,
        segmentation_mapping=segmentation_mapping  # Overrides instance ID
    )
    

Timeline Management

The script uses a helper function to advance the timeline before starting capture:

Timeline Advancement
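def advance_timeline_by_duration(duration: float, max_updates: int = 1000):
    timeline = omni.timeline.get_timeline_interface()
    current_time = timeline.get_current_time()
    target_time = current_time + duration

    # The timeline must be playing for time to advance
    if not timeline.is_playing():
        timeline.play()

    # Stop after max_updates even if the target time was not reached
    step_count = 0
    while current_time < target_time and step_count < max_updates:
        simulation_app.update()
        current_time = timeline.get_current_time()
        step_count += 1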

This ensures the scene is fully initialized and the robot begins moving before data capture starts.
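The stopping logic of this helper can be tried outside Isaac Sim with a stand-in timeline. The sketch below is an assumption-laden illustration: `FakeTimeline` and `advance_by_duration` are hypothetical names, and the fixed step of 1/60 s per update is assumed rather than taken from the example. Exact fractions are used so the comparison against the target time is not affected by floating-point rounding.

```python
from fractions import Fraction


class FakeTimeline:
    """Hypothetical stand-in for the timeline: time advances by a fixed step per update."""

    def __init__(self, step: Fraction):
        self.current_time = Fraction(0)
        self.step = step

    def update(self) -> None:
        self.current_time += self.step


def advance_by_duration(timeline: FakeTimeline, duration: Fraction, max_updates: int = 1000) -> int:
    """Update until `duration` has elapsed or `max_updates` is reached; return the update count."""
    target_time = timeline.current_time + duration
    step_count = 0
    while timeline.current_time < target_time and step_count < max_updates:
        timeline.update()
        step_count += 1
    return step_count


# Assumed rate of 60 updates per second; a 0.1 s delay as in START_DELAY.
timeline = FakeTimeline(step=Fraction(1, 60))
updates = advance_by_duration(timeline, Fraction(1, 10))
print(f"{updates} updates, timeline at {float(timeline.current_time):.4f}s")

# A timeline that never advances is cut off by max_updates instead of looping forever.
stalled = FakeTimeline(step=Fraction(0))
print(advance_by_duration(stalled, Fraction(1), max_updates=5), "updates on a stalled timeline")
```

The `max_updates` guard matters in practice because a paused or stalled timeline would otherwise keep the loop running indefinitely.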

Output Structure#

The CosmosWriter generates organized multi-modal data optimized for Cosmos Transfer. Each clip represents a continuous sequence of frames captured during robot navigation:

_out_cosmos_warehouse/
  clip_0000/                    # First clip sequence
    rgb/                        # Standard color images
      rgb_0000.png, rgb_0001.png, ...
    depth/                      # Colorized depth visualization
      depth_0000.png, depth_0001.png, ...
    segmentation/               # Instance/semantic masks
      segmentation_0000.png, segmentation_0001.png, ...
    shaded_seg/                 # Segmentation with realistic shading
      shaded_seg_0000.png, shaded_seg_0001.png, ...
    edges/                      # Canny edge detection results
      edges_0000.png, edges_0001.png, ...
    rgb.mp4                     # Combined RGB video
    depth.mp4                   # Combined depth video
    segmentation.mp4            # Combined segmentation video
    shaded_seg.mp4              # Combined shaded segmentation video
    edges.mp4                   # Combined edges video
  clip_0001/                    # Next clip sequence
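After a run, it can be useful to confirm that every clip contains all five modalities. The following is a minimal sketch using only the Python standard library; the folder and file names are taken from the listing above, while `summarize_clip` and `check_output` are hypothetical helpers that are not part of the CosmosWriter.

```python
from pathlib import Path

# Folder names taken from the directory listing above.
MODALITIES = ("rgb", "depth", "segmentation", "shaded_seg", "edges")


def summarize_clip(clip_dir: Path) -> dict[str, dict[str, object]]:
    """Count the PNG frames and check for the combined MP4 of each modality in one clip."""
    summary = {}
    for name in MODALITIES:
        frames = sorted((clip_dir / name).glob(f"{name}_*.png"))
        summary[name] = {
            "frames": len(frames),
            "has_video": (clip_dir / f"{name}.mp4").is_file(),
        }
    return summary


def check_output(out_dir: Path) -> dict[str, dict]:
    """Summarize every clip_* folder found under the writer's output directory."""
    clips = sorted(p for p in out_dir.glob("clip_*") if p.is_dir())
    return {clip.name: summarize_clip(clip) for clip in clips}


if __name__ == "__main__":
    for clip_name, summary in check_output(Path("_out_cosmos_warehouse")).items():
        print(clip_name, summary)
```

A clip whose modalities report different frame counts, or a missing video, is a hint that the run was interrupted before the writer finished.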

Advanced Usage#

Custom Segmentation Colors:

Map specific semantic labels to custom colors when you need consistent class identification across datasets. Use this when training models that require specific object classes to maintain the same color/ID across all training data, ensuring Cosmos Transfer preserves class relationships.

segmentation_mapping = {
    "floor": [255, 0, 0, 255],   # Red
    "wall": [0, 255, 0, 255],    # Green
    "rack": [0, 0, 255, 255]     # Blue
}

# Note: This overrides instance ID mode and requires semantic annotations
cosmos_writer.initialize(
    backend=backend,
    segmentation_mapping=segmentation_mapping
)
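Because a malformed mapping is easy to write by hand, a small check before passing it to the writer can catch mistakes early. This is a sketch under stated assumptions: it assumes each color is a four-channel RGBA list with values 0-255, as in the examples above, and `validate_segmentation_mapping` is a hypothetical helper, not something the CosmosWriter provides.

```python
def validate_segmentation_mapping(mapping: dict[str, list[int]]) -> None:
    """Raise ValueError if the mapping is not label -> four integer channels in 0-255."""
    for label, color in mapping.items():
        if not isinstance(label, str) or not label:
            raise ValueError(f"Label must be a non-empty string, got {label!r}")
        if len(color) != 4:
            raise ValueError(f"{label!r}: expected 4 channels (RGBA), got {len(color)}")
        for channel in color:
            is_int = isinstance(channel, int) and not isinstance(channel, bool)
            if not is_int or not 0 <= channel <= 255:
                raise ValueError(f"{label!r}: channels must be integers in 0-255, got {channel!r}")

    # Two labels with the same color could not be told apart in the output masks.
    colors = [tuple(color) for color in mapping.values()]
    if len(set(colors)) != len(colors):
        raise ValueError("Two labels share the same color")


segmentation_mapping = {
    "floor": [255, 0, 0, 255],
    "wall": [0, 255, 0, 255],
    "rack": [0, 0, 255, 255],
}
validate_segmentation_mapping(segmentation_mapping)  # passes silently
```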

Edge Detection Tuning:

Adjust Canny edge detection parameters for the hysteresis procedure when generating edge maps. The Canny algorithm uses two thresholds:

  • Low threshold: Pixels with gradient magnitude below this value are discarded; pixels between the two thresholds are kept only if they connect to a strong edge

  • High threshold: Pixels with gradient magnitude above this value are always kept as strong edges

Lowering the thresholds detects more edges (including noise), while raising them produces cleaner output with only strong edges. Values typically range from 10-200.

cosmos_writer.initialize(
    backend=backend,
    use_instance_id=True,
    canny_threshold_low=10,      # Low threshold for hysteresis
    canny_threshold_high=100     # High threshold for hysteresis
)
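The effect of the two thresholds is easiest to see on a toy example. The sketch below applies hysteresis to a one-dimensional list of gradient magnitudes in plain Python. It is a deliberate simplification: a full Canny detector also smooths the image, computes gradients, applies non-maximum suppression, and follows connectivity in two dimensions, and how the CosmosWriter implements these steps is not described here. The function name `hysteresis_1d` is hypothetical.

```python
def hysteresis_1d(magnitudes: list[float], low: float, high: float) -> list[bool]:
    """Mark samples as edges using two thresholds.

    Samples above `high` are strong edges. Samples above `low` are kept only
    if a contiguous run of above-`low` samples links them to a strong edge.
    """
    if low > high:
        raise ValueError("low threshold must not exceed high threshold")
    is_edge = [False] * len(magnitudes)
    run_start = None
    run_has_strong = False
    # A trailing sentinel below any threshold closes a run that reaches the end.
    for i, value in enumerate(list(magnitudes) + [float("-inf")]):
        if value > low:
            if run_start is None:
                run_start, run_has_strong = i, False
            run_has_strong = run_has_strong or value > high
        elif run_start is not None:
            if run_has_strong:
                for j in range(run_start, i):
                    is_edge[j] = True
            run_start = None
    return is_edge


gradient = [5, 50, 150, 60, 5, 40, 30, 5]
print(hysteresis_1d(gradient, low=10, high=100))  # weak run at indices 5-6 is dropped
print(hysteresis_1d(gradient, low=10, high=30))   # lower high threshold keeps that run
```

In the first call only the run containing the value 150 survives; lowering the high threshold to 30 promotes the second run as well, which is the "more edges, more noise" trade-off described above.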

Using Data with Cosmos Transfer#

The generated data can be used with Cosmos Transfer to create high-quality visual simulations. Here’s how the modalities map to Transfer’s control branches:

Basic Single Control Example:

{
    "prompt": "A modern warehouse with autonomous robots...",
    "input_video_path": "_out_cosmos_warehouse/clip_0000/rgb.mp4",
    "edge": {
        "control_weight": 1.0
    }
}

Multi-Modal Control Example:

{
    "prompt": "High-quality warehouse simulation...",
    "input_video_path": "_out_cosmos_warehouse/clip_0000/rgb.mp4",
    "vis": {"control_weight": 0.25},
    "edge": {"control_weight": 0.25},
    "depth": {
        "input_control": "_out_cosmos_warehouse/clip_0000/depth.mp4",
        "control_weight": 0.25
    },
    "seg": {
        "input_control": "_out_cosmos_warehouse/clip_0000/segmentation.mp4",
        "control_weight": 0.25
    }
}
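Writing these files by hand for every clip gets repetitive, so the spec can also be generated. The sketch below builds the same structure with the standard library only. It uses just the keys that appear in the examples above; `build_control_spec` is a hypothetical helper, and the exact schema accepted by a given Cosmos Transfer release should be checked against its documentation.

```python
import json
from pathlib import Path

# Branches that read a dedicated control video written by the CosmosWriter.
CONTROL_VIDEOS = {"depth": "depth.mp4", "seg": "segmentation.mp4"}
KNOWN_BRANCHES = ("vis", "edge", "depth", "seg")


def build_control_spec(clip_dir: str, prompt: str, weights: dict[str, float]) -> dict:
    """Assemble a control spec for one clip from per-branch control weights."""
    clip = Path(clip_dir)
    spec = {"prompt": prompt, "input_video_path": (clip / "rgb.mp4").as_posix()}
    for branch, weight in weights.items():
        if branch not in KNOWN_BRANCHES:
            raise ValueError(f"Unknown control branch: {branch!r}")
        if not 0.0 <= weight <= 1.0:
            raise ValueError(f"{branch}: control_weight must be within 0.0-1.0, got {weight}")
        entry = {}
        if branch in CONTROL_VIDEOS:
            entry["input_control"] = (clip / CONTROL_VIDEOS[branch]).as_posix()
        entry["control_weight"] = weight
        spec[branch] = entry
    return spec


spec = build_control_spec(
    "_out_cosmos_warehouse/clip_0000",
    "High-quality warehouse simulation...",
    {"vis": 0.25, "edge": 0.25, "depth": 0.25, "seg": 0.25},
)
print(json.dumps(spec, indent=4))
```

Calling the helper once per clip folder yields one spec per captured sequence without duplicating paths by hand.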

Key Considerations:

  • Control Weights: Values 0.0-1.0 control adherence (higher = stricter following, lower = more creative freedom)

  • Automatic Normalization: If total weights > 1.0, they’re normalized automatically

  • Prompting: Focus on single scenes with rich descriptions; avoid camera control instructions

  • Safety: Human faces are automatically blurred by Cosmos Guardrail
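To get a feel for the normalization mentioned above, the following sketch rescales a set of weights whose total exceeds 1.0. The exact scheme used by Cosmos Transfer is not described here, so proportional scaling to a sum of 1.0 is an assumption, and `normalize_control_weights` is a hypothetical name.

```python
def normalize_control_weights(weights: dict[str, float]) -> dict[str, float]:
    """Scale weights proportionally so they sum to 1.0 when their total exceeds 1.0.

    Assumption: proportional scaling; totals at or below 1.0 are left unchanged.
    """
    total = sum(weights.values())
    if total <= 1.0:
        return dict(weights)
    return {branch: weight / total for branch, weight in weights.items()}


requested = {"vis": 0.5, "edge": 0.5, "depth": 0.5, "seg": 0.5}
print(normalize_control_weights(requested))
print(normalize_control_weights({"edge": 1.0}))
```

Under this assumption, only the ratio between branches matters once the total exceeds 1.0, so raising every weight by the same factor would not change the result.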

For advanced features like spatiotemporal control maps and prompt upsampling, refer to the Cosmos Transfer documentation.

Summary#

This tutorial demonstrated using the CosmosWriter to generate synchronized multi-modal data from a robot navigating a warehouse. The output provides ground truth for Cosmos Transfer to create high-quality visual simulations for physical AI applications.