# Copyright (c) 2022-2025, The Isaac Lab Project Developers.
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Reward manager for computing reward signals for a given world."""

from __future__ import annotations

import torch
from collections.abc import Sequence
from prettytable import PrettyTable
from typing import TYPE_CHECKING

from .manager_base import ManagerBase, ManagerTermBase
from .manager_term_cfg import RewardTermCfg

if TYPE_CHECKING:
    from omni.isaac.lab.envs import ManagerBasedRLEnv
class RewardManager(ManagerBase):
    """Manager for computing reward signals for a given world.

    The reward manager computes the total reward as a sum of the weighted reward terms. The reward
    terms are parsed from a nested config class containing the reward manager's settings and each
    term's parameters. Each reward term should instantiate the :class:`RewardTermCfg` class.

    .. note::

        The reward manager multiplies the reward term's ``weight`` with the time-step interval ``dt``
        of the environment. This is done to ensure that the computed reward terms are balanced with
        respect to the chosen time-step interval in the environment.
    """

    _env: ManagerBasedRLEnv
    """The environment instance."""
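    # A minimal usage sketch (illustrative only; ``mdp.is_alive``, ``mdp.action_rate_l2`` and the
    # term names below are assumptions for this example, not part of this module). Each attribute
    # of the config class becomes one named reward term:
    #
    #   @configclass
    #   class RewardsCfg:
    #       alive = RewardTermCfg(func=mdp.is_alive, weight=1.0)
    #       action_rate = RewardTermCfg(func=mdp.action_rate_l2, weight=-0.01)
    #
    #   reward_manager = RewardManager(RewardsCfg(), env)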
    def __init__(self, cfg: object, env: ManagerBasedRLEnv):
        """Initialize the reward manager.

        Args:
            cfg: The configuration object or dictionary (``dict[str, RewardTermCfg]``).
            env: The environment instance.
        """
        # create buffers to parse and store terms
        self._term_names: list[str] = list()
        self._term_cfgs: list[RewardTermCfg] = list()
        self._class_term_cfgs: list[RewardTermCfg] = list()

        # call the base class constructor (this will parse the terms config)
        super().__init__(cfg, env)
        # prepare extra info to store individual reward term information
        self._episode_sums = dict()
        for term_name in self._term_names:
            self._episode_sums[term_name] = torch.zeros(self.num_envs, dtype=torch.float, device=self.device)
        # create buffer for managing reward per environment
        self._reward_buf = torch.zeros(self.num_envs, dtype=torch.float, device=self.device)
        # buffer which stores the current step reward for each term for each environment
        self._step_reward = torch.zeros(
            (self.num_envs, len(self._term_names)), dtype=torch.float, device=self.device
        )
    def __str__(self) -> str:
        """Returns: A string representation for reward manager."""
        msg = f"<RewardManager> contains {len(self._term_names)} active terms.\n"

        # create table for term information
        table = PrettyTable()
        table.title = "Active Reward Terms"
        table.field_names = ["Index", "Name", "Weight"]
        # set alignment of table columns
        table.align["Name"] = "l"
        table.align["Weight"] = "r"
        # add info on each term
        for index, (name, term_cfg) in enumerate(zip(self._term_names, self._term_cfgs)):
            table.add_row([index, name, term_cfg.weight])
        # convert table to string
        msg += table.get_string()
        msg += "\n"

        return msg

    """
    Properties.
    """

    @property
    def active_terms(self) -> list[str]:
        """Name of active reward terms."""
        return self._term_names

    """
    Operations.
    """
    def reset(self, env_ids: Sequence[int] | None = None) -> dict[str, torch.Tensor]:
        """Returns the episodic sum of individual reward terms.

        Args:
            env_ids: The environment ids for which the episodic sum of individual reward terms is
                to be returned. Defaults to all the environment ids.

        Returns:
            Dictionary of episodic sum of individual reward terms.
        """
        # resolve environment ids
        if env_ids is None:
            env_ids = slice(None)
        # store information
        extras = {}
        for key in self._episode_sums.keys():
            # store information
            # r_1 + r_2 + ... + r_n
            episodic_sum_avg = torch.mean(self._episode_sums[key][env_ids])
            extras["Episode_Reward/" + key] = episodic_sum_avg / self._env.max_episode_length_s
            # reset episodic sum
            self._episode_sums[key][env_ids] = 0.0
        # reset all the reward terms
        for term_cfg in self._class_term_cfgs:
            term_cfg.func.reset(env_ids=env_ids)
        # return logged information
        return extras
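    # Logging example (term names and values are hypothetical): ``reset(env_ids=[0, 1])`` might
    # return entries such as
    #   {"Episode_Reward/alive": tensor(0.95), "Episode_Reward/action_rate": tensor(-0.01)}
    # i.e. each value is the episodic sum averaged over the reset envs and divided by
    # ``max_episode_length_s``, which keeps logged values comparable across episode lengths.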
    def compute(self, dt: float) -> torch.Tensor:
        """Computes the reward signal as a weighted sum of individual terms.

        This function calls each reward term managed by the class and adds them to compute the net
        reward signal. It also updates the episodic sums corresponding to individual reward terms.

        Args:
            dt: The time-step interval of the environment.

        Returns:
            The net reward signal of shape (num_envs,).
        """
        # reset computation
        self._reward_buf[:] = 0.0
        # iterate over all the reward terms
        for name, term_cfg in zip(self._term_names, self._term_cfgs):
            # skip if weight is zero (kind of a micro-optimization)
            if term_cfg.weight == 0.0:
                continue
            # compute term's value
            value = term_cfg.func(self._env, **term_cfg.params) * term_cfg.weight * dt
            # update total reward
            self._reward_buf += value
            # update episodic sum
            self._episode_sums[name] += value
            # update current reward for this step
            self._step_reward[:, self._term_names.index(name)] = value / dt

        return self._reward_buf
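    # Worked example (hypothetical numbers): with ``term_cfg.weight = -0.01``, an unweighted term
    # value of ``2.0`` and ``dt = 0.005`` seconds, the contribution added to ``_reward_buf`` is
    # ``2.0 * -0.01 * 0.005 = -1e-4`` per step, while ``_step_reward`` keeps the dt-free weighted
    # value ``2.0 * -0.01 = -0.02`` for introspection and logging.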
""" Operations - Term settings. """
    def set_term_cfg(self, term_name: str, cfg: RewardTermCfg):
        """Sets the configuration of the specified term into the manager.

        Args:
            term_name: The name of the reward term.
            cfg: The configuration for the reward term.

        Raises:
            ValueError: If the term name is not found.
        """
        if term_name not in self._term_names:
            raise ValueError(f"Reward term '{term_name}' not found.")
        # set the configuration
        self._term_cfgs[self._term_names.index(term_name)] = cfg
    def get_term_cfg(self, term_name: str) -> RewardTermCfg:
        """Gets the configuration for the specified term.

        Args:
            term_name: The name of the reward term.

        Returns:
            The configuration of the reward term.

        Raises:
            ValueError: If the term name is not found.
        """
        if term_name not in self._term_names:
            raise ValueError(f"Reward term '{term_name}' not found.")
        # return the configuration
        return self._term_cfgs[self._term_names.index(term_name)]
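    # Example of tuning a term at runtime, e.g. for a curriculum (the term name "action_rate" is
    # hypothetical and ``reward_manager`` is assumed to be an existing instance):
    #
    #   term_cfg = reward_manager.get_term_cfg("action_rate")
    #   term_cfg.weight = -0.05
    #   reward_manager.set_term_cfg("action_rate", term_cfg)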
    def get_active_iterable_terms(self, env_idx: int) -> Sequence[tuple[str, Sequence[float]]]:
        """Returns the active terms as iterable sequence of tuples.

        The first element of the tuple is the name of the term and the second element is the raw
        value(s) of the term.

        Args:
            env_idx: The specific environment to pull the active terms from.

        Returns:
            The active terms.
        """
        terms = []
        for idx, name in enumerate(self._term_names):
            terms.append((name, [self._step_reward[env_idx, idx].cpu().item()]))
        return terms
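    # Example return value for ``env_idx=0`` with two terms (names and values hypothetical):
    #   [("alive", [0.98]), ("action_rate", [-0.02])]
    # where each inner list holds the term's dt-free weighted value from the last ``compute`` call.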
""" Helper functions. """def_prepare_terms(self):# check if config is dict alreadyifisinstance(self.cfg,dict):cfg_items=self.cfg.items()else:cfg_items=self.cfg.__dict__.items()# iterate over all the termsforterm_name,term_cfgincfg_items:# check for non configifterm_cfgisNone:continue# check for valid config typeifnotisinstance(term_cfg,RewardTermCfg):raiseTypeError(f"Configuration for the term '{term_name}' is not of type RewardTermCfg."f" Received: '{type(term_cfg)}'.")# check for valid weight typeifnotisinstance(term_cfg.weight,(float,int)):raiseTypeError(f"Weight for the term '{term_name}' is not of type float or int."f" Received: '{type(term_cfg.weight)}'.")# resolve common parametersself._resolve_common_term_cfg(term_name,term_cfg,min_argc=1)# add function to listself._term_names.append(term_name)self._term_cfgs.append(term_cfg)# check if the term is a classifisinstance(term_cfg.func,ManagerTermBase):self._class_term_cfgs.append(term_cfg)