Randomization Snippets#

Examples of randomization using USD and Isaac Sim APIs. These examples demonstrate how to randomize scenes for synthetic data generation (SDG) in scenarios where default replicator randomizers are not sufficient or applicable.

The snippets are designed to align with the structure and function names used in the replicator example snippets. In comparison they also have the option to write the data to disk by stetting write_data=True.

Prerequisites:

  • Familiarity with USD.

  • Ability to execute code from the Script Editor.

  • Understanding basic replicator concepts, such as subframes.

Randomizing Light Sources#

This snippet sets up a new environment containing a cube and a sphere. It then spawns a given number of lights and randomizes selected attributes for these lights over a specified number of frames.

../_images/isaac_tutorial_replicator_randomization_lights.gif
Randomizing Light Sources
import asyncio
import os

import numpy as np
import omni.kit.commands
import omni.replicator.core as rep
import omni.usd
from isaacsim.core.utils.semantics import add_labels
from pxr import Gf, Sdf, UsdGeom

omni.usd.get_context().new_stage()
stage = omni.usd.get_context().get_stage()

sphere = stage.DefinePrim("/World/Sphere", "Sphere")
UsdGeom.Xformable(sphere).AddTranslateOp().Set((0.0, 1.0, 1.0))
add_labels(sphere, labels=["sphere"], instance_name="class")

cube = stage.DefinePrim("/World/Cube", "Cube")
UsdGeom.Xformable(cube).AddTranslateOp().Set((0.0, -2.0, 2.0))
add_labels(cube, labels=["cube"], instance_name="class")

plane_path = "/World/Plane"
omni.kit.commands.execute("CreateMeshPrimWithDefaultXform", prim_path=plane_path, prim_type="Plane")
plane_prim = stage.GetPrimAtPath(plane_path)
plane_prim.CreateAttribute("xformOp:scale", Sdf.ValueTypeNames.Double3, False).Set(Gf.Vec3d(10, 10, 1))


def sphere_lights(num):
    lights = []
    for i in range(num):
        # "CylinderLight", "DiskLight", "DistantLight", "DomeLight", "RectLight", "SphereLight"
        prim_type = "SphereLight"
        next_free_path = omni.usd.get_stage_next_free_path(stage, f"/World/{prim_type}", False)
        light_prim = stage.DefinePrim(next_free_path, prim_type)
        UsdGeom.Xformable(light_prim).AddTranslateOp().Set((0.0, 0.0, 0.0))
        UsdGeom.Xformable(light_prim).AddRotateXYZOp().Set((0.0, 0.0, 0.0))
        UsdGeom.Xformable(light_prim).AddScaleOp().Set((1.0, 1.0, 1.0))
        light_prim.CreateAttribute("inputs:enableColorTemperature", Sdf.ValueTypeNames.Bool).Set(True)
        light_prim.CreateAttribute("inputs:colorTemperature", Sdf.ValueTypeNames.Float).Set(6500.0)
        light_prim.CreateAttribute("inputs:radius", Sdf.ValueTypeNames.Float).Set(0.5)
        light_prim.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(30000.0)
        light_prim.CreateAttribute("inputs:color", Sdf.ValueTypeNames.Color3f).Set((1.0, 1.0, 1.0))
        light_prim.CreateAttribute("inputs:exposure", Sdf.ValueTypeNames.Float).Set(0.0)
        light_prim.CreateAttribute("inputs:diffuse", Sdf.ValueTypeNames.Float).Set(1.0)
        light_prim.CreateAttribute("inputs:specular", Sdf.ValueTypeNames.Float).Set(1.0)
        lights.append(light_prim)
    return lights


async def run_randomizations_async(num_frames, lights, write_data, delay=None):
    if write_data:
        out_dir = os.path.join(os.getcwd(), "_out_rand_lights")
        print(f"Writing data to {out_dir}..")
        backend = rep.backends.get("DiskBackend")
        backend.initialize(output_dir=out_dir)
        writer = rep.WriterRegistry.get("BasicWriter")
        writer.initialize(backend=backend, rgb=True)
        cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), name="Camera")
        rp = rep.create.render_product(cam, resolution=(512, 512))
        writer.attach(rp)

    for _ in range(num_frames):
        for light in lights:
            light.GetAttribute("xformOp:translate").Set(
                (np.random.uniform(-5, 5), np.random.uniform(-5, 5), np.random.uniform(4, 6))
            )
            scale_rand = np.random.uniform(0.5, 1.5)
            light.GetAttribute("xformOp:scale").Set((scale_rand, scale_rand, scale_rand))
            light.GetAttribute("inputs:colorTemperature").Set(np.random.normal(4500, 1500))
            light.GetAttribute("inputs:intensity").Set(np.random.normal(25000, 5000))
            light.GetAttribute("inputs:color").Set(
                (np.random.uniform(0.1, 0.9), np.random.uniform(0.1, 0.9), np.random.uniform(0.1, 0.9))
            )

        if write_data:
            await rep.orchestrator.step_async(rt_subframes=16)
        else:
            await omni.kit.app.get_app().next_update_async()
        # Optional delay between frames to better visualize the randomization in the viewport
        if delay is not None and delay > 0:
            await asyncio.sleep(delay)

    # Wait for the data to be written to disk and cleanup writer and render product
    if write_data:
        await rep.orchestrator.wait_until_complete_async()
        writer.detach()
        rp.destroy()


num_frames = 10
lights = sphere_lights(10)
asyncio.ensure_future(run_randomizations_async(num_frames=num_frames, lights=lights, write_data=True, delay=0.2))

Randomizing Textures#

The snippet sets up an environment, spawns a given number of cubes and spheres, and randomizes their textures for the given number of frames. After the randomizations their original materials are reassigned. The snippet also showcases how to create a new material and assign it to a prim.

../_images/isaac_tutorial_replicator_randomization_textures.gif
Randomizing Textures
import asyncio
import os

import numpy as np
import omni.replicator.core as rep
import omni.usd
from isaacsim.core.utils.semantics import add_labels, get_labels
from isaacsim.storage.native import get_assets_root_path_async
from pxr import Gf, Sdf, UsdGeom, UsdShade

omni.usd.get_context().new_stage()
stage = omni.usd.get_context().get_stage()
dome_light = stage.DefinePrim("/World/DomeLight", "DomeLight")
dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(1000.0)

sphere = stage.DefinePrim("/World/Sphere", "Sphere")
UsdGeom.Xformable(sphere).AddTranslateOp().Set((0.0, 0.0, 1.0))
add_labels(sphere, labels=["sphere"], instance_name="class")

num_cubes = 10
for _ in range(num_cubes):
    prim_type = "Cube"
    next_free_path = omni.usd.get_stage_next_free_path(stage, f"/World/{prim_type}", False)
    cube = stage.DefinePrim(next_free_path, prim_type)
    UsdGeom.Xformable(cube).AddTranslateOp().Set((np.random.uniform(-3.5, 3.5), np.random.uniform(-3.5, 3.5), 1))
    scale_rand = np.random.uniform(0.25, 0.5)
    UsdGeom.Xformable(cube).AddScaleOp().Set((scale_rand, scale_rand, scale_rand))
    add_labels(cube, labels=["cube"], instance_name="class")

plane_path = "/World/Plane"
omni.kit.commands.execute("CreateMeshPrimWithDefaultXform", prim_path=plane_path, prim_type="Plane")
plane_prim = stage.GetPrimAtPath(plane_path)
plane_prim.CreateAttribute("xformOp:scale", Sdf.ValueTypeNames.Double3, False).Set(Gf.Vec3d(10, 10, 1))


def get_shapes():
    stage = omni.usd.get_context().get_stage()
    shapes = []
    for prim in stage.Traverse():
        labels = get_labels(prim)
        if class_labels := labels.get("class"):
            if "cube" in class_labels or "sphere" in class_labels:
                shapes.append(prim)
    return shapes


shapes = get_shapes()


def create_omnipbr_material(mtl_url, mtl_name, mtl_path):
    stage = omni.usd.get_context().get_stage()
    omni.kit.commands.execute("CreateMdlMaterialPrim", mtl_url=mtl_url, mtl_name=mtl_name, mtl_path=mtl_path)
    material_prim = stage.GetPrimAtPath(mtl_path)
    shader = UsdShade.Shader(omni.usd.get_shader_from_material(material_prim, get_prim=True))

    # Add value inputs
    shader.CreateInput("diffuse_color_constant", Sdf.ValueTypeNames.Color3f)
    shader.CreateInput("reflection_roughness_constant", Sdf.ValueTypeNames.Float)
    shader.CreateInput("metallic_constant", Sdf.ValueTypeNames.Float)

    # Add texture inputs
    shader.CreateInput("diffuse_texture", Sdf.ValueTypeNames.Asset)
    shader.CreateInput("reflectionroughness_texture", Sdf.ValueTypeNames.Asset)
    shader.CreateInput("metallic_texture", Sdf.ValueTypeNames.Asset)

    # Add other attributes
    shader.CreateInput("project_uvw", Sdf.ValueTypeNames.Bool)

    # Add texture scale and rotate
    shader.CreateInput("texture_scale", Sdf.ValueTypeNames.Float2)
    shader.CreateInput("texture_rotate", Sdf.ValueTypeNames.Float)

    material = UsdShade.Material(material_prim)
    return material


def create_materials(num):
    MDL = "OmniPBR.mdl"
    mtl_name, _ = os.path.splitext(MDL)
    MAT_PATH = "/World/Looks"
    materials = []
    for _ in range(num):
        prim_path = omni.usd.get_stage_next_free_path(stage, f"{MAT_PATH}/{mtl_name}", False)
        mat = create_omnipbr_material(mtl_url=MDL, mtl_name=mtl_name, mtl_path=prim_path)
        materials.append(mat)
    return materials


materials = create_materials(len(shapes))


async def run_randomizations_async(num_frames, materials, write_data, delay=None):
    assets_root_path = await get_assets_root_path_async()
    textures = [
        assets_root_path + "/NVIDIA/Materials/vMaterials_2/Ground/textures/aggregate_exposed_diff.jpg",
        assets_root_path + "/NVIDIA/Materials/vMaterials_2/Ground/textures/gravel_track_ballast_diff.jpg",
        assets_root_path + "/NVIDIA/Materials/vMaterials_2/Ground/textures/gravel_track_ballast_multi_R_rough_G_ao.jpg",
        assets_root_path + "/NVIDIA/Materials/vMaterials_2/Ground/textures/rough_gravel_rough.jpg",
    ]

    if write_data:
        out_dir = os.path.join(os.getcwd(), "_out_rand_textures")
        print(f"Writing data to {out_dir}..")
        backend = rep.backends.get("DiskBackend")
        backend.initialize(output_dir=out_dir)
        writer = rep.WriterRegistry.get("BasicWriter")
        writer.initialize(backend=backend, rgb=True)
        cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), name="Camera")
        rp = rep.create.render_product(cam, resolution=(512, 512))
        writer.attach(rp)

    # Apply the new materials and store the initial ones to reassign later
    initial_materials = {}
    for i, shape in enumerate(shapes):
        cur_mat, _ = UsdShade.MaterialBindingAPI(shape).ComputeBoundMaterial()
        initial_materials[shape] = cur_mat
        UsdShade.MaterialBindingAPI(shape).Bind(materials[i], UsdShade.Tokens.strongerThanDescendants)

    for _ in range(num_frames):
        for mat in materials:
            shader = UsdShade.Shader(omni.usd.get_shader_from_material(mat, get_prim=True))
            diffuse_texture = np.random.choice(textures)
            shader.GetInput("diffuse_texture").Set(diffuse_texture)
            project_uvw = np.random.choice([True, False], p=[0.9, 0.1])
            shader.GetInput("project_uvw").Set(bool(project_uvw))
            texture_scale = np.random.uniform(0.1, 1)
            shader.GetInput("texture_scale").Set((texture_scale, texture_scale))
            texture_rotate = np.random.uniform(0, 45)
            shader.GetInput("texture_rotate").Set(texture_rotate)

        if write_data:
            await rep.orchestrator.step_async(rt_subframes=4)
        else:
            await omni.kit.app.get_app().next_update_async()

        # Optional delay between frames to better visualize the randomization in the viewport
        if delay is not None and delay > 0:
            await asyncio.sleep(delay)

    # Wait for the data to be written to disk and cleanup writer and render product
    if write_data:
        await rep.orchestrator.wait_until_complete_async()
        writer.detach()
        rp.destroy()

    # Reassign the initial materials
    for shape, mat in initial_materials.items():
        if mat:
            UsdShade.MaterialBindingAPI(shape).Bind(mat, UsdShade.Tokens.strongerThanDescendants)
        else:
            UsdShade.MaterialBindingAPI(shape).UnbindAllBindings()


num_frames = 10
asyncio.ensure_future(run_randomizations_async(num_frames, materials, write_data=True, delay=0.2))

Sequential Randomizations#

The snippet provides an example of more complex randomizations, where the results of the first randomization are used to determine the next randomization. It uses a custom sampler function to set the location of the camera by iterating over (almost) equidistant points on a sphere. The snippet starts by setting up the environment, a forklift, a pallet, a bin, and a dome light. For every randomization frame, it cycles through the dome light textures, moves the pallet to a random location, and then moves the bin so that it is fully on top of the pallet. Finally, it moves the camera to a new location on the sphere, ensuring it faces the bin.

../_images/isaac_tutorial_replicator_randomization_chained_persp.gif ../_images/isaac_tutorial_replicator_randomization_chained_sphere.gif
Sequential Randomizations
import asyncio
import itertools
import os

import numpy as np
import omni.replicator.core as rep
import omni.usd
from isaacsim.storage.native import get_assets_root_path_async
from pxr import Gf, Usd, UsdGeom, UsdLux


# Fibonacci sphere algorithm: https://arxiv.org/pdf/0912.4540
def next_point_on_sphere(idx, num_points, radius=1, origin=(0, 0, 0)):
    offset = 2.0 / num_points
    inc = np.pi * (3.0 - np.sqrt(5.0))
    z = ((idx * offset) - 1) + (offset / 2)
    phi = ((idx + 1) % num_points) * inc
    r = np.sqrt(1 - pow(z, 2))
    y = np.cos(phi) * r
    x = np.sin(phi) * r
    return [(x * radius) + origin[0], (y * radius) + origin[1], (z * radius) + origin[2]]


async def run_randomizations_async(
    num_frames, forklift_path, pallet_path, bin_path, dome_textures, write_data, delay=None
):
    assets_root_path = await get_assets_root_path_async()

    await omni.usd.get_context().new_stage_async()
    stage = omni.usd.get_context().get_stage()

    dome_light = UsdLux.DomeLight.Define(stage, "/World/Lights/DomeLight")
    dome_light.GetIntensityAttr().Set(1000)

    forklift_prim = stage.DefinePrim("/World/Forklift", "Xform")
    forklift_prim.GetReferences().AddReference(assets_root_path + forklift_path)
    if not forklift_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(forklift_prim).AddTranslateOp()
    forklift_prim.GetAttribute("xformOp:translate").Set((-4.5, -4.5, 0))

    pallet_prim = stage.DefinePrim("/World/Pallet", "Xform")
    pallet_prim.GetReferences().AddReference(assets_root_path + pallet_path)
    if not pallet_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(pallet_prim).AddTranslateOp()
    if not pallet_prim.GetAttribute("xformOp:rotateXYZ"):
        UsdGeom.Xformable(pallet_prim).AddRotateXYZOp()

    bin_prim = stage.DefinePrim("/World/Bin", "Xform")
    bin_prim.GetReferences().AddReference(assets_root_path + bin_path)
    if not bin_prim.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(bin_prim).AddTranslateOp()
    if not bin_prim.GetAttribute("xformOp:rotateXYZ"):
        UsdGeom.Xformable(bin_prim).AddRotateXYZOp()

    view_cam = stage.DefinePrim("/World/Camera", "Camera")
    if not view_cam.GetAttribute("xformOp:translate"):
        UsdGeom.Xformable(view_cam).AddTranslateOp()
    if not view_cam.GetAttribute("xformOp:orient"):
        UsdGeom.Xformable(view_cam).AddOrientOp()

    dome_textures_full = [assets_root_path + tex for tex in dome_textures]
    textures_cycle = itertools.cycle(dome_textures_full)

    if write_data:
        out_dir = os.path.join(os.getcwd(), "_out_rand_sphere_scan")
        print(f"Writing data to {out_dir}..")
        backend = rep.backends.get("DiskBackend")
        backend.initialize(output_dir=out_dir)
        writer = rep.WriterRegistry.get("BasicWriter")
        writer.initialize(backend=backend, rgb=True)
        persp_cam = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), name="PerspCamera")
        rp_persp = rep.create.render_product(persp_cam, (512, 512), name="PerspView")
        rp_view = rep.create.render_product(view_cam, (512, 512), name="SphereView")
        writer.attach([rp_view, rp_persp])

    bb_cache = UsdGeom.BBoxCache(time=Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_])
    pallet_size = bb_cache.ComputeWorldBound(pallet_prim).GetRange().GetSize()
    pallet_length = pallet_size.GetLength()
    bin_size = bb_cache.ComputeWorldBound(bin_prim).GetRange().GetSize()

    for i in range(num_frames):
        # Set next background texture every nth frame and run an app update
        if i % 5 == 0:
            dome_light.GetTextureFileAttr().Set(next(textures_cycle))
            await omni.kit.app.get_app().next_update_async()

        # Randomize pallet pose
        pallet_prim.GetAttribute("xformOp:translate").Set(
            Gf.Vec3d(np.random.uniform(-1.5, 1.5), np.random.uniform(-1.5, 1.5), 0)
        )
        rand_z_rot = np.random.uniform(-90, 90)
        pallet_prim.GetAttribute("xformOp:rotateXYZ").Set(Gf.Vec3d(0, 0, rand_z_rot))
        pallet_tf_mat = omni.usd.get_world_transform_matrix(pallet_prim)
        pallet_rot = pallet_tf_mat.ExtractRotation()
        pallet_pos = pallet_tf_mat.ExtractTranslation()

        # Randomize bin position on top of the rotated pallet area making sure the bin is fully on the pallet
        rand_transl_x = np.random.uniform(-pallet_size[0] / 2 + bin_size[0] / 2, pallet_size[0] / 2 - bin_size[0] / 2)
        rand_transl_y = np.random.uniform(-pallet_size[1] / 2 + bin_size[1] / 2, pallet_size[1] / 2 - bin_size[1] / 2)

        # Adjust bin position to account for the random rotation of the pallet
        rand_z_rot_rad = np.deg2rad(rand_z_rot)
        rot_adjusted_transl_x = rand_transl_x * np.cos(rand_z_rot_rad) - rand_transl_y * np.sin(rand_z_rot_rad)
        rot_adjusted_transl_y = rand_transl_x * np.sin(rand_z_rot_rad) + rand_transl_y * np.cos(rand_z_rot_rad)
        bin_prim.GetAttribute("xformOp:translate").Set(
            Gf.Vec3d(
                pallet_pos[0] + rot_adjusted_transl_x,
                pallet_pos[1] + rot_adjusted_transl_y,
                pallet_pos[2] + pallet_size[2] + bin_size[2] / 2,
            )
        )
        # Keep bin rotation aligned with pallet
        bin_prim.GetAttribute("xformOp:rotateXYZ").Set(pallet_rot.GetAxis() * pallet_rot.GetAngle())

        # Get next camera position on a sphere looking at the bin with a randomized distance
        rand_radius = np.random.normal(3, 0.5) * pallet_length
        bin_pos = omni.usd.get_world_transform_matrix(bin_prim).ExtractTranslation()
        cam_pos = next_point_on_sphere(i, num_points=num_frames, radius=rand_radius, origin=bin_pos)
        view_cam.GetAttribute("xformOp:translate").Set(Gf.Vec3d(*cam_pos))

        eye = Gf.Vec3d(*cam_pos)
        target = Gf.Vec3d(*bin_pos)
        up_axis = Gf.Vec3d(0, 0, 1)
        look_at_quatd = Gf.Matrix4d().SetLookAt(eye, target, up_axis).GetInverse().ExtractRotation().GetQuat()
        view_cam.GetAttribute("xformOp:orient").Set(Gf.Quatf(look_at_quatd))

        if write_data:
            await rep.orchestrator.step_async(rt_subframes=4, delta_time=0.0)
        else:
            await omni.kit.app.get_app().next_update_async()
        # Optional delay between frames to better visualize the randomization in the viewport
        if delay is not None and delay > 0:
            await asyncio.sleep(delay)

    # Wait for the data to be written to disk and cleanup writer and render products
    if write_data:
        await rep.orchestrator.wait_until_complete_async()
        writer.detach()
        rp_persp.destroy()
        rp_view.destroy()


NUM_FRAMES = 90
FORKLIFT_PATH = "/Isaac/Props/Forklift/forklift.usd"
PALLET_PATH = "/Isaac/Props/Pallet/pallet.usd"
BIN_PATH = "/Isaac/Props/KLT_Bin/small_KLT_visual.usd"
DOME_TEXTURES = [
    "/NVIDIA/Assets/Skies/Cloudy/champagne_castle_1_4k.hdr",
    "/NVIDIA/Assets/Skies/Clear/evening_road_01_4k.hdr",
    "/NVIDIA/Assets/Skies/Clear/mealie_road_4k.hdr",
    "/NVIDIA/Assets/Skies/Clear/qwantani_4k.hdr",
]
asyncio.ensure_future(
    run_randomizations_async(
        NUM_FRAMES, FORKLIFT_PATH, PALLET_PATH, BIN_PATH, DOME_TEXTURES, write_data=True, delay=0.2
    )
)

Physics-based Randomized Volume Filling#

The snippet randomizes the stacking of objects on multiple surfaces. It randomly spawns a given number of pallets in the selected areas and then spawns physically simulated boxes on top of them. A temporary collision box area is created around the pallets to prevent the boxes from falling off. After all the boxes have been dropped, they are moved in various directions and finally pulled towards the center of the pallet for more stable stacking. Finally, the collision area is removed, after which the boxes can also fall to the ground. To allow easier sliding of the boxes into more stable positions, their friction is temporarily reduced during the simulation.

../_images/isaac_tutorial_replicator_randomization_volume_fill.gif ../_images/isaac_tutorial_replicator_randomization_volume_fill_warehouse.gif
Physics-based Randomized Volume Filling
import asyncio
import os
import random
from itertools import chain

import carb
import omni.kit.app
import omni.physx
import omni.replicator.core as rep
import omni.usd
from isaacsim.storage.native import get_assets_root_path_async
from pxr import Gf, PhysicsSchemaTools, PhysxSchema, Sdf, Usd, UsdGeom, UsdPhysics, UsdShade, UsdUtils


# Add transformation properties to the prim (if not already present)
def set_transform_attributes(prim, location=None, orientation=None, rotation=None, scale=None):
    if location is not None:
        if not prim.HasAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        prim.GetAttribute("xformOp:translate").Set(location)
    if orientation is not None:
        if not prim.HasAttribute("xformOp:orient"):
            UsdGeom.Xformable(prim).AddOrientOp()
        prim.GetAttribute("xformOp:orient").Set(orientation)
    if rotation is not None:
        if not prim.HasAttribute("xformOp:rotateXYZ"):
            UsdGeom.Xformable(prim).AddRotateXYZOp()
        prim.GetAttribute("xformOp:rotateXYZ").Set(rotation)
    if scale is not None:
        if not prim.HasAttribute("xformOp:scale"):
            UsdGeom.Xformable(prim).AddScaleOp()
        prim.GetAttribute("xformOp:scale").Set(scale)


# Enables collisions with the asset (without rigid body dynamics the asset will be static)
def add_colliders(prim):
    # Iterate descendant prims (including root) and add colliders to mesh or primitive types
    for desc_prim in Usd.PrimRange(prim):
        if desc_prim.IsA(UsdGeom.Mesh) or desc_prim.IsA(UsdGeom.Gprim):
            # Physics
            if not desc_prim.HasAPI(UsdPhysics.CollisionAPI):
                collision_api = UsdPhysics.CollisionAPI.Apply(desc_prim)
            else:
                collision_api = UsdPhysics.CollisionAPI(desc_prim)
            collision_api.CreateCollisionEnabledAttr(True)

        # Add mesh specific collision properties only to mesh types
        if desc_prim.IsA(UsdGeom.Mesh):
            if not desc_prim.HasAPI(UsdPhysics.MeshCollisionAPI):
                mesh_collision_api = UsdPhysics.MeshCollisionAPI.Apply(desc_prim)
            else:
                mesh_collision_api = UsdPhysics.MeshCollisionAPI(desc_prim)
            mesh_collision_api.CreateApproximationAttr().Set("convexHull")


# Enables rigid body dynamics (physics simulation) on the prim (having valid colliders is recommended)
def add_rigid_body_dynamics(prim, disable_gravity=False, angular_damping=None):
    # Physics
    if not prim.HasAPI(UsdPhysics.RigidBodyAPI):
        rigid_body_api = UsdPhysics.RigidBodyAPI.Apply(prim)
    else:
        rigid_body_api = UsdPhysics.RigidBodyAPI(prim)
    rigid_body_api.CreateRigidBodyEnabledAttr(True)
    # PhysX
    if not prim.HasAPI(PhysxSchema.PhysxRigidBodyAPI):
        physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI.Apply(prim)
    else:
        physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI(prim)
    physx_rigid_body_api.GetDisableGravityAttr().Set(disable_gravity)
    if angular_damping is not None:
        physx_rigid_body_api.CreateAngularDampingAttr().Set(angular_damping)


# Create a new prim with the provided asset URL and transform properties
def create_asset(stage, asset_url, path, location=None, rotation=None, orientation=None, scale=None):
    prim_path = omni.usd.get_stage_next_free_path(stage, path, False)
    prim = stage.DefinePrim(prim_path, "Xform")
    prim.GetReferences().AddReference(asset_url)
    set_transform_attributes(prim, location=location, rotation=rotation, orientation=orientation, scale=scale)
    return prim


# Create a new prim with the provided asset URL and transform properties including colliders
def create_asset_with_colliders(stage, asset_url, path, location=None, rotation=None, orientation=None, scale=None):
    prim = create_asset(stage, asset_url, path, location, rotation, orientation, scale)
    add_colliders(prim)
    return prim


# Create collision walls around the top surface of the prim with the given height and thickness
def create_collision_walls(stage, prim, bbox_cache=None, height=2, thickness=0.3, material=None, visible=False):
    # Use the untransformed axis-aligned bounding box to calculate the prim surface size and center
    if bbox_cache is None:
        bbox_cache = UsdGeom.BBoxCache(Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_])
    local_range = bbox_cache.ComputeWorldBound(prim).GetRange()
    width, depth, local_height = local_range.GetSize()
    # Raise the midpoint height to the prim's surface
    mid = local_range.GetMidpoint() + Gf.Vec3d(0, 0, local_height / 2)

    # Define the walls (name, location, size) with the specified thickness added externally to the surface and height
    walls = [
        ("floor", (mid[0], mid[1], mid[2] - thickness / 2), (width, depth, thickness)),
        ("ceiling", (mid[0], mid[1], mid[2] + height + thickness / 2), (width, depth, thickness)),
        (
            "left_wall",
            (mid[0] - (width + thickness) / 2, mid[1], mid[2] + height / 2),
            (thickness, depth, height),
        ),
        (
            "right_wall",
            (mid[0] + (width + thickness) / 2, mid[1], mid[2] + height / 2),
            (thickness, depth, height),
        ),
        (
            "front_wall",
            (mid[0], mid[1] + (depth + thickness) / 2, mid[2] + height / 2),
            (width, thickness, height),
        ),
        (
            "back_wall",
            (mid[0], mid[1] - (depth + thickness) / 2, mid[2] + height / 2),
            (width, thickness, height),
        ),
    ]

    # Use the parent prim path to create the walls as children (use local coordinates)
    prim_path = prim.GetPath()
    collision_walls = []
    for name, location, size in walls:
        prim = stage.DefinePrim(f"{prim_path}/{name}", "Cube")
        scale = (size[0] / 2.0, size[1] / 2.0, size[2] / 2.0)
        set_transform_attributes(prim, location=location, scale=scale)
        add_colliders(prim)
        if not visible:
            UsdGeom.Imageable(prim).MakeInvisible()
        if material is not None:
            mat_binding_api = UsdShade.MaterialBindingAPI.Apply(prim)
            mat_binding_api.Bind(material, UsdShade.Tokens.weakerThanDescendants, "physics")
        collision_walls.append(prim)
    return collision_walls


# Slide the assets independently in perpendicular directions and then pull them all together towards the given center
async def apply_forces_async(stage, boxes, pallet, strength=550, strength_center_multiplier=2):
    timeline = omni.timeline.get_timeline_interface()
    timeline.play()
    # Get the pallet center and forward vector to apply forces in the perpendicular directions and towards the center
    pallet_tf: Gf.Matrix4d = UsdGeom.Xformable(pallet).ComputeLocalToWorldTransform(Usd.TimeCode.Default())
    pallet_center = pallet_tf.ExtractTranslation()
    pallet_rot: Gf.Rotation = pallet_tf.ExtractRotation()
    force_forward = Gf.Vec3d(pallet_rot.TransformDir(Gf.Vec3d(1, 0, 0))) * strength
    force_right = Gf.Vec3d(pallet_rot.TransformDir(Gf.Vec3d(0, 1, 0))) * strength

    physx_simulation_interface = omni.physx.get_physx_simulation_interface()
    stage_id = UsdUtils.StageCache.Get().GetId(stage).ToLongInt()
    for box_prim in boxes:
        body_path = PhysicsSchemaTools.sdfPathToInt(box_prim.GetPath())
        forces = [force_forward, force_right, -force_forward, -force_right]
        for force in chain(forces, forces):
            box_tf: Gf.Matrix4d = UsdGeom.Xformable(box_prim).ComputeLocalToWorldTransform(Usd.TimeCode.Default())
            box_position = carb.Float3(*box_tf.ExtractTranslation())
            physx_simulation_interface.apply_force_at_pos(stage_id, body_path, carb.Float3(*force), box_position)
            for _ in range(10):
                await omni.kit.app.get_app().next_update_async()

    # Pull all boxes at once to the pallet center
    for box_prim in boxes:
        body_path = PhysicsSchemaTools.sdfPathToInt(box_prim.GetPath())
        box_tf: Gf.Matrix4d = UsdGeom.Xformable(box_prim).ComputeLocalToWorldTransform(Usd.TimeCode.Default())
        box_location = box_tf.ExtractTranslation()
        force_to_center = (pallet_center - box_location) * strength * strength_center_multiplier
        physx_simulation_interface.apply_force_at_pos(
            stage_id,
            body_path,
            carb.Float3(*force_to_center),
            carb.Float3(*box_location),
        )
    for _ in range(20):
        await omni.kit.app.get_app().next_update_async()
    timeline.pause()


# Create a new stage and and run the example scenario
async def stack_boxes_on_pallet_async(pallet_prim, boxes_urls_and_weights, num_boxes, drop_height=1.5, drop_margin=0.2):
    pallet_path = pallet_prim.GetPath()
    print(f"[BoxStacking] Running scenario for pallet {pallet_path} with {num_boxes} boxes..")
    stage = omni.usd.get_context().get_stage()
    bbox_cache = UsdGeom.BBoxCache(Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_])

    # Create a custom physics material to allow the boxes to easily slide into stacking positions
    material_path = f"{pallet_path}/Looks/PhysicsMaterial"
    default_material = UsdShade.Material.Define(stage, material_path)
    physics_material = UsdPhysics.MaterialAPI.Apply(default_material.GetPrim())
    physics_material.CreateRestitutionAttr().Set(0.0)  # Inelastic collision (no bouncing)
    physics_material.CreateStaticFrictionAttr().Set(0.01)  # Small friction to allow sliding of stationary boxes
    physics_material.CreateDynamicFrictionAttr().Set(0.01)  # Small friction to allow sliding of moving boxes

    # Apply the physics material to the pallet
    mat_binding_api = UsdShade.MaterialBindingAPI.Apply(pallet_prim)
    mat_binding_api.Bind(default_material, UsdShade.Tokens.weakerThanDescendants, "physics")

    # Create collision walls around the top of the pallet and apply the physics material to them
    collision_walls = create_collision_walls(
        stage, pallet_prim, bbox_cache, height=drop_height + drop_margin, material=default_material
    )

    # Create the random boxes (without physics) with the specified weights and sort them by size (volume)
    box_urls, box_weights = zip(*boxes_urls_and_weights)
    rand_boxes_urls = random.choices(box_urls, weights=box_weights, k=num_boxes)
    boxes = [create_asset(stage, box_url, f"{pallet_path}_Boxes/Box_{i}") for i, box_url in enumerate(rand_boxes_urls)]
    boxes.sort(key=lambda box: bbox_cache.ComputeLocalBound(box).GetVolume(), reverse=True)

    # Calculate the drop area above the pallet taking into account the pallet surface, drop height and the margin
    # Note: The boxes can be spawned colliding with the surrounding collision walls as they will be pushed inwards
    pallet_range = bbox_cache.ComputeWorldBound(pallet_prim).GetRange()
    pallet_width, pallet_depth, pallet_heigth = pallet_range.GetSize()
    # Move the spawn center at the given height above the pallet surface
    spawn_center = pallet_range.GetMidpoint() + Gf.Vec3d(0, 0, pallet_heigth / 2 + drop_height)
    spawn_width, spawn_depth = pallet_width / 2 - drop_margin, pallet_depth / 2 - drop_margin

    # Use the pallet local-to-world transform to apply the local random offsets relative to the pallet
    pallet_tf: Gf.Matrix4d = UsdGeom.Xformable(pallet_prim).ComputeLocalToWorldTransform(Usd.TimeCode.Default())
    pallet_rot: Gf.Rotation = pallet_tf.ExtractRotation()

    # Simulate dropping the boxes from random poses on the pallet
    timeline = omni.timeline.get_timeline_interface()
    for box_prim in boxes:
        # Create a random location and orientation for the box within the drop area in local frame
        local_loc = spawn_center + Gf.Vec3d(
            random.uniform(-spawn_width, spawn_width), random.uniform(-spawn_depth, spawn_depth), 0
        )
        axes = [Gf.Vec3d(1, 0, 0), Gf.Vec3d(0, 1, 0), Gf.Vec3d(0, 0, 1)]
        angles = [random.choice([180, 90, 0, -90, -180]) + random.uniform(-3, 3) for _ in axes]
        local_rot = Gf.Rotation()
        for axis, angle in zip(axes, angles):
            local_rot *= Gf.Rotation(axis, angle)

        # Transform the local pose to the pallet's world coordinate system
        world_loc = pallet_tf.Transform(local_loc)
        world_quat = Gf.Quatf((pallet_rot * local_rot).GetQuat())

        # Set the spawn pose and enable collisions and rigid body dynamics with dampened angular movements
        set_transform_attributes(box_prim, location=world_loc, orientation=world_quat)
        add_colliders(box_prim)
        add_rigid_body_dynamics(box_prim, angular_damping=0.9)

        # Bind the physics material to the box (allow frictionless sliding)
        mat_binding_api = UsdShade.MaterialBindingAPI.Apply(box_prim)
        mat_binding_api.Bind(default_material, UsdShade.Tokens.weakerThanDescendants, "physics")
        # Wait for an app update to load the new attributes
        await omni.kit.app.get_app().next_update_async()

        # Play simulation for a few frames for each box
        timeline.play()
        for _ in range(20):
            await omni.kit.app.get_app().next_update_async()
        timeline.pause()

    # Iteratively apply forces to the boxes to move them around then pull them all together towards the pallet center
    await apply_forces_async(stage, boxes, pallet_prim)

    # Remove rigid body dynamics of the boxes until all other scenarios are completed
    for box in boxes:
        UsdPhysics.RigidBodyAPI(box).GetRigidBodyEnabledAttr().Set(False)

    # Increase the friction to prevent sliding of the boxes on the pallet before removing the collision walls
    physics_material.CreateStaticFrictionAttr().Set(0.9)
    physics_material.CreateDynamicFrictionAttr().Set(0.9)

    # Remove collision walls
    for wall in collision_walls:
        stage.RemovePrim(wall.GetPath())
    return boxes


# Run the example scenario
async def run_box_stacking_scenarios_async(num_pallets, env_url=None, write_data=False):
    # Get assets root path once for all asset loading operations
    assets_root_path = await get_assets_root_path_async()

    # List of pallets and boxes to randomly choose from with their respective weights
    pallets_urls_and_weights = [
        (assets_root_path + "/Isaac/Environments/Simple_Warehouse/Props/SM_PaletteA_01.usd", 0.25),
        (assets_root_path + "/Isaac/Environments/Simple_Warehouse/Props/SM_PaletteA_02.usd", 0.75),
    ]
    boxes_urls_and_weights = [
        (assets_root_path + "/Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxA_01.usd", 0.02),
        (assets_root_path + "/Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxB_01.usd", 0.06),
        (assets_root_path + "/Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxC_01.usd", 0.12),
        (assets_root_path + "/Isaac/Environments/Simple_Warehouse/Props/SM_CardBoxD_01.usd", 0.80),
    ]

    # Load a predefined or create a new stage
    if env_url is not None:
        env_path = env_url if env_url.startswith("omniverse://") else assets_root_path + env_url
        omni.usd.get_context().open_stage(env_path)
        stage = omni.usd.get_context().get_stage()
    else:
        omni.usd.get_context().new_stage()
        stage = omni.usd.get_context().get_stage()
        distant_light = stage.DefinePrim("/World/Lights/DistantLight", "DistantLight")
        distant_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(400.0)
        if not distant_light.HasAttribute("xformOp:rotateXYZ"):
            UsdGeom.Xformable(distant_light).AddRotateXYZOp()
        distant_light.GetAttribute("xformOp:rotateXYZ").Set((0, 60, 0))
        dome_light = stage.DefinePrim("/World/Lights/DomeLight", "DomeLight")
        dome_light.CreateAttribute("inputs:intensity", Sdf.ValueTypeNames.Float).Set(500.0)

    # Spawn the pallets
    pallets = []
    pallets_urls, pallets_weights = zip(*pallets_urls_and_weights)
    rand_pallet_urls = random.choices(pallets_urls, weights=pallets_weights, k=num_pallets)
    # Custom pallet poses for the evnironment
    custom_pallet_locations = [
        (-9.3, 5.3, 1.3),
        (-9.3, 7.3, 1.3),
        (-9.3, -0.6, 1.3),
    ]
    random.shuffle(custom_pallet_locations)
    for i, pallet_url in enumerate(rand_pallet_urls):
        # Use a custom location for every other pallet
        if env_url is not None:
            if i % 2 == 0 and custom_pallet_locations:
                rand_loc = Gf.Vec3d(*custom_pallet_locations.pop())
            else:
                rand_loc = Gf.Vec3d(-6.5, i * 1.75, 0) + Gf.Vec3d(random.uniform(-0.2, 0.2), random.uniform(0, 0.2), 0)
        else:
            rand_loc = Gf.Vec3d(i * 1.5, 0, 0) + Gf.Vec3d(random.uniform(0, 0.2), random.uniform(-0.2, 0.2), 0)
        rand_rot = (0, 0, random.choice([180, 90, 0, -90, -180]) + random.uniform(-15, 15))
        pallet_prim = create_asset_with_colliders(
            stage, pallet_url, f"/World/Pallet_{i}", location=rand_loc, rotation=rand_rot
        )
        pallets.append(pallet_prim)

    # Stack the boxes on the pallets
    total_boxes = []
    for pallet in pallets:
        if env_url is not None:
            rand_num_boxes = random.randint(8, 15)
            stacked_boxes = await stack_boxes_on_pallet_async(
                pallet, boxes_urls_and_weights, num_boxes=rand_num_boxes, drop_height=1.0
            )
        else:
            rand_num_boxes = random.randint(12, 20)
            stacked_boxes = await stack_boxes_on_pallet_async(pallet, boxes_urls_and_weights, num_boxes=rand_num_boxes)
        total_boxes.extend(stacked_boxes)

    # Re-enable rigid body dynamics of the boxes and run the simulation for a while
    for box in total_boxes:
        UsdPhysics.RigidBodyAPI(box).GetRigidBodyEnabledAttr().Set(True)
    timeline = omni.timeline.get_timeline_interface()
    timeline.play()
    for _ in range(200):
        await omni.kit.app.get_app().next_update_async()
    timeline.pause()

    if write_data:
        out_dir = os.path.join(os.getcwd(), "_out_box_stacking")
        print(f"Writing data to {out_dir}..")
        backend = rep.backends.get("DiskBackend")
        backend.initialize(output_dir=out_dir)
        writer = rep.WriterRegistry.get("BasicWriter")
        writer.initialize(backend=backend, rgb=True)
        cam = rep.functional.create.camera(position=(5, -5, 2), look_at=(0, 0, 0), name="PalletCamera")
        rp = rep.create.render_product(cam, resolution=(512, 512))
        writer.attach(rp)

        # Capture the data and wait for the data to be written to disk
        await rep.orchestrator.step_async(rt_subframes=8)

        # Wait for the data to be written to disk and cleanup
        await rep.orchestrator.wait_until_complete_async()
        writer.detach()
        rp.destroy()


# asyncio.ensure_future(run_box_stacking_scenarios_async(num_pallets=1, write_data=True))
asyncio.ensure_future(
    run_box_stacking_scenarios_async(
        num_pallets=6, env_url="/Isaac/Environments/Simple_Warehouse/warehouse.usd", write_data=True
    )
)

Simready Assets SDG Example#

Script editor example for using SimReady Assets to randomize the scene. SimReady Assets are physically accurate 3D objects with realistic properties, behavior, and data connections that are optimized for simulation.

Note

The example can only run in async mode and requires the SimReady Explorer window to be enabled to process the search requests.

The example script will create an SDG randomization and capture pipeline scenario with a table, a plate, and a number of items on top of the plate. The scene will be simulated for a while and then the captured images will be saved to disk.

The standalone example can also be run directly (on Windows use python.bat instead of python.sh):

./python.sh standalone_examples/api/isaacsim.replicator.examples/simready_assets_sdg.py
../_images/isim_5.0_replicator_tut_viewport_randomization_simready_assets.jpg
Simready Assets SDG Example
import asyncio
import os
import time

import carb.settings
import numpy as np
import omni.kit.app
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from isaacsim.core.utils.semantics import upgrade_prim_semantics_to_labels
from pxr import Sdf, Usd, UsdGeom, UsdPhysics

# Make sure the simready explorer extension is enabled
ext_manager = omni.kit.app.get_app().get_extension_manager()
if not ext_manager.is_extension_enabled("omni.simready.explorer"):
    ext_manager.set_extension_enabled_immediate("omni.simready.explorer", True)
import omni.simready.explorer as sre


def enable_simready_explorer() -> None:
    """Enable the SimReady Explorer window if not already open."""
    if sre.get_instance().browser_model is None:
        import omni.kit.actions.core as actions

        actions.execute_action("omni.simready.explorer", "toggle_window")


def set_prim_variants(prim: Usd.Prim, variants: dict[str, str]) -> None:
    """Set variant selections on a prim from a dictionary of variant set names to values."""
    vsets = prim.GetVariantSets()
    for name, value in variants.items():
        vset = vsets.GetVariantSet(name)
        if vset:
            vset.SetVariantSelection(value)


async def search_assets_async() -> tuple[list, list, list]:
    """Search for SimReady assets (tables, dishes, items) asynchronously."""
    print(f"[SDG] Searching for SimReady assets...")
    start_time = time.time()
    tables = await sre.find_assets(["table", "furniture"])
    print(f"[SDG]   - Found {len(tables)} tables ({time.time() - start_time:.2f}s)")
    start_time = time.time()
    plates = await sre.find_assets(["plate"])
    print(f"[SDG]   - Found {len(plates)} plates ({time.time() - start_time:.2f}s)")
    start_time = time.time()
    bowls = await sre.find_assets(["bowl"])
    print(f"[SDG]   - Found {len(bowls)} bowls ({time.time() - start_time:.2f}s)")
    dishes = plates + bowls
    start_time = time.time()
    fruits = await sre.find_assets(["fruit"])
    print(f"[SDG]   - Found {len(fruits)} fruits ({time.time() - start_time:.2f}s)")
    start_time = time.time()
    vegetables = await sre.find_assets(["vegetable"])
    print(f"[SDG]   - Found {len(vegetables)} vegetables ({time.time() - start_time:.2f}s)")
    items = fruits + vegetables
    return tables, dishes, items


async def run_simready_randomization_async(
    stage: Usd.Stage,
    camera_prim: Usd.Prim,
    render_product,
    tables: list,
    dishes: list,
    items: list,
    rng: np.random.Generator = None,
) -> None:
    """Randomize a scene with SimReady assets, run physics, and capture the result."""
    if rng is None:
        rng = np.random.default_rng()

    print(f"[SDG]   Creating anonymous variation layer for the randomizations...")
    root_layer = stage.GetRootLayer()
    variation_layer = Sdf.Layer.CreateAnonymous("variation")
    root_layer.subLayerPaths.insert(0, variation_layer.identifier)
    stage.SetEditTarget(variation_layer)

    # Load the simready assets with rigid body properties
    variants = {"PhysicsVariant": "RigidBody"}
    rep.functional.create.scope(name="Assets")

    # Choose a random table and add it to the stage
    print(f"[SDG]   Loading assets...")
    table_asset = tables[rng.integers(len(tables))]
    start_time = time.time()
    table_prim = rep.functional.create.reference(usd_path=table_asset.main_url, parent="/Assets", name=table_asset.name)
    set_prim_variants(table_prim, variants)
    upgrade_prim_semantics_to_labels(table_prim)
    print(f"[SDG]     - Table: '{table_asset.name}' ({time.time() - start_time:.2f}s)")
    await omni.kit.app.get_app().next_update_async()

    # Keep only colliders on the table (disable rigid body dynamics)
    UsdPhysics.RigidBodyAPI(table_prim).GetRigidBodyEnabledAttr().Set(False)

    # Compute table dimensions from its bounding box
    bbox_cache = UsdGeom.BBoxCache(Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_])
    table_bbox = bbox_cache.ComputeWorldBound(table_prim)
    table_extent = table_bbox.GetRange().GetSize()

    # Choose a random dish and add it to the stage
    dish_asset = dishes[rng.integers(len(dishes))]
    start_time = time.time()
    dish_prim = rep.functional.create.reference(usd_path=dish_asset.main_url, parent="/Assets", name=dish_asset.name)
    set_prim_variants(dish_prim, variants)
    upgrade_prim_semantics_to_labels(dish_prim)
    print(f"[SDG]     - Dish: '{dish_asset.name}' ({time.time() - start_time:.2f}s)")
    await omni.kit.app.get_app().next_update_async()

    # Compute dish dimensions from its bounding box
    dish_bbox = bbox_cache.ComputeWorldBound(dish_prim)
    dish_extent = dish_bbox.GetRange().GetSize()

    # Calculate random position for the dish near the center of the table
    center_region_scale = 0.75
    dish_range_x = max(0, (table_extent[0] - dish_extent[0]) / 2 * center_region_scale)
    dish_range_y = max(0, (table_extent[1] - dish_extent[1]) / 2 * center_region_scale)
    dish_position = (
        rng.uniform(-dish_range_x, dish_range_x) if dish_range_x > 0 else 0,
        rng.uniform(-dish_range_y, dish_range_y) if dish_range_y > 0 else 0,
        table_extent[2] + dish_extent[2] / 2,
    )
    dish_prim.GetAttribute("xformOp:translate").Set(dish_position)

    # Add random items above the dish
    num_items = rng.integers(2, 5)
    item_prims = []
    for _ in range(num_items):
        item_asset = items[rng.integers(len(items))]
        start_time = time.time()
        item_prim = rep.functional.create.reference(
            usd_path=item_asset.main_url, parent="/Assets", name=item_asset.name
        )
        set_prim_variants(item_prim, variants)
        upgrade_prim_semantics_to_labels(item_prim)
        print(f"[SDG]     - Item: '{item_asset.name}' ({time.time() - start_time:.2f}s)")
        item_prims.append(item_prim)
        await omni.kit.app.get_app().next_update_async()

    # Position items stacked above the dish
    print(f"[SDG]   Positioning assets on table...")
    stack_height = dish_position[2]
    item_scatter_radius = max(0, dish_extent[0] / 4)
    for item_prim in item_prims:
        item_bbox = bbox_cache.ComputeWorldBound(item_prim)
        item_extent = item_bbox.GetRange().GetSize()
        scatter_x = rng.uniform(-item_scatter_radius, item_scatter_radius) if item_scatter_radius > 0 else 0
        scatter_y = rng.uniform(-item_scatter_radius, item_scatter_radius) if item_scatter_radius > 0 else 0
        item_position = (
            dish_position[0] + scatter_x,
            dish_position[1] + scatter_y,
            stack_height + item_extent[2] / 2,
        )
        item_prim.GetAttribute("xformOp:translate").Set(item_position)
        stack_height += item_extent[2]

    # Run physics simulation for items to settle
    num_sim_steps = 25
    print(f"[SDG]   Running physics simulation ({num_sim_steps} steps)...")
    timeline = omni.timeline.get_timeline_interface()
    timeline.play()
    for _ in range(num_sim_steps):
        await omni.kit.app.get_app().next_update_async()
    timeline.pause()

    print(f"[SDG]   Setting edit target to root layer...")
    stage.SetEditTarget(root_layer)

    print(f"[SDG]   Positioning camera and capturing frame...")
    camera_position = (
        dish_position[0] + rng.uniform(-0.5, 0.5),
        dish_position[1] + rng.uniform(-0.5, 0.5),
        dish_position[2] + 1.5 + rng.uniform(-0.5, 0.5),
    )
    rep.functional.modify.pose(
        camera_prim, position_value=camera_position, look_at_value=dish_prim, look_at_up_axis=(0, 0, 1)
    )
    render_product.hydra_texture.set_updates_enabled(True)
    await rep.orchestrator.step_async(delta_time=0.0, rt_subframes=16)
    render_product.hydra_texture.set_updates_enabled(False)

    print(f"[SDG]   Removing temp variation layer...")
    variation_layer.Clear()
    root_layer.subLayerPaths.remove(variation_layer.identifier)


async def run_simready_randomizations_async(num_scenarios: int) -> None:
    """Run multiple SimReady randomization scenarios and capture the results."""
    print(f"[SDG] Initializing scene...")
    await omni.usd.get_context().new_stage_async()
    stage = omni.usd.get_context().get_stage()

    # Initialize randomization
    rng = np.random.default_rng(34)
    rep.set_global_seed(34)

    # Data capture will happen manually using step()
    rep.orchestrator.set_capture_on_play(False)

    # Set DLSS to Quality mode (2) for best SDG results , options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)

    # Add lights to the scene
    print(f"[SDG] Setting up lighting...")
    rep.functional.create.xform(name="World")
    rep.functional.create.dome_light(intensity=500, parent="/World", name="DomeLight")
    rep.functional.create.distant_light(intensity=2500, parent="/World", name="DistantLight", rotation=(-75, 0, 0))

    # Simready explorer window needs to be created for the search to work
    enable_simready_explorer()

    # Search for the simready assets
    tables, dishes, items = await search_assets_async()

    # Create the writer and the render product for capturing the scene
    output_dir = os.path.join(os.getcwd(), "_out_simready_assets")
    backend = rep.backends.get("DiskBackend")
    backend.initialize(output_dir=output_dir)
    writer = rep.writers.get("BasicWriter")
    print(f"[SDG] Initializing writer, output directory: {output_dir}...")
    writer.initialize(backend=backend, rgb=True)

    # Create camera and render product (disabled by default, enabled only when capturing)
    print(f"[SDG] Creating camera and render product...")
    camera_prim = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="Camera")
    rp = rep.create.render_product(camera_prim, (512, 512))
    rp.hydra_texture.set_updates_enabled(False)
    writer.attach(rp)

    # Generate randomized scenarios
    for i in range(num_scenarios):
        print(f"[SDG] Scenario {i + 1}/{num_scenarios}")
        await run_simready_randomization_async(
            stage=stage, camera_prim=camera_prim, render_product=rp, tables=tables, dishes=dishes, items=items, rng=rng
        )

    # Finalize and cleanup
    print("[SDG] Wait for the data to be written and cleanup render products...")
    await rep.orchestrator.wait_until_complete_async()
    writer.detach()
    rp.destroy()


num_scenarios = 5
print(f"[SDG] Starting SDG pipeline with {num_scenarios} scenarios...")
asyncio.ensure_future(run_simready_randomizations_async(num_scenarios))
Simready Assets SDG Example
from isaacsim import SimulationApp

simulation_app = SimulationApp(launch_config={"headless": False})

import argparse
import asyncio
import os
import time

import carb.settings
import numpy as np
import omni.kit.app
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from isaacsim.core.utils.semantics import upgrade_prim_semantics_to_labels
from pxr import Gf, Sdf, Usd, UsdGeom, UsdPhysics

parser = argparse.ArgumentParser()
parser.add_argument("--num_scenarios", type=int, default=5, help="Number of randomization scenarios to create")
args, _ = parser.parse_known_args()
num_scenarios = args.num_scenarios

# Make sure the simready explorer extension is enabled
ext_manager = omni.kit.app.get_app().get_extension_manager()
if not ext_manager.is_extension_enabled("omni.simready.explorer"):
    ext_manager.set_extension_enabled_immediate("omni.simready.explorer", True)
import omni.simready.explorer as sre


def enable_simready_explorer() -> None:
    """Enable the SimReady Explorer window if not already open."""
    if sre.get_instance().browser_model is None:
        import omni.kit.actions.core as actions

        actions.execute_action("omni.simready.explorer", "toggle_window")


def set_prim_variants(prim: Usd.Prim, variants: dict[str, str]) -> None:
    """Set variant selections on a prim from a dictionary of variant set names to values."""
    vsets = prim.GetVariantSets()
    for name, value in variants.items():
        vset = vsets.GetVariantSet(name)
        if vset:
            vset.SetVariantSelection(value)


async def search_assets_async() -> tuple[list, list, list]:
    """Search for SimReady assets (tables, dishes, items) asynchronously."""
    print(f"[SDG] Searching for SimReady assets...")
    start_time = time.time()
    tables = await sre.find_assets(["table", "furniture"])
    print(f"[SDG]   - Found {len(tables)} tables ({time.time() - start_time:.2f}s)")
    start_time = time.time()
    plates = await sre.find_assets(["plate"])
    print(f"[SDG]   - Found {len(plates)} plates ({time.time() - start_time:.2f}s)")
    start_time = time.time()
    bowls = await sre.find_assets(["bowl"])
    print(f"[SDG]   - Found {len(bowls)} bowls ({time.time() - start_time:.2f}s)")
    dishes = plates + bowls
    start_time = time.time()
    fruits = await sre.find_assets(["fruit"])
    print(f"[SDG]   - Found {len(fruits)} fruits ({time.time() - start_time:.2f}s)")
    start_time = time.time()
    vegetables = await sre.find_assets(["vegetable"])
    print(f"[SDG]   - Found {len(vegetables)} vegetables ({time.time() - start_time:.2f}s)")
    items = fruits + vegetables
    return tables, dishes, items


def run_simready_randomization(
    stage: Usd.Stage,
    camera_prim: Usd.Prim,
    render_product,
    tables: list,
    dishes: list,
    items: list,
    rng: np.random.Generator = None,
) -> None:
    """Randomize a scene with SimReady assets, run physics, and capture the result."""
    if rng is None:
        rng = np.random.default_rng()

    print(f"[SDG]   Creating anonymous variation layer for the randomizations...")
    root_layer = stage.GetRootLayer()
    variation_layer = Sdf.Layer.CreateAnonymous("variation")
    root_layer.subLayerPaths.insert(0, variation_layer.identifier)
    stage.SetEditTarget(variation_layer)

    # Load the simready assets with rigid body properties
    variants = {"PhysicsVariant": "RigidBody"}
    rep.functional.create.scope(name="Assets")

    # Choose a random table and add it to the stage
    print(f"[SDG]   Loading assets...")
    table_asset = tables[rng.integers(len(tables))]
    start_time = time.time()
    table_prim = rep.functional.create.reference(usd_path=table_asset.main_url, parent="/Assets", name=table_asset.name)
    set_prim_variants(table_prim, variants)
    upgrade_prim_semantics_to_labels(table_prim)
    print(f"[SDG]     - Table: '{table_asset.name}' ({time.time() - start_time:.2f}s)")
    simulation_app.update()

    # Keep only colliders on the table (disable rigid body dynamics)
    UsdPhysics.RigidBodyAPI(table_prim).GetRigidBodyEnabledAttr().Set(False)

    # Compute table dimensions from its bounding box
    bbox_cache = UsdGeom.BBoxCache(Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_])
    table_bbox = bbox_cache.ComputeWorldBound(table_prim)
    table_extent = table_bbox.GetRange().GetSize()

    # Choose a random dish and add it to the stage
    dish_asset = dishes[rng.integers(len(dishes))]
    start_time = time.time()
    dish_prim = rep.functional.create.reference(usd_path=dish_asset.main_url, parent="/Assets", name=dish_asset.name)
    set_prim_variants(dish_prim, variants)
    upgrade_prim_semantics_to_labels(dish_prim)
    print(f"[SDG]     - Dish: '{dish_asset.name}' ({time.time() - start_time:.2f}s)")
    simulation_app.update()

    # Compute dish dimensions from its bounding box
    dish_bbox = bbox_cache.ComputeWorldBound(dish_prim)
    dish_extent = dish_bbox.GetRange().GetSize()

    # Calculate random position for the dish near the center of the table
    center_region_scale = 0.75
    dish_range_x = max(0, (table_extent[0] - dish_extent[0]) / 2 * center_region_scale)
    dish_range_y = max(0, (table_extent[1] - dish_extent[1]) / 2 * center_region_scale)
    dish_position = (
        rng.uniform(-dish_range_x, dish_range_x) if dish_range_x > 0 else 0,
        rng.uniform(-dish_range_y, dish_range_y) if dish_range_y > 0 else 0,
        table_extent[2] + dish_extent[2] / 2,
    )
    dish_prim.GetAttribute("xformOp:translate").Set(dish_position)

    # Add random items above the dish
    num_items = rng.integers(2, 5)
    item_prims = []
    for _ in range(num_items):
        item_asset = items[rng.integers(len(items))]
        start_time = time.time()
        item_prim = rep.functional.create.reference(
            usd_path=item_asset.main_url, parent="/Assets", name=item_asset.name
        )
        set_prim_variants(item_prim, variants)
        upgrade_prim_semantics_to_labels(item_prim)
        print(f"[SDG]     - Item: '{item_asset.name}' ({time.time() - start_time:.2f}s)")
        item_prims.append(item_prim)
        simulation_app.update()

    # Position items stacked above the dish
    print(f"[SDG]   Positioning assets on table...")
    stack_height = dish_position[2]
    item_scatter_radius = max(0, dish_extent[0] / 4)
    for item_prim in item_prims:
        item_bbox = bbox_cache.ComputeWorldBound(item_prim)
        item_extent = item_bbox.GetRange().GetSize()
        scatter_x = rng.uniform(-item_scatter_radius, item_scatter_radius) if item_scatter_radius > 0 else 0
        scatter_y = rng.uniform(-item_scatter_radius, item_scatter_radius) if item_scatter_radius > 0 else 0
        item_position = (
            dish_position[0] + scatter_x,
            dish_position[1] + scatter_y,
            stack_height + item_extent[2] / 2,
        )
        item_prim.GetAttribute("xformOp:translate").Set(item_position)
        stack_height += item_extent[2]

    # Run physics simulation for items to settle
    num_sim_steps = 25
    print(f"[SDG]   Running physics simulation ({num_sim_steps} steps)...")
    timeline = omni.timeline.get_timeline_interface()
    timeline.play()
    for _ in range(num_sim_steps):
        simulation_app.update()
    timeline.pause()

    print(f"[SDG]   Setting edit target to root layer...")
    stage.SetEditTarget(root_layer)

    print(f"[SDG]   Positioning camera and capturing frame...")
    camera_position = (
        dish_position[0] + rng.uniform(-0.5, 0.5),
        dish_position[1] + rng.uniform(-0.5, 0.5),
        dish_position[2] + 1.5 + rng.uniform(-0.5, 0.5),
    )
    rep.functional.modify.pose(
        camera_prim, position_value=camera_position, look_at_value=dish_prim, look_at_up_axis=(0, 0, 1)
    )
    render_product.hydra_texture.set_updates_enabled(True)
    rep.orchestrator.step(delta_time=0.0, rt_subframes=16)
    render_product.hydra_texture.set_updates_enabled(False)

    print(f"[SDG]   Removing temp variation layer...")
    variation_layer.Clear()
    root_layer.subLayerPaths.remove(variation_layer.identifier)


def run_simready_randomizations(num_scenarios: int) -> None:
    """Run multiple SimReady randomization scenarios and capture the results."""
    print(f"[SDG] Initializing scene...")
    omni.usd.get_context().new_stage()
    stage = omni.usd.get_context().get_stage()

    # Initialize randomization
    rng = np.random.default_rng(34)
    rep.set_global_seed(34)

    # Data capture will happen manually using step()
    rep.orchestrator.set_capture_on_play(False)

    # Set DLSS to Quality mode (2) for best SDG results , options: 0 (Performance), 1 (Balanced), 2 (Quality), 3 (Auto)
    carb.settings.get_settings().set("rtx/post/dlss/execMode", 2)

    # Add lights to the scene
    print(f"[SDG] Setting up lighting...")
    rep.functional.create.xform(name="World")
    rep.functional.create.dome_light(intensity=500, parent="/World", name="DomeLight")
    rep.functional.create.distant_light(intensity=2500, parent="/World", name="DistantLight", rotation=(-75, 0, 0))

    # Simready explorer window needs to be created for the search to work
    enable_simready_explorer()

    # Search for the simready assets and wait until the task is complete
    search_task = asyncio.ensure_future(search_assets_async())
    while not search_task.done():
        simulation_app.update()
    tables, dishes, items = search_task.result()

    # Create the writer and the render product for capturing the scene
    output_dir = os.path.join(os.getcwd(), "_out_simready_assets")
    backend = rep.backends.get("DiskBackend")
    backend.initialize(output_dir=output_dir)
    writer = rep.writers.get("BasicWriter")
    print(f"[SDG] Initializing writer, output directory: {output_dir}...")
    writer.initialize(backend=backend, rgb=True)

    # Create camera and render product (disabled by default, enabled only when capturing)
    print(f"[SDG] Creating camera and render product...")
    camera_prim = rep.functional.create.camera(position=(5, 5, 5), look_at=(0, 0, 0), parent="/World", name="Camera")
    rp = rep.create.render_product(camera_prim, (512, 512))
    rp.hydra_texture.set_updates_enabled(False)
    writer.attach(rp)

    # Generate randomized scenarios
    for i in range(num_scenarios):
        print(f"[SDG] Scenario {i + 1}/{num_scenarios}")
        run_simready_randomization(
            stage=stage, camera_prim=camera_prim, render_product=rp, tables=tables, dishes=dishes, items=items, rng=rng
        )

    # Finalize and cleanup
    print("[SDG] Wait for the data to be written and cleanup render products...")
    rep.orchestrator.wait_until_complete()
    writer.detach()
    rp.destroy()


print(f"[SDG] Starting SDG pipeline with {num_scenarios} scenarios...")
run_simready_randomizations(num_scenarios)

simulation_app.close()