# Copyright (c) 2022-2025, The Isaac Lab Project Developers.# All rights reserved.## SPDX-License-Identifier: BSD-3-Clause"""Base class for sensors.This class defines an interface for sensors similar to how the :class:`omni.isaac.lab.assets.AssetBase` class works.Each sensor class should inherit from this class and implement the abstract methods."""from__future__importannotationsimportinspectimporttorchimportweakreffromabcimportABC,abstractmethodfromcollections.abcimportSequencefromtypingimportTYPE_CHECKING,Anyimportomni.kit.appimportomni.timelineimportomni.isaac.lab.simassim_utilsifTYPE_CHECKING:from.sensor_base_cfgimportSensorBaseCfg
[文档]classSensorBase(ABC):"""The base class for implementing a sensor. The implementation is based on lazy evaluation. The sensor data is only updated when the user tries accessing the data through the :attr:`data` property or sets ``force_compute=True`` in the :meth:`update` method. This is done to avoid unnecessary computation when the sensor data is not used. The sensor is updated at the specified update period. If the update period is zero, then the sensor is updated at every simulation step. """
[文档]def__init__(self,cfg:SensorBaseCfg):"""Initialize the sensor class. Args: cfg: The configuration parameters for the sensor. """# check that config is validifcfg.history_length<0:raiseValueError(f"History length must be greater than 0! Received: {cfg.history_length}")# check that the config is validcfg.validate()# store inputsself.cfg=cfg# flag for whether the sensor is initializedself._is_initialized=False# flag for whether the sensor is in visualization modeself._is_visualizing=False# note: Use weakref on callbacks to ensure that this object can be deleted when its destructor is called.# add callbacks for stage play/stop# The order is set to 10 which is arbitrary but should be lower priority than the default order of 0timeline_event_stream=omni.timeline.get_timeline_interface().get_timeline_event_stream()self._initialize_handle=timeline_event_stream.create_subscription_to_pop_by_type(int(omni.timeline.TimelineEventType.PLAY),lambdaevent,obj=weakref.proxy(self):obj._initialize_callback(event),order=10,)self._invalidate_initialize_handle=timeline_event_stream.create_subscription_to_pop_by_type(int(omni.timeline.TimelineEventType.STOP),lambdaevent,obj=weakref.proxy(self):obj._invalidate_initialize_callback(event),order=10,)# add handle for debug visualization (this is set to a valid handle inside set_debug_vis)self._debug_vis_handle=None# set initial state of debug visualizationself.set_debug_vis(self.cfg.debug_vis)
def__del__(self):"""Unsubscribe from the callbacks."""# clear physics events handlesifself._initialize_handle:self._initialize_handle.unsubscribe()self._initialize_handle=Noneifself._invalidate_initialize_handle:self._invalidate_initialize_handle.unsubscribe()self._invalidate_initialize_handle=None# clear debug visualizationifself._debug_vis_handle:self._debug_vis_handle.unsubscribe()self._debug_vis_handle=None""" Properties """@propertydefis_initialized(self)->bool:"""Whether the sensor is initialized. Returns True if the sensor is initialized, False otherwise. """returnself._is_initialized@propertydefnum_instances(self)->int:"""Number of instances of the sensor. This is equal to the number of sensors per environment multiplied by the number of environments. """returnself._num_envs@propertydefdevice(self)->str:"""Memory device for computation."""returnself._device@property@abstractmethoddefdata(self)->Any:"""Data from the sensor. This property is only updated when the user tries to access the data. This is done to avoid unnecessary computation when the sensor data is not used. For updating the sensor when this property is accessed, you can use the following code snippet in your sensor implementation: .. code-block:: python # update sensors if needed self._update_outdated_buffers() # return the data (where `_data` is the data for the sensor) return self._data """raiseNotImplementedError@propertydefhas_debug_vis_implementation(self)->bool:"""Whether the sensor has a debug visualization implemented."""# check if function raises NotImplementedErrorsource_code=inspect.getsource(self._set_debug_vis_impl)return"NotImplementedError"notinsource_code""" Operations """
[文档]defset_debug_vis(self,debug_vis:bool)->bool:"""Sets whether to visualize the sensor data. Args: debug_vis: Whether to visualize the sensor data. Returns: Whether the debug visualization was successfully set. False if the sensor does not support debug visualization. """# check if debug visualization is supportedifnotself.has_debug_vis_implementation:returnFalse# toggle debug visualization objectsself._set_debug_vis_impl(debug_vis)# toggle debug visualization flagself._is_visualizing=debug_vis# toggle debug visualization handlesifdebug_vis:# create a subscriber for the post update event if it doesn't existifself._debug_vis_handleisNone:app_interface=omni.kit.app.get_app_interface()self._debug_vis_handle=app_interface.get_post_update_event_stream().create_subscription_to_pop(lambdaevent,obj=weakref.proxy(self):obj._debug_vis_callback(event))else:# remove the subscriber if it existsifself._debug_vis_handleisnotNone:self._debug_vis_handle.unsubscribe()self._debug_vis_handle=None# return successreturnTrue
[文档]defreset(self,env_ids:Sequence[int]|None=None):"""Resets the sensor internals. Args: env_ids: The sensor ids to reset. Defaults to None. """# Resolve sensor idsifenv_idsisNone:env_ids=slice(None)# Reset the timestamp for the sensorsself._timestamp[env_ids]=0.0self._timestamp_last_update[env_ids]=0.0# Set all reset sensors to outdated so that they are updated when data is called the next time.self._is_outdated[env_ids]=True
defupdate(self,dt:float,force_recompute:bool=False):# Update the timestamp for the sensorsself._timestamp+=dtself._is_outdated|=self._timestamp-self._timestamp_last_update+1e-6>=self.cfg.update_period# Update the buffers# TODO (from @mayank): Why is there a history length here when it doesn't mean anything in the sensor base?!?# It is only for the contact sensor but there we should redefine the update function IMO.ifforce_recomputeorself._is_visualizingor(self.cfg.history_length>0):self._update_outdated_buffers()""" Implementation specific. """@abstractmethoddef_initialize_impl(self):"""Initializes the sensor-related handles and internal buffers."""# Obtain Simulation Contextsim=sim_utils.SimulationContext.instance()ifsimisNone:raiseRuntimeError("Simulation Context is not initialized!")# Obtain device and backendself._device=sim.deviceself._backend=sim.backendself._sim_physics_dt=sim.get_physics_dt()# Count number of environmentsenv_prim_path_expr=self.cfg.prim_path.rsplit("/",1)[0]self._parent_prims=sim_utils.find_matching_prims(env_prim_path_expr)self._num_envs=len(self._parent_prims)# Boolean tensor indicating whether the sensor data has to be refreshedself._is_outdated=torch.ones(self._num_envs,dtype=torch.bool,device=self._device)# Current timestamp (in seconds)self._timestamp=torch.zeros(self._num_envs,device=self._device)# Timestamp from last updateself._timestamp_last_update=torch.zeros_like(self._timestamp)@abstractmethoddef_update_buffers_impl(self,env_ids:Sequence[int]):"""Fills the sensor data for provided environment ids. This function does not perform any time-based checks and directly fills the data into the data container. Args: env_ids: The indices of the sensors that are ready to capture. """raiseNotImplementedErrordef_set_debug_vis_impl(self,debug_vis:bool):"""Set debug visualization into visualization objects. This function is responsible for creating the visualization objects if they don't exist and input ``debug_vis`` is True. If the visualization objects exist, the function should set their visibility into the stage. """raiseNotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")def_debug_vis_callback(self,event):"""Callback for debug visualization. This function calls the visualization objects and sets the data to visualize into them. """raiseNotImplementedError(f"Debug visualization is not implemented for {self.__class__.__name__}.")""" Internal simulation callbacks. """def_initialize_callback(self,event):"""Initializes the scene elements. Note: PhysX handles are only enabled once the simulator starts playing. Hence, this function needs to be called whenever the simulator "plays" from a "stop" state. """ifnotself._is_initialized:self._initialize_impl()self._is_initialized=Truedef_invalidate_initialize_callback(self,event):"""Invalidates the scene elements."""self._is_initialized=False""" Helper functions. """def_update_outdated_buffers(self):"""Fills the sensor data for the outdated sensors."""outdated_env_ids=self._is_outdated.nonzero().squeeze(-1)iflen(outdated_env_ids)>0:# obtain new dataself._update_buffers_impl(outdated_env_ids)# update the timestamp from last updateself._timestamp_last_update[outdated_env_ids]=self._timestamp[outdated_env_ids]# set outdated flag to false for the updated sensorsself._is_outdated[outdated_env_ids]=False