Source code for pyampp.gxbox.view_h5

#!/usr/bin/env python
from __future__ import annotations

import argparse
from dataclasses import dataclass
import warnings
import tempfile
try:
    from pyvista import PyVistaDeprecationWarning
except Exception:
[docs] PyVistaDeprecationWarning = DeprecationWarning
from pathlib import Path import numpy as np import astropy.units as u from astropy.coordinates import SkyCoord from astropy.time import Time from sunpy.coordinates import ( Heliocentric, HeliographicCarrington, HeliographicStonyhurst, Helioprojective, get_earth, ) from sunpy.sun import constants as sun_consts from pyampp.gxbox.box import Box, BoxGeometryMixin from pyampp.gxbox.boxutils import read_b3d_h5, normalize_observer_metadata from pyampp.gxbox.gx_fov2box import _decode_id_text, _extract_execute_geometry, _infer_time_from_entry_loaded from pyampp.io import load_model from pyampp.gxbox.observer_restore import resolve_observer_with_info from PyQt5.QtWidgets import QApplication, QFileDialog, QMessageBox, QProgressDialog from PyQt5.QtCore import Qt, QTimer def _contains_viewer_field_payload(b3d: dict) -> bool: if not isinstance(b3d, dict): return False for key in ("corona", "nlfff", "pot"): group = b3d.get(key) if isinstance(group, dict) and any(name in group for name in ("bx", "by", "bz", "bcube")): return True chromo = b3d.get("chromo") if isinstance(chromo, dict): if any(name in chromo for name in ("bx", "by", "bz", "bcube", "chromo_bcube")): return True return False def _ensure_viewer_compatible_model(b3d: dict, model_path: Path) -> None: if _contains_viewer_field_payload(b3d): return raise ValueError( "Incompatible model file for gxbox-view3d: missing 3D field payload " f"(corona/chromo). File: {model_path}. " "This looks like a metadata-only thin HDF5 (e.g. export-model-metadata output), " "which is supported for geometry metadata workflows but not for 2D/3D viewer rendering." ) def _decode_meta_text(value) -> str: if isinstance(value, (bytes, bytearray)): return value.decode("utf-8", "ignore") if isinstance(value, np.ndarray) and value.shape == (): item = value.item() if isinstance(item, (bytes, bytearray)): return item.decode("utf-8", "ignore") return str(item) return str(value) def _to_xyz_if_zyx(arr: np.ndarray) -> np.ndarray: if arr.ndim == 3: return np.transpose(arr, (2, 1, 0)) if arr.ndim == 4 and arr.shape[-1] == 3: return np.transpose(arr, (2, 1, 0, 3)) if arr.ndim == 4 and arr.shape[0] == 3: # (c, z, y, x) -> (x, y, z, c) return np.transpose(arr, (3, 2, 1, 0)) return arr
[docs] def normalize_viewer_axis_order(b3d: dict) -> dict: """ Convert canonical H5 zyx cubes into viewer xyz cubes. MagFieldViewer expects (x, y, z). """ meta = b3d.get("metadata", {}) if isinstance(b3d, dict) else {} axis_order = _decode_meta_text(meta.get("axis_order_3d", "")).strip().lower() if axis_order != "zyx": return b3d for model_key in ("corona", "chromo", "nlfff", "pot"): if model_key not in b3d or not isinstance(b3d[model_key], dict): continue for comp in ("bx", "by", "bz", "bcube", "chromo_bcube"): if comp in b3d[model_key]: b3d[model_key][comp] = _to_xyz_if_zyx(np.asarray(b3d[model_key][comp])) return b3d
@dataclass
[docs] class SimpleBox(BoxGeometryMixin):
[docs] dims_pix: np.ndarray
[docs] res: u.Quantity
[docs] b3d: dict
_frame_obs: object _center: SkyCoord @property
[docs] def grid_coords(self): dims = self.dims_pix dx = self.res x = np.linspace(-dims[0] / 2, dims[0] / 2, dims[0]) * dx y = np.linspace(-dims[1] / 2, dims[1] / 2, dims[1]) * dx z = np.linspace(-dims[2] / 2, dims[2] / 2, dims[2]) * dx return {"x": x, "y": y, "z": z, "frame": self._frame_obs}
[docs] def infer_dims(b3d: dict) -> np.ndarray: for key in ("corona", "nlfff", "pot"): if key in b3d and "bx" in b3d[key]: return np.array(b3d[key]["bx"].shape, dtype=int) if "chromo" in b3d: if "bx" in b3d["chromo"]: return np.array(b3d["chromo"]["bx"].shape, dtype=int) if "bcube" in b3d["chromo"]: return np.array(b3d["chromo"]["bcube"].shape[:3], dtype=int) if "chromo_bcube" in b3d["chromo"]: return np.array(b3d["chromo"]["chromo_bcube"].shape[1:4], dtype=int) raise ValueError("Unable to infer dimensions from HDF5.")
[docs] def infer_time(b3d: dict) -> Time: if "chromo" in b3d and "attrs" in b3d["chromo"]: attrs = b3d["chromo"]["attrs"] if "obs_time" in attrs: try: return Time(attrs["obs_time"]) except Exception: pass return Time.now()
[docs] def infer_res(b3d: dict) -> u.Quantity: if "corona" in b3d and "dr" in b3d["corona"]: dr = b3d["corona"]["dr"] if dr is not None and np.size(dr) >= 1: return (dr[0] * sun_consts.radius.to(u.km).value) * u.km if "chromo" in b3d and "dr" in b3d["chromo"]: dr = b3d["chromo"]["dr"] if dr is not None and np.size(dr) >= 1: return (dr[0] * sun_consts.radius.to(u.km).value) * u.km return 1.0 * u.Mm
def _box_from_saved_model(b3d: dict, model_path: Path): meta = b3d.get("metadata", {}) if isinstance(b3d, dict) else {} execute_text = _decode_meta_text(meta.get("execute", "")) if isinstance(meta, dict) else "" coords, frame_mode, _projection = _extract_execute_geometry(execute_text) time_iso = _infer_time_from_entry_loaded(b3d, model_path) if not time_iso or coords is None: return None, None try: obs_time = Time(time_iso) except Exception: return None, None dims = infer_dims(b3d) res = infer_res(b3d).to(u.Mm) # Display/LoS observer (camera, projected views) comes from saved observer metadata. observer, observer_warning = _resolve_model_observer(b3d, obs_time) if observer_warning: print(f"Warning: {observer_warning}") frame_obs = Helioprojective(observer=observer, obstime=obs_time) # Geometry in EXECUTE comes from the original model definition (Earth frame). geom_observer = get_earth(obs_time) frame_geom = Helioprojective(observer=geom_observer, obstime=obs_time) try: if (frame_mode or "hpc").lower() == "hgc": box_origin = SkyCoord( lon=float(coords[0]) * u.deg, lat=float(coords[1]) * u.deg, radius=sun_consts.radius.to(u.Mm), obstime=obs_time, observer=geom_observer, frame=HeliographicCarrington, ).transform_to(frame_geom) elif (frame_mode or "hpc").lower() == "hgs": box_origin = SkyCoord( lon=float(coords[0]) * u.deg, lat=float(coords[1]) * u.deg, radius=sun_consts.radius.to(u.Mm), obstime=obs_time, frame=HeliographicStonyhurst, ).transform_to(frame_geom) else: box_origin = SkyCoord( Tx=float(coords[0]) * u.arcsec, Ty=float(coords[1]) * u.arcsec, frame=frame_geom, ) except Exception: return None, None box_dims = np.array(dims, dtype=float) * u.pix box_hcc = Heliocentric(observer=box_origin, obstime=obs_time) box_center = box_origin.transform_to(box_hcc) box_dimensions = box_dims / u.pix * res box_center = SkyCoord( x=box_center.x, y=box_center.y, z=box_center.z + box_dimensions[2] / 2, frame=box_center.frame, ) box = Box(frame_obs, box_origin, box_center, box_dims, res) return box, obs_time def _resolve_model_observer(b3d: dict, obs_time: Time): observer_meta = b3d.get("observer", {}) if isinstance(b3d, dict) else {} if isinstance(observer_meta, dict): fov_box = observer_meta.get("fov_box", {}) if isinstance(fov_box, dict): coord, warning, _used_key = resolve_observer_with_info(b3d, fov_box.get("observer_key"), obs_time) return coord, warning if observer_meta.get("name") is not None: coord, warning, _used_key = resolve_observer_with_info(b3d, observer_meta.get("name"), obs_time) return coord, warning coord, warning, _used_key = resolve_observer_with_info(b3d, "earth", obs_time) return coord, warning def _normalize_vector(vec: np.ndarray) -> np.ndarray | None: arr = np.asarray(vec, dtype=float).reshape(-1) if arr.size != 3: return None norm = float(np.linalg.norm(arr)) if not np.isfinite(norm) or norm <= 0: return None return arr / norm def _viewer_camera_vectors(box, obs_time: Time) -> tuple[np.ndarray | None, np.ndarray | None]: """ Reproduce the legacy GxBox LoS camera basis for standalone HDF5 viewing. """ box_origin = getattr(box, "_origin", None) frame_obs = getattr(box, "_frame_obs", None) box_view_up = getattr(box, "box_view_up", None) observer = getattr(frame_obs, "observer", None) if box_origin is None or frame_obs is None or observer is None or box_view_up is None: return None, None try: box_norm_direction = _normalize_vector( box_origin.transform_to( Heliocentric(observer=observer, obstime=obs_time) ).cartesian.xyz.value ) if box_norm_direction is None: return None, None up_coords = np.diff( box_view_up.transform_to( Heliocentric(observer=observer, obstime=obs_time) ).cartesian.xyz.value ) view_up = _normalize_vector(np.squeeze(up_coords)) if view_up is None: return box_norm_direction, None if abs(view_up[1]) > 0: view_up = np.sign(view_up[1]) * view_up return box_norm_direction, view_up except Exception: return None, None
[docs] def can_prepare_model_for_viewer(model_path: str | Path) -> bool: """ Return ``True`` when a file looks like a viewer-compatible saved model. This is a lightweight gate for enabling UI actions. It intentionally avoids full SAV conversion and only checks file existence, supported suffix, and the presence of recognizable top-level model groups for HDF5 files. """ try: model_path = Path(model_path).expanduser().resolve() except Exception: return False if not model_path.exists() or not model_path.is_file(): return False suffix = model_path.suffix.lower() if suffix == ".sav": return True if suffix != ".h5": return False try: b3d = read_b3d_h5(str(model_path)) except Exception: return False return _contains_viewer_field_payload(b3d)
[docs] def prepare_model_for_viewer(model_path: str | Path) -> tuple[SimpleBox, Time, str, Path | None]: """ Load a saved model file into the in-memory objects expected by ``MagFieldViewer``. Returns ------- tuple ``(box, obs_time, b3dtype, temp_h5_path)`` where ``temp_h5_path`` is a temporary conversion artifact when the canonical loader had to materialize one. """ model_path = Path(model_path).expanduser().resolve() temp_h5_path = None try: b3d, temp_h5_path = load_model(model_path, keep_temp_h5=True) except Exception as exc: raise RuntimeError( "Model input could not be loaded through pyampp.io.load_model. " "If this is a legacy SAV file, verify the conversion dependencies are available." ) from exc if temp_h5_path is not None: print(f"Materialized temporary HDF5: {temp_h5_path}") _ensure_viewer_compatible_model(b3d, model_path) b3d = normalize_viewer_axis_order(b3d) box, obs_time = _box_from_saved_model(b3d, model_path) if box is None: dims = infer_dims(b3d) obs_time = infer_time(b3d) res = infer_res(b3d) observer, observer_warning = _resolve_model_observer(b3d, obs_time) if observer_warning: print(f"Warning: {observer_warning}") frame = Heliocentric(observer=observer, obstime=obs_time) center = SkyCoord(0 * u.Mm, 0 * u.Mm, 0 * u.Mm, frame=frame) box = SimpleBox(dims_pix=dims, res=res.to(u.Mm), b3d=b3d, _frame_obs=frame, _center=center) if "corona" in b3d: b3dtype = "corona" elif "nlfff" in b3d: b3dtype = "corona" b3d["corona"] = b3d.pop("nlfff") elif "pot" in b3d: b3dtype = "corona" b3d["corona"] = b3d.pop("pot") elif "chromo" in b3d: b3dtype = "chromo" chromo = b3d.get("chromo", {}) if "bx" not in chromo and "bcube" in chromo: bcube = chromo["bcube"] if bcube.ndim == 4 and bcube.shape[-1] == 3: chromo["bx"] = bcube[:, :, :, 0] chromo["by"] = bcube[:, :, :, 1] chromo["bz"] = bcube[:, :, :, 2] b3d["chromo"] = chromo else: raise ValueError("No known model types found in HDF5 (expected corona/chromo).") box.b3d = b3d if hasattr(box, "corona_type"): if "corona" in b3d and isinstance(b3d["corona"], dict): attrs = b3d["corona"].get("attrs", {}) if isinstance(attrs, dict): box.corona_type = attrs.get("model_type") if hasattr(box, "corona_models") and box.corona_type: box.corona_models[box.corona_type] = b3d["corona"] return box, obs_time, b3dtype, temp_h5_path
[docs] def main() -> int: from pyampp.gxbox.box_view3d import MagFieldViewer parser = argparse.ArgumentParser(description="Open a saved HDF5 model in the 3D viewer without recomputing.") parser.add_argument("h5_path", nargs="?", help="Path to the HDF5 model file (positional).") parser.add_argument("--h5", dest="h5_opt", help="Path to the HDF5 model file.") parser.add_argument("--dir", dest="start_dir", help="Initial directory for file picker when no model path is given.") parser.add_argument("--pick", action="store_true", help="Open file picker even when model path is provided.") parser.add_argument("--pipeline-child", action="store_true", help="Open in restricted pyAMPP child mode tied to the current model file.") args = parser.parse_args() h5_arg = args.h5_opt or args.h5_path app = QApplication.instance() owns_app = False if app is None: app = QApplication([]) owns_app = True if args.pick or not h5_arg: start_dir = Path(args.start_dir).expanduser() if args.start_dir else Path.cwd() if not start_dir.exists() or not start_dir.is_dir(): start_dir = Path.cwd() dialog = QFileDialog(None, "Open Model (HDF5 or SAV)") # Native macOS picker may ignore selectFile(); use Qt dialog for reliable preselection. dialog.setOption(QFileDialog.DontUseNativeDialog, True) dialog.setFileMode(QFileDialog.ExistingFile) dialog.setNameFilter("Model Files (*.h5 *.sav);;HDF5 Files (*.h5);;SAV Files (*.sav);;All Files (*)") if h5_arg: candidate = Path(h5_arg).expanduser() dialog.setDirectory(str(candidate.parent if candidate.parent.exists() else start_dir)) dialog.selectFile(str(candidate.name)) else: dialog.setDirectory(str(start_dir)) if not dialog.exec_(): return 0 selected = dialog.selectedFiles() if not selected: return 0 h5_arg = selected[0] model_path = Path(h5_arg).expanduser().resolve() loading_dialog = QProgressDialog(None, None, 0, 0, None) loading_dialog.setWindowTitle("Opening Model") loading_dialog.setLabelText(f"Loading {model_path.name}...") loading_dialog.setCancelButton(None) loading_dialog.setMinimumDuration(0) loading_dialog.setWindowModality(Qt.ApplicationModal) loading_dialog.setAutoClose(False) loading_dialog.setAutoReset(False) loading_dialog.show() QApplication.setOverrideCursor(Qt.WaitCursor) app.processEvents() try: box, obs_time, b3dtype, temp_h5_path = prepare_model_for_viewer(model_path) except Exception as exc: loading_dialog.close() QApplication.restoreOverrideCursor() try: QMessageBox.critical(None, "Incompatible Model", str(exc)) except Exception: print(f"Error: {exc}") return 2 finally: if loading_dialog.isVisible(): loading_dialog.close() QApplication.restoreOverrideCursor() app.processEvents() warnings.filterwarnings("ignore", category=PyVistaDeprecationWarning) save_target = model_path if temp_h5_path is None else None box_norm_direction, box_view_up = _viewer_camera_vectors(box, obs_time) viewer = MagFieldViewer( box, time=obs_time, b3dtype=b3dtype, parent=None, model_path=save_target, source_model_path=model_path, box_norm_direction=box_norm_direction, box_view_up=box_view_up, session_mode="pipeline_child" if args.pipeline_child else "standalone", ) if hasattr(viewer, "app_window"): viewer.app_window.setWindowTitle(f"GxBox 3D viewer - {model_path}") viewer.app_window.show() viewer.app_window.showNormal() if hasattr(viewer, "ensure_window_visible"): viewer.ensure_window_visible() viewer.app_window.raise_() viewer.app_window.activateWindow() viewer.show() if hasattr(viewer, "schedule_startup_los_view"): viewer.schedule_startup_los_view() else: QTimer.singleShot(0, viewer.set_camera_to_LOS_direction) if owns_app: app.exec_() # Temporary conversion artifact can be removed after viewer exits. if temp_h5_path is not None: try: temp_h5_path.unlink(missing_ok=True) temp_h5_path.parent.rmdir() except Exception: pass return 0
if __name__ == "__main__": raise SystemExit(main())