Source code for usd_rerun_logger.usd_logger

"""Rerun.io logger for any USD stage."""

import fnmatch
from pathlib import Path

from .util import assert_usd_core_dependency

assert_usd_core_dependency()

import rerun as rr  # noqa: E402
from pxr import Gf, Usd, UsdGeom  # noqa: E402

from .transfom import log_usd_transform  # noqa: E402
from .util import get_recording_stream  # noqa: E402
from .visual import log_visuals  # noqa: E402

__all__ = [
    "UsdRerunLogger",
]


[docs] class UsdRerunLogger: """Logs USD (Universal Scene Description) stages to Rerun.io for visualization. This logger traverses a USD stage and logs all transforms and visual geometry (meshes, cubes, spheres, etc.) to a Rerun recording stream. It supports incremental logging, only re-logging transforms that have changed between calls to :meth:`log_stage`. The recording stream can either be provided directly via ``recording_stream``, or a new recording stream can be created that saves to ``save_path``. If neither is provided, it will try to find it using ``rr.get_data_recording()``. ``save_path`` takes precedence over ``recording_stream``. Parameters ---------- stage: The USD stage to log. This is the root of the scene hierarchy that will be traversed and logged. path_filter: Glob pattern(s) to filter which prims are logged. Can be a single pattern string or a list of patterns. Patterns starting with ``"!"`` are treated as exclusion patterns. For example, ``"/World/*"`` includes only prims under ``/World``, while ``"!/World/Hidden/*"`` excludes prims under ``/World/Hidden``. If ``None``, all prims are logged. recording_stream: The Rerun recording stream to use. Ignored if ``save_path`` is provided. save_path: Path where the Rerun recording will be saved as an ``.rrd`` file. If provided, a new recording stream is created that saves to this path. application_id: Application ID for the Rerun recording. Used when creating a new recording stream (either via ``save_path`` or when falling back to a new stream). Defaults to ``"usd_logger"`` if not provided. Attributes ---------- stage : Usd.Stage The USD stage being logged. recording_stream : rr.RecordingStream The Rerun recording stream used for logging. Notes ----- - Meshes are logged only once (on first encounter) and tracked internally to avoid redundant logging. - Transforms are logged every time :meth:`log_stage` is called, but only if they have changed since the last call. - Prims with ``purpose`` set to ``guide`` are skipped, along with their children. - Instance proxies (referenced/instanced prims) are traversed and logged. - When a prim is removed from the stage, it is automatically cleared from the Rerun recording. Examples -------- Log a USD stage to the Rerun viewer: .. code-block:: python import rerun as rr from pxr import Usd from usd_rerun_logger import UsdRerunLogger rr.init("my_usd_viewer", spawn=True) stage = Usd.Stage.Open("my_scene.usda") logger = UsdRerunLogger(stage) logger.log_stage() Save the recording to a file: .. code-block:: python stage = Usd.Stage.Open("my_scene.usda") logger = UsdRerunLogger(stage, save_path="recording.rrd") logger.log_stage() Filter which prims are logged: .. code-block:: python # Log only prims under /World/Robots, but exclude /World/Robots/Debug logger = UsdRerunLogger( stage, path_filter=["/World/Robots/*", "!/World/Robots/Debug/*"] ) logger.log_stage() Animate a scene by logging at each time step: .. code-block:: python rr.init("animated_scene", spawn=True) stage = Usd.Stage.Open("animated.usda") logger = UsdRerunLogger(stage) for frame in range(100): # Update USD stage time code stage.SetTimeCode(frame) # Log current state - only changed transforms are re-logged rr.set_time_sequence("frame", frame) logger.log_stage() """
[docs] def __init__( self, stage: Usd.Stage, path_filter: str | list[str] | None = None, recording_stream: rr.RecordingStream | None = None, save_path: Path | str | None = None, application_id: str | None = None, ): """Create the USD Rerun logger.""" self._stage = stage self._recording_stream = get_recording_stream( recording_stream=recording_stream, save_path=save_path, application_id=application_id, ) self._logged_meshes = set() # Track which meshes we've already logged self._last_usd_transforms: dict[ str, Gf.Matrix4d ] = {} # Track last logged transforms for change detection filters = ( [path_filter] if isinstance(path_filter, str) else list(path_filter or []) ) self._path_filter = filters or None include_filters: list[str] = [] exclude_filters: list[str] = [] for pattern in filters: if pattern.startswith("!") and len(pattern) > 1: exclude_filters.append(pattern[1:]) else: include_filters.append(pattern) self._include_filter = include_filters or None self._exclude_filter = exclude_filters or None self._prev_transforms: dict[str, Gf.Matrix4d] = {}
@property def stage(self) -> Usd.Stage: """The USD stage being logged.""" return self._stage @property def recording_stream(self) -> rr.RecordingStream: """The Rerun recording stream used for logging.""" return self._recording_stream
[docs] def log_stage(self): """Log the current state of the USD stage to Rerun. Traverses all prims in the stage and logs their transforms and visual geometry. Transforms are only re-logged if they have changed since the last call. Meshes and other visual geometry are logged only once on first encounter. Prims with ``purpose`` set to ``guide`` are skipped along with their children. Path filters (if configured) are applied to include/exclude specific prim paths. When prims are removed from the stage between calls, they are automatically cleared from the Rerun recording. """ # Traverse all prims in the stage current_paths = set() # Using Usd.TraverseInstanceProxies to traverse into instanceable prims (references) predicate = Usd.TraverseInstanceProxies(Usd.PrimDefaultPredicate) iterator = iter(self._stage.Traverse(predicate)) for prim in iterator: # Skip guides if prim.GetAttribute("purpose").Get() == UsdGeom.Tokens.guide: iterator.PruneChildren() continue entity_path = str(prim.GetPath()) # Apply path filters if self._include_filter and not any( fnmatch.fnmatch(entity_path, pattern) for pattern in self._include_filter ): continue if self._exclude_filter and any( fnmatch.fnmatch(entity_path, pattern) for pattern in self._exclude_filter ): continue current_paths.add(entity_path) # Log transforms for all Xformable prims log_usd_transform(self._recording_stream, prim, self._last_usd_transforms) if entity_path not in self._logged_meshes: # Log visuals for Mesh prims log_visuals(self._recording_stream, prim) self._logged_meshes.add(entity_path) # Clear the logged paths that are no longer present in the stage for path in list(self._last_usd_transforms.keys()): if path not in current_paths: self._recording_stream.log(path, rr.Clear.flat()) del self._last_usd_transforms[path]