Source code for usd_rerun_logger.env_wrapper

"""Gymnasium environment wrapper for logging training with Rerun.io."""

from __future__ import annotations

from collections.abc import Callable
from pathlib import Path
from typing import Any, Generic, SupportsFloat

import rerun as rr

import gymnasium as gym
from gymnasium.core import ActType, ObsType, RenderFrame

from .isaac_lab_logger import IsaacLabRerunLogger

# Public API of this module: only the wrapper class is exported.
__all__ = ["LogRerun"]


class LogRerun(
    gym.Wrapper[ObsType, ActType, ObsType, ActType],
    Generic[ObsType, ActType, RenderFrame],
    gym.utils.RecordConstructorArgs,
):
    """Logs Isaac Lab based environment episodes with Rerun.io.

    The API follows Gymnasium's :class:`gymnasium.wrappers.RecordVideo` wrapper,
    but instead of writing video, it logs Isaac Lab scenes to a Rerun stream.

    Typically you only want to log intermittently (e.g. every 100th episode or
    every N environment steps). Use either ``episode_trigger`` or
    ``step_trigger`` to decide when a new recording should start.

    The recording stream can either be provided directly via
    ``recording_stream``, or a new recording stream can be created that saves
    to ``save_path``. If neither is provided, it will try to find it using
    ``rr.get_data_recording()``. ``save_path`` takes precedence over
    ``recording_stream``.

    Parameters
    ----------
    env:
        The environment to wrap. Must be Isaac Lab based and expose
        ``env.unwrapped.scene``.
    logged_envs:
        Indices of environments to log (for multi-env Isaac Lab scenes).
        If an ``int``, it is treated as a single environment index. If a list,
        those indices are logged. Defaults to ``0`` (the first environment).
    recording_stream:
        The Rerun recording stream to use. Ignored if ``save_path`` is provided.
    save_path:
        Path where the Rerun recording will be saved. If provided, a new
        recording stream is created that saves to this path.
    episode_trigger:
        Callable ``episode_trigger(episode_id) -> bool`` returning ``True`` iff
        a recording should start on the given episode. If both
        ``episode_trigger`` and ``step_trigger`` are ``None``, defaults to
        :func:`gymnasium.utils.save_video.capped_cubic_video_schedule`.
        (Same signature as in :class:`gymnasium.wrappers.RecordVideo`.)
    step_trigger:
        Callable ``step_trigger(step_id) -> bool`` returning ``True`` iff a
        recording should start on the current *global* environment step
        (summed over episodes).
        (Same signature as in :class:`gymnasium.wrappers.RecordVideo`.)
    recording_length:
        Number of frames to record per snippet. If ``0``, record full episodes
        (until reset). If strictly positive, record fixed-length snippets.
        (Same as video_length in :class:`gymnasium.wrappers.RecordVideo`.)

    Raises
    ------
    ValueError
        If the environment does not expose a ``scene`` attribute on
        ``env.unwrapped``, i.e. it does not appear to be an Isaac Lab based
        environment.

    Notes
    -----
    - No vectorized wrapper variant is provided.
    - When a recording is active, each environment step logs the Isaac Lab
      scene at a timestamp derived from the scene ``physics_dt``.
    - Starting a recording while another one is active stops the active one
      first, so every snippet starts at frame ``0`` of its own timelines.

    Examples
    --------
    Record every 10th episode and write `.rrd` files to disk:

    .. code-block:: python

        trigger = lambda ep: ep % 10 == 0
        rr.init("gymnasium_example", spawn=True)
        env = LogRerun(env, save_path="./save_rerun1.rrd", episode_trigger=trigger)

    Start a recording every 200th step and record 100 frames each time:

    .. code-block:: python

        trigger = lambda t: t % 200 == 0
        # LogRerun will find the global Rerun recording with rr.get_data_recording()
        rr.init("gymnasium_example", spawn=True)
        env = LogRerun(env, step_trigger=trigger, recording_length=100)

    Record everything, but split into chunks of 1000 frames:

    .. code-block:: python

        # Set the recording stream directly
        recording = rr.RecordingStream("gymnasium_example")
        env = LogRerun(env, recording_length=1000, recording_stream=recording)
    """

    def __init__(
        self,
        env: gym.Env[ObsType, ActType],
        logged_envs: int | list[int] = 0,
        recording_stream: rr.RecordingStream | None = None,
        save_path: Path | str | None = None,
        episode_trigger: Callable[[int], bool] | None = None,
        step_trigger: Callable[[int], bool] | None = None,
        recording_length: int = 0,
    ):
        """Create the wrapper.

        See the class docstring for the meaning of each parameter.

        Raises
        ------
        ValueError
            If ``env.unwrapped`` has no ``scene`` attribute.
        """
        gym.utils.RecordConstructorArgs.__init__(
            self,
            episode_trigger=episode_trigger,
            step_trigger=step_trigger,
            recording_length=recording_length,
        )
        gym.Wrapper.__init__(self, env)

        # Check if the environment has a scene
        if not hasattr(self.env.unwrapped, "scene"):
            raise ValueError(
                "Cannot use LogRerun wrapper: the environment does not have a 'scene' attribute. "
                "Are you sure this is an Isaac Lab based environment?"
            )

        self.scene = self.env.unwrapped.scene
        self.logger = IsaacLabRerunLogger(
            scene=env.unwrapped.scene,
            logged_envs=logged_envs,
            recording_stream=recording_stream,
            save_path=save_path,
            application_id=env.spec.id if env.spec is not None else "env",
        )

        # Without any trigger, fall back to Gymnasium's default episode schedule.
        if episode_trigger is None and step_trigger is None:
            from gymnasium.utils.save_video import capped_cubic_video_schedule

            episode_trigger = capped_cubic_video_schedule

        self.episode_trigger = episode_trigger
        self.step_trigger = step_trigger
        # ``0`` means "record whole episodes"; it is stored as +inf so the
        # length check in ``step`` never ends such a recording on its own.
        self.recording_length: int | float = (
            recording_length if recording_length != 0 else float("inf")
        )

        self.step_id = -1  # Global step counter across episodes
        self.episode_id = -1  # Global episode counter

        # Base name of the active timelines; ``None`` means "not recording".
        self._timeline_name: str | None = None
        self._recorded_frames = 0  # Number of frames recorded in the current snippet

    def _capture_frame(self):
        """Log the current scene as one frame if a recording is active."""
        if self._timeline_name is None:
            return
        self._update_timelines()
        self.logger.log_scene()
        self._recorded_frames += 1

    def _update_timelines(self):
        """Update the timestamp and the sequence based timelines."""
        if self._timeline_name is None:
            return
        timestamp = self._recorded_frames * self.logger.scene.physics_dt
        # It's not possible to specify the playback speed with a sequence based timeline,
        # so we set a timestamp based timeline as well. This mimics the behavior of
        # lerobot-dataset-viz.
        self.logger.recording_stream.set_time(
            timeline=self.timestamp_timeline_name, duration=timestamp
        )
        self.logger.recording_stream.set_time(
            timeline=self.sequence_timeline_name, sequence=self._recorded_frames
        )

    def reset(
        self, *, seed: int | None = None, options: dict[str, Any] | None = None
    ) -> tuple[ObsType, dict[str, Any]]:
        """Reset the environment and possibly start a new recording.

        In full-episode mode (``recording_length`` of ``0``) the current
        recording is stopped first. A new recording named
        ``episode_<episode_id>`` is started when ``episode_trigger`` fires, and
        the scene after the reset is logged if a recording is active.

        Returns
        -------
        tuple
            The ``(obs, info)`` pair returned by the wrapped environment.
        """
        obs, info = super().reset(seed=seed, options=options)
        self.episode_id += 1

        # Full-episode recordings end at the episode boundary.
        if self.recording_length == float("inf"):
            self.stop_recording()

        if self.episode_trigger and self.episode_trigger(self.episode_id):
            self.start_recording(f"episode_{self.episode_id}")

        self._capture_frame()
        return obs, info

    def step(
        self, action: ActType
    ) -> tuple[ObsType, SupportsFloat, bool, bool, dict[str, Any]]:
        """Step the environment with ``action`` and log it if recording is active.

        A new recording named ``step_<step_id>`` is started when
        ``step_trigger`` fires. The active recording is stopped once more than
        ``recording_length`` frames have been captured.

        Returns
        -------
        tuple
            The ``(obs, reward, terminated, truncated, info)`` tuple returned
            by the wrapped environment.
        """
        obs, rew, terminated, truncated, info = self.env.step(action)
        self.step_id += 1

        if self.step_trigger and self.step_trigger(self.step_id):
            self.start_recording(f"step_{self.step_id}")

        self._capture_frame()

        if self._recorded_frames > self.recording_length:
            self.stop_recording()

        return obs, rew, terminated, truncated, info

    def close(self):
        """Close the wrapped environment and flush the recording stream."""
        try:
            super().close()
        finally:
            # Flush even if closing the environment raised, so already logged
            # data is not left in the batching pipeline.
            self.stop_recording()

    @property
    def sequence_timeline_name(self) -> str | None:
        """Get the name of the step based timeline, if any."""
        return (
            f"{self._timeline_name}_step" if self._timeline_name is not None else None
        )

    @property
    def timestamp_timeline_name(self) -> str | None:
        """Get the name of the timestamp based timeline, if any."""
        return (
            f"{self._timeline_name}_timestamp"
            if self._timeline_name is not None
            else None
        )

    def start_recording(self, timeline_name: str):
        """Start a new recording.

        If it is already recording, stops the current recording before
        starting the new one.

        Parameters
        ----------
        timeline_name:
            Base name for the new timelines; the suffixes ``_step`` and
            ``_timestamp`` are appended to it.
        """
        # Stopping first resets the frame counter, so the new timelines start
        # at frame 0 and the snippet gets its full length.
        if self._timeline_name is not None:
            self.stop_recording()
        self._timeline_name = timeline_name
        self.logger.recording_stream.reset_time()
        self._update_timelines()

    def stop_recording(self):
        """Stop current recording and flush the recording stream."""
        self._timeline_name = None
        self._recorded_frames = 0
        self.logger.recording_stream.flush()