Source code for pyampp.gxbox.gxbox_selector_view

#!/usr/bin/env python
from __future__ import annotations

import argparse
import re
import shlex
from pathlib import Path
from typing import Any, Optional, Sequence

import astropy.units as u
import numpy as np
from astropy.io import fits
from sunpy.coordinates import HeliographicStonyhurst
from astropy.time import Time
from PyQt5.QtWidgets import QApplication, QFileDialog, QDialog, QMessageBox

from pyampp.data.downloader import SDOImageDownloader
from pyampp.gxbox.fov_selector_gui import FovBoxSelectorDialog
from pyampp.gxbox.gx_fov2box import (
    _decode_id_text,
    _extract_execute_geometry,
    _extract_execute_paths,
    _infer_time_from_entry_loaded,
    _load_entry_box_any,
)
from pyampp.io import discover_fits_refmap_map_ids, load_model, save_model
from pyampp.gxbox.selector_api import (
    BoxGeometrySelection,
    CoordMode,
    DisplayFovBoxSelection,
    DisplayFovSelection,
    SelectorDialogResult,
    SelectorSessionInput,
)
from pyampp.gxbox.observer_restore import build_pb0r_metadata_from_ephemeris, resolve_observer_with_info

_DEFAULT_MAP_IDS = (
    "Bz",
    "Ic",
    "B_rho",
    "B_theta",
    "B_phi",
    "disambig",
    "Vert_current",
    "chromo_mask",
    "94",
    "131",
    "1600",
    "1700",
    "171",
    "193",
    "211",
    "304",
    "335",
)


def _contains_viewer_field_payload(entry_loaded: dict[str, Any]) -> bool:
    if not isinstance(entry_loaded, dict):
        return False

    for key in ("corona", "nlfff", "pot"):
        group = entry_loaded.get(key)
        if isinstance(group, dict) and any(name in group for name in ("bx", "by", "bz", "bcube")):
            return True

    chromo = entry_loaded.get("chromo")
    if isinstance(chromo, dict):
        if any(name in chromo for name in ("bx", "by", "bz", "bcube", "chromo_bcube")):
            return True

    return False

def _normalize_observer_key(observer_key: str | None) -> str:
    raw = observer_key
    if isinstance(raw, (bytes, bytearray)):
        raw = raw.decode("utf-8", "ignore")
    if isinstance(raw, np.ndarray) and raw.shape == ():
        raw = raw.item()
        if isinstance(raw, (bytes, bytearray)):
            raw = raw.decode("utf-8", "ignore")
    key = str(raw or "earth").strip().lower()
    aliases = {
        "custom": "custom",
        "sdo": "sdo",
        "sdo/aia": "sdo",
        "sdo/hmi": "sdo",
        "earth": "earth",
        "solo": "solar orbiter",
        "solar-orbiter": "solar orbiter",
        "solarorbiter": "solar orbiter",
        "solar orbiter": "solar orbiter",
        "stereo a": "stereo-a",
        "stereo-a": "stereo-a",
        "stereoa": "stereo-a",
        "stereo b": "stereo-b",
        "stereo-b": "stereo-b",
        "stereob": "stereo-b",
    }
    return aliases.get(key, "earth")


def _observer_display_label(observer_key: str | None) -> str:
    return {
        "earth": "Earth",
        "sdo": "SDO",
        "solar orbiter": "Solar Orbiter",
        "stereo-a": "STEREO-A",
        "stereo-b": "STEREO-B",
        "custom": "Custom",
    }.get(_normalize_observer_key(observer_key), "Earth")


def _base_display_id(base_key: str) -> str:
    key = str(base_key).lower()
    aliases = {
        "bx": "Bx",
        "by": "By",
        "bz": "Bz",
        "ic": "Ic",
        "vert_current": "Vert_current",
        "chromo_mask": "chromo_mask",
    }
    return aliases.get(key, key)


def _pick_entry_file(initial: Optional[str], start_dir: Optional[str]) -> Optional[str]:
    dialog = QFileDialog(None, "Open Model Box (.h5 or .sav)")
    dialog.setOption(QFileDialog.DontUseNativeDialog, True)
    dialog.setFileMode(QFileDialog.ExistingFile)
    dialog.setNameFilter("Model Box Files (*.h5 *.sav);;HDF5 Files (*.h5);;SAV Files (*.sav);;All Files (*)")
    if initial:
        candidate = Path(initial).expanduser()
        dialog.setDirectory(str(candidate.parent if candidate.parent.exists() else Path.cwd()))
        dialog.selectFile(candidate.name)
    elif start_dir:
        base = Path(start_dir).expanduser()
        dialog.setDirectory(str(base if base.exists() else Path.cwd()))
    else:
        dialog.setDirectory(str(Path.cwd()))
    if not dialog.exec_():
        return None
    selected = dialog.selectedFiles()
    return selected[0] if selected else None


def _parse_execute_box_dims_and_dx(execute_text: str) -> tuple[Optional[tuple[int, int, int]], Optional[float]]:
    dims = None
    dx_km = None
    if not execute_text:
        return dims, dx_km
    try:
        parts = shlex.split(execute_text)
    except Exception:
        parts = []
    for i, token in enumerate(parts):
        if token == "--box-dims" and i + 3 < len(parts):
            try:
                dims = (int(float(parts[i + 1])), int(float(parts[i + 2])), int(float(parts[i + 3])))
            except Exception:
                pass
        elif token.startswith("--box-dims="):
            try:
                vals = token.split("=", 1)[1].split(",")
                if len(vals) == 3:
                    dims = tuple(int(float(v)) for v in vals)
            except Exception:
                pass
        elif token == "--dx-km" and i + 1 < len(parts):
            try:
                dx_km = float(parts[i + 1])
            except Exception:
                pass
        elif token.startswith("--dx-km="):
            try:
                dx_km = float(token.split("=", 1)[1])
            except Exception:
                pass
    if dims is None:
        m = re.search(
            r"--box-dims\s+([+-]?\d+(?:\.\d+)?)\s+([+-]?\d+(?:\.\d+)?)\s+([+-]?\d+(?:\.\d+)?)",
            execute_text,
        )
        if m:
            dims = tuple(int(round(float(m.group(i)))) for i in (1, 2, 3))
    if dx_km is None:
        m = re.search(r"--dx-km\s+([+-]?\d+(?:\.\d+)?)", execute_text)
        if m:
            dx_km = float(m.group(1))
    return dims, dx_km


def _parse_execute_refmap_paths(execute_text: str) -> tuple[str, ...]:
    if not execute_text:
        return ()
    try:
        parts = shlex.split(str(execute_text))
    except Exception:
        parts = []
    paths: list[str] = []
    flags = {"--refmaps-path", "--refmap-path"}
    for i, token in enumerate(parts):
        if token in flags and i + 1 < len(parts):
            paths.append(parts[i + 1])
        elif token.startswith("--refmaps-path=") or token.startswith("--refmap-path="):
            paths.append(token.split("=", 1)[1])
    return tuple(path for path in paths if str(path).strip())


def _merge_ref_map_paths(*path_groups: Optional[Sequence[str]]) -> tuple[str, ...]:
    merged: list[str] = []
    seen: set[str] = set()
    for paths in path_groups:
        for path in paths or ():
            text = str(path).strip()
            if not text or text in seen:
                continue
            seen.add(text)
            merged.append(text)
    return tuple(merged)


def _infer_dims_from_entry(entry_loaded: dict[str, Any]) -> tuple[int, int, int]:
    meta = entry_loaded.get("metadata", {}) if isinstance(entry_loaded, dict) else {}
    axis_order = _decode_id_text(meta.get("axis_order_3d", "")).strip().lower() if isinstance(meta, dict) else ""

    def _dims_from_shape(shape: tuple[int, ...]) -> tuple[int, int, int]:
        if len(shape) < 3:
            return (150, 75, 150)
        if axis_order == "zyx":
            return (int(shape[2]), int(shape[1]), int(shape[0]))
        return (int(shape[0]), int(shape[1]), int(shape[2]))

    for group_name in ("corona", "chromo"):
        grp = entry_loaded.get(group_name)
        if not isinstance(grp, dict):
            continue
        for key in ("bx", "by", "bz"):
            if key in grp:
                arr = np.asarray(grp[key])
                if arr.ndim >= 3:
                    return _dims_from_shape(tuple(int(v) for v in arr.shape[:3]))
        for key in ("bcube", "chromo_bcube"):
            if key in grp:
                arr = np.asarray(grp[key])
                if arr.ndim == 4 and arr.shape[-1] == 3:
                    return _dims_from_shape(tuple(int(v) for v in arr.shape[:3]))
                if arr.ndim == 4 and arr.shape[0] == 3:
                    return _dims_from_shape(tuple(int(v) for v in arr.shape[1:4]))
    return (150, 75, 150)


def _infer_dx_km_from_entry(entry_loaded: dict[str, Any]) -> float:
    rsun_km = 695700.0
    for group_name in ("corona", "chromo"):
        grp = entry_loaded.get(group_name)
        if not isinstance(grp, dict):
            continue
        dr = grp.get("dr")
        if dr is None:
            continue
        try:
            arr = np.asarray(dr, dtype=float).ravel()
            if arr.size > 0:
                return float(arr[0] * rsun_km)
        except Exception:
            continue
    return 1400.0


def _observer_fov_from_entry(
    entry_loaded: dict[str, Any],
) -> tuple[Optional[DisplayFovSelection], bool, Optional[DisplayFovBoxSelection]]:
    observer = entry_loaded.get("observer")
    if not isinstance(observer, dict):
        return None, False, None
    fov = observer.get("fov")
    if not isinstance(fov, dict):
        return None, False, None
    try:
        result = DisplayFovSelection(
            center_x_arcsec=float(fov["xc_arcsec"]),
            center_y_arcsec=float(fov["yc_arcsec"]),
            width_arcsec=float(fov["xsize_arcsec"]),
            height_arcsec=float(fov["ysize_arcsec"]),
        )
        fov_box_meta = observer.get("fov_box")
        fov_box = None
        if isinstance(fov_box_meta, dict):
            try:
                fov_box = DisplayFovBoxSelection(
                    center_x_arcsec=float(fov_box_meta.get("xc_arcsec", result.center_x_arcsec)),
                    center_y_arcsec=float(fov_box_meta.get("yc_arcsec", result.center_y_arcsec)),
                    width_arcsec=float(fov_box_meta.get("xsize_arcsec", result.width_arcsec)),
                    height_arcsec=float(fov_box_meta.get("ysize_arcsec", result.height_arcsec)),
                    z_min_mm=float(fov_box_meta["zmin_mm"]),
                    z_max_mm=float(fov_box_meta["zmax_mm"]),
                    observer_key=_normalize_observer_key(
                        fov_box_meta.get("observer_key", observer.get("name", "earth"))
                    ),
                )
            except Exception:
                fov_box = None
        return result, bool(fov.get("square", False)), fov_box
    except Exception:
        return None, False, None


def _geometry_from_entry(entry_loaded: dict[str, Any], entry_path: Path) -> tuple[str, BoxGeometrySelection]:
    meta = entry_loaded.get("metadata", {}) if isinstance(entry_loaded, dict) else {}
    execute_text = _decode_id_text(meta.get("execute", "")) if isinstance(meta, dict) else ""
    time_iso = _infer_time_from_entry_loaded(entry_loaded, entry_path) or Time.now().to_datetime().strftime("%Y-%m-%dT%H:%M:%S")
    coords, frame, _projection = _extract_execute_geometry(execute_text)
    dims, dx_km = _parse_execute_box_dims_and_dx(execute_text)
    if coords is None:
        coords = (0.0, 0.0)
    if dims is None:
        dims = _infer_dims_from_entry(entry_loaded)
    if dx_km is None:
        dx_km = _infer_dx_km_from_entry(entry_loaded)
    mode = {
        "hpc": CoordMode.HPC,
        "hgc": CoordMode.HGC,
        "hgs": CoordMode.HGS,
    }.get((frame or "hpc").lower(), CoordMode.HPC)
    geometry = BoxGeometrySelection(
        coord_mode=mode,
        coord_x=float(coords[0]),
        coord_y=float(coords[1]),
        grid_x=int(dims[0]),
        grid_y=int(dims[1]),
        grid_z=int(dims[2]),
        dx_km=float(dx_km),
    )
    return time_iso, geometry


def _discover_filesystem_maps(time_iso: str, data_dir: Optional[str]) -> dict[str, str]:
    if not data_dir:
        return {}
    base = Path(data_dir).expanduser()
    if not base.exists() or not base.is_dir():
        return {}
    try:
        downloader = SDOImageDownloader(Time(time_iso), data_dir=str(base), euv=True, uv=True, hmi=True)
        files = {k: v for k, v in downloader._check_files_exist(downloader.path, returnfilelist=True).items() if v}
        files.update(
            {
                key: value
                for key, value in _discover_external_ref_map_files(
                    [downloader.path],
                    generic=False,
                ).items()
                if key not in files
            }
        )
        return files
    except Exception:
        return {}


def _discover_external_ref_map_files(ref_map_paths: Optional[Sequence[str]], *, generic: bool = True) -> dict[str, str]:
    if not ref_map_paths:
        return {}
    out: dict[str, str] = {}
    for path, refmap_id in discover_fits_refmap_map_ids(ref_map_paths, generic=generic).items():
        out[_viewer_context_id_from_refmap_id(refmap_id)] = str(path)
    return out


def _viewer_context_id_from_refmap_id(refmap_id: str) -> str:
    match = re.fullmatch(r"AIA_(\d+)", str(refmap_id))
    if match:
        return match.group(1)
    return str(refmap_id)


def _available_map_ids_from_sources(map_files: dict[str, str], refmaps: dict[str, dict], base_maps: dict[str, Any]) -> list[str]:
    out: list[str] = []
    fs_availability = {
        "Bz": "magnetogram" in map_files,
        "Ic": "continuum" in map_files,
        "B_rho": "field" in map_files,
        "B_theta": "inclination" in map_files,
        "B_phi": "azimuth" in map_files,
        "disambig": "disambig" in map_files,
        # Backward-compatible legacy IDs; these now map to measured HPC products.
        "Br": "field" in map_files,
        "Bp": "inclination" in map_files,
        "Bt": "azimuth" in map_files,
    }
    ref_availability = {
        "Bz": "Bz_reference" in refmaps,
        "Ic": "Ic_reference" in refmaps,
        "Vert_current": "Vert_current" in refmaps,
        "94": "AIA_94" in refmaps,
        "131": "AIA_131" in refmaps,
        "1600": "AIA_1600" in refmaps,
        "1700": "AIA_1700" in refmaps,
        "171": "AIA_171" in refmaps,
        "193": "AIA_193" in refmaps,
        "211": "AIA_211" in refmaps,
        "304": "AIA_304" in refmaps,
        "335": "AIA_335" in refmaps,
    }
    base_availability = {
        "Bz": "bz" in base_maps,
        "Ic": "ic" in base_maps,
        "chromo_mask": "chromo_mask" in base_maps,
    }
    for map_id in _DEFAULT_MAP_IDS:
        if map_id in fs_availability:
            if fs_availability[map_id] or ref_availability.get(map_id, False) or base_availability.get(map_id, False):
                out.append(map_id)
        elif map_id in ref_availability:
            if ref_availability[map_id]:
                out.append(map_id)
        elif map_id in base_availability:
            if base_availability[map_id]:
                out.append(map_id)
        elif map_id in map_files:
            out.append(map_id)
    for base_key in sorted(base_maps.keys()):
        display_id = _base_display_id(base_key)
        if display_id not in out:
            out.append(display_id)
    if "Vert_current" in refmaps and "Vert_current" not in out:
        out.append("Vert_current")
    aliased_refmaps = {
        "Bz_reference",
        "Ic_reference",
        "Vert_current",
        "AIA_94",
        "AIA_131",
        "AIA_1600",
        "AIA_1700",
        "AIA_171",
        "AIA_193",
        "AIA_211",
        "AIA_304",
        "AIA_335",
    }
    for ref_key in sorted(refmaps.keys()):
        if ref_key not in aliased_refmaps and ref_key not in out:
            out.append(ref_key)
    for map_id in sorted(map_files.keys()):
        if map_id not in {
            "magnetogram",
            "continuum",
            "field",
            "inclination",
            "azimuth",
            "disambig",
        } and map_id not in out:
            out.append(map_id)
    return out or list(_DEFAULT_MAP_IDS)


def _build_session_input(entry_path: Path, ref_map_paths: Optional[Sequence[str]] = None) -> SelectorSessionInput:
    entry_loaded = _load_entry_box_any(entry_path)
    if not _contains_viewer_field_payload(entry_loaded):
        raise ValueError(
            "Incompatible model file for gxbox-view2d: missing 3D field payload "
            f"(corona/chromo). File: {entry_path}. "
            "This looks like a metadata-only thin HDF5 (e.g. export-model-metadata output), "
            "which is supported for geometry metadata workflows but not for 2D/3D viewer rendering."
        )
    time_iso, geometry = _geometry_from_entry(entry_loaded, entry_path)
    meta = entry_loaded.get("metadata", {}) if isinstance(entry_loaded, dict) else {}
    execute_text = _decode_id_text(meta.get("execute", "")) if isinstance(meta, dict) else ""
    data_dir, _gxmodel_dir = _extract_execute_paths(execute_text)
    execute_ref_map_paths = _parse_execute_refmap_paths(execute_text)
    all_ref_map_paths = _merge_ref_map_paths(execute_ref_map_paths, ref_map_paths)
    explicit_fov, square_fov, explicit_fov_box = _observer_fov_from_entry(entry_loaded)
    observer_meta = entry_loaded.get("observer") if isinstance(entry_loaded, dict) else None
    observer_name = observer_meta.get("name", "earth") if isinstance(observer_meta, dict) else "earth"
    display_observer_key = _normalize_observer_key(observer_name)
    custom_observer_ephemeris = None
    custom_observer_label = None
    custom_observer_source = None
    if isinstance(observer_meta, dict):
        raw_ephemeris = observer_meta.get("ephemeris")
        raw_label = _decode_id_text(observer_meta.get("label", "")).strip() or None
        raw_source = _decode_id_text(observer_meta.get("source", "")).strip() or None
        custom_needed = (
            display_observer_key == "custom"
            or _normalize_observer_key(observer_name) == "custom"
            or (
                isinstance(explicit_fov_box, DisplayFovBoxSelection)
                and _normalize_observer_key(explicit_fov_box.observer_key) == "custom"
            )
        )
        if custom_needed and isinstance(raw_ephemeris, dict):
            custom_observer_ephemeris = {
                key: raw_ephemeris[key]
                for key in ("obs_date", "obs_time", "hgln_obs_deg", "hglt_obs_deg", "dsun_cm", "rsun_cm")
                if key in raw_ephemeris
            } or None
            custom_observer_label = raw_label or "Custom"
            custom_observer_source = raw_source

    map_files = _discover_filesystem_maps(time_iso, data_dir)
    map_files.update(_discover_external_ref_map_files(all_ref_map_paths))
    refmaps = {}
    raw_refmaps = entry_loaded.get("refmaps")
    if isinstance(raw_refmaps, dict):
        refmaps = raw_refmaps
    base_maps = {}
    base_wcs_header = None
    raw_base = entry_loaded.get("base")
    if isinstance(raw_base, dict):
        for key in ("index", "index_header", "wcs_header"):
            if key in raw_base:
                try:
                    base_wcs_header = _decode_id_text(raw_base.get(key))
                except Exception:
                    base_wcs_header = None
                if base_wcs_header and str(base_wcs_header).strip():
                    break
        for key, value in raw_base.items():
            try:
                arr = np.asarray(value)
            except Exception:
                continue
            if arr.ndim == 2:
                base_maps[str(key).lower()] = value
    map_ids = _available_map_ids_from_sources(map_files, refmaps, base_maps)
    initial_map = "171" if "171" in map_ids else ("Bz" if "Bz" in map_ids else (map_ids[0] if map_ids else None))
    map_source_mode = "filesystem" if map_files else ("embedded" if refmaps else "auto")

    return SelectorSessionInput(
        time_iso=time_iso,
        data_dir=data_dir or "",
        geometry=geometry,
        fov=explicit_fov,
        fov_box=explicit_fov_box,
        square_fov=square_fov,
        allow_geometry_edit=False,
        map_ids=tuple(map_ids),
        map_files=map_files or None,
        refmaps=refmaps or None,
        base_maps=base_maps or None,
        base_wcs_header=base_wcs_header,
        base_geometry=geometry,
        map_source_mode=map_source_mode,
        display_observer_key=display_observer_key,
        custom_observer_ephemeris=custom_observer_ephemeris,
        custom_observer_label=custom_observer_label,
        custom_observer_source=custom_observer_source,
        initial_map_id=initial_map,
        pad_frac=0.10,
    )


def _pick_save_as_h5_path(parent_widget, default_stem: str = "model") -> Path | None:
    """Show a Save-As file dialog and return the chosen .h5 path, or None if cancelled."""
    dialog = QFileDialog(parent_widget, "Save Model As")
    dialog.setOption(QFileDialog.DontUseNativeDialog, True)
    dialog.setAcceptMode(QFileDialog.AcceptSave)
    dialog.setFileMode(QFileDialog.AnyFile)
    dialog.setNameFilter("HDF5 Files (*.h5);;All Files (*)")
    dialog.selectFile(f"{default_stem}.h5")
    if not dialog.exec_():
        return None
    selected = dialog.selectedFiles()
    if not selected:
        return None
    path = Path(selected[0])
    if path.suffix.lower() != ".h5":
        path = path.with_suffix(".h5")
    return path


def _persist_selector_result_to_entry(
    entry_path: Path,
    result: SelectorDialogResult,
    line_seeds=None,
    fov_box: DisplayFovBoxSelection | None = None,
    observer_state: dict[str, Any] | None = None,
    output_path: Path | None = None,
) -> bool:
    dest = output_path or entry_path
    if dest.suffix.lower() != ".h5":
        return False

    box_data = load_model(entry_path)
    observer = box_data.get("observer")
    if not isinstance(observer, dict):
        observer = {}
    else:
        observer = dict(observer)

    observer_name = observer.get("name", "earth")
    ephemeris = observer.get("ephemeris")
    display_observer_key = _normalize_observer_key(
        observer_state.get("display_observer_key") if isinstance(observer_state, dict) else observer_name
    )
    raw_custom_ephemeris = (
        observer_state.get("custom_observer_ephemeris")
        if isinstance(observer_state, dict)
        else None
    )
    custom_observer_label = (
        str(observer_state.get("custom_observer_label", "")).strip()
        if isinstance(observer_state, dict)
        else ""
    )
    custom_observer_source = (
        str(observer_state.get("custom_observer_source", "")).strip()
        if isinstance(observer_state, dict)
        else ""
    )
    if not isinstance(raw_custom_ephemeris, dict):
        raw_custom_ephemeris = None
    fov = {
        "frame": "helioprojective",
        "xc_arcsec": float(result.fov.center_x_arcsec),
        "yc_arcsec": float(result.fov.center_y_arcsec),
        "xsize_arcsec": float(result.fov.width_arcsec),
        "ysize_arcsec": float(result.fov.height_arcsec),
        "square": bool(result.square_fov),
    }
    observer["name"] = str(display_observer_key or "earth")
    observer["fov"] = fov
    if isinstance(fov_box, DisplayFovBoxSelection):
        observer["fov_box"] = fov_box.as_observer_metadata(square=bool(result.square_fov))
    else:
        observer.pop("fov_box", None)
    persisted_fov_meta = observer.get("fov_box", {}) if isinstance(observer.get("fov_box"), dict) else {}
    fov_observer_key = (
        str(fov_box.observer_key)
        if isinstance(fov_box, DisplayFovBoxSelection)
        else _normalize_observer_key(persisted_fov_meta.get("observer_key", observer["name"]))
    )
    needs_custom_ephemeris = (
        display_observer_key == "custom"
        or _normalize_observer_key(fov_observer_key) == "custom"
    )
    if needs_custom_ephemeris and custom_observer_label:
        observer["label"] = custom_observer_label
    else:
        observer["label"] = _observer_display_label(display_observer_key)
    if needs_custom_ephemeris and custom_observer_source:
        observer["source"] = custom_observer_source
    else:
        observer.pop("source", None)
    resolved_ephemeris: dict[str, float | str] = {}
    obs_time = _infer_time_from_entry_loaded(box_data, entry_path)
    if obs_time:
        try:
            when = Time(obs_time)
        except Exception:
            when = None
    else:
        when = None
    if needs_custom_ephemeris and raw_custom_ephemeris:
        resolved_ephemeris = {
            key: raw_custom_ephemeris[key]
            for key in ("obs_date", "obs_time", "hgln_obs_deg", "hglt_obs_deg", "dsun_cm", "rsun_cm")
            if key in raw_custom_ephemeris
        }
        if when is not None and "obs_date" not in resolved_ephemeris:
            resolved_ephemeris["obs_date"] = when.isot
    else:
        if when is not None:
            resolved_ephemeris["obs_date"] = when.isot
            coord, _warning, _used_key = resolve_observer_with_info(box_data, display_observer_key, when)
            if coord is not None:
                try:
                    obs_hgs = coord.transform_to(HeliographicStonyhurst(obstime=when))
                    resolved_ephemeris["hgln_obs_deg"] = float(obs_hgs.lon.to_value(u.deg))
                    resolved_ephemeris["hglt_obs_deg"] = float(obs_hgs.lat.to_value(u.deg))
                    resolved_ephemeris["dsun_cm"] = float(coord.radius.to_value(u.cm))
                except Exception:
                    pass
    if "rsun_cm" not in resolved_ephemeris:
        refmaps = box_data.get("refmaps", {}) if isinstance(box_data, dict) else {}
        for key in ("Bz_reference", "Ic_reference"):
            payload = refmaps.get(key) if isinstance(refmaps, dict) else None
            if not isinstance(payload, dict):
                continue
            header_text = payload.get("wcs_header")
            if header_text is None:
                continue
            try:
                text = _decode_id_text(header_text).replace("\\n", "\n")
                header = fits.Header.fromstring(text, sep="\n")
                if "RSUN_REF" in header:
                    resolved_ephemeris["rsun_cm"] = float(u.Quantity(header["RSUN_REF"], u.m).to_value(u.cm))
                    break
            except Exception:
                continue
    if not resolved_ephemeris and isinstance(ephemeris, dict):
        resolved_ephemeris = {
            key: ephemeris[key]
            for key in ("obs_date", "obs_time", "hgln_obs_deg", "hglt_obs_deg", "dsun_cm", "rsun_cm")
            if key in ephemeris
        }
    if needs_custom_ephemeris or display_observer_key == "custom":
        if resolved_ephemeris:
            observer["ephemeris"] = resolved_ephemeris
            pb0r = build_pb0r_metadata_from_ephemeris(
                resolved_ephemeris,
                observer_key="custom",
                obs_time=resolved_ephemeris.get("obs_date"),
            )
            if pb0r:
                observer["pb0r"] = pb0r
            else:
                observer.pop("pb0r", None)
        else:
            observer.pop("ephemeris", None)
            observer.pop("pb0r", None)
    elif resolved_ephemeris:
        observer["ephemeris"] = resolved_ephemeris
        pb0r = build_pb0r_metadata_from_ephemeris(
            resolved_ephemeris,
            observer_key=observer.get("name"),
            obs_time=resolved_ephemeris.get("obs_date"),
        )
        if pb0r:
            observer["pb0r"] = pb0r
    else:
        observer.pop("ephemeris", None)
        observer.pop("pb0r", None)
    box_data["observer"] = observer
    if isinstance(line_seeds, dict):
        box_data["line_seeds"] = line_seeds
    else:
        box_data.pop("line_seeds", None)
    
    # Save with contract persistence via centralized model.io loader
    save_model(box_data, dest)
    return True


[docs] def main() -> int: parser = argparse.ArgumentParser(description="Open the FOV / Box selector from a saved .h5/.sav box file.") parser.add_argument("entry_box", nargs="?", help="Path to the saved .h5 or .sav box file.") parser.add_argument("--pick", action="store_true", help="Open a file picker even if a path is provided.") parser.add_argument("--dir", dest="start_dir", help="Initial directory for the file picker.") parser.add_argument( "--refmaps-path", "--ref-map-path", action="append", dest="refmaps_path", default=[], help=( "Additional FITS file or directory of FITS files to expose as " "filesystem context maps. May be passed more than once." ), ) args = parser.parse_args() app = QApplication.instance() owns_app = False if app is None: app = QApplication([]) owns_app = True entry_arg = args.entry_box if args.pick or not entry_arg: picked = _pick_entry_file(entry_arg, args.start_dir) if not picked: return 0 entry_arg = picked entry_path = Path(entry_arg).expanduser().resolve() try: session_input = _build_session_input(entry_path, ref_map_paths=args.refmaps_path) except Exception as exc: QMessageBox.critical( None, "Incompatible Model", str(exc), ) return 2 dialog = FovBoxSelectorDialog(session_input=session_input, entry_box_path=entry_path) dialog.setWindowTitle(f"FOV / Box Selector - {entry_path.name}") def _on_save_as_clicked() -> None: out_path = _pick_save_as_h5_path( dialog, default_stem=entry_path.stem, ) if out_path is None: return try: result = dialog.current_selection_snapshot() line_seeds = dialog.committed_line_seeds() fov_box = dialog.current_fov_box_selection() observer_state = dialog.current_observer_persistence_state() _persist_selector_result_to_entry( entry_path, result, line_seeds=line_seeds, fov_box=fov_box, observer_state=observer_state, output_path=out_path, ) QMessageBox.information( dialog, "Model Saved", f"Saved updated model to:\n{out_path}", ) except Exception as exc: QMessageBox.warning( dialog, "Save Failed", f"Failed to save model to {out_path}:\n{exc}", ) dialog.set_save_as_callback(_on_save_as_clicked, text="Save As") dialog.set_accept_button_text("Apply && Close") def _persist_result_if_needed() -> None: if dialog.result() != QDialog.Accepted: return result = dialog.accepted_selection() if result is None: return line_seeds = dialog.committed_line_seeds() fov_box = dialog.current_fov_box_selection() observer_state = dialog.current_observer_persistence_state() if entry_path.suffix.lower() != ".h5": btn = QMessageBox.question( dialog, "Save As Required", "This model cannot be updated in place.\n\n" "Save the updated model (with your FOV changes) to a new .h5 file?", QMessageBox.Save | QMessageBox.Discard, QMessageBox.Save, ) if btn != QMessageBox.Save: return out_path = _pick_save_as_h5_path( dialog, default_stem=entry_path.stem, ) if out_path is None: return try: _persist_selector_result_to_entry( entry_path, result, line_seeds=line_seeds, fov_box=fov_box, observer_state=observer_state, output_path=out_path, ) except Exception as exc: QMessageBox.warning( dialog, "Save Failed", f"Failed to save model to {out_path}:\n{exc}", ) return try: _persist_selector_result_to_entry( entry_path, result, line_seeds=line_seeds, fov_box=fov_box, observer_state=observer_state, ) except Exception as exc: QMessageBox.warning( dialog, "Save Failed", f"Failed to write updated observer FOV metadata:\n{exc}", ) if owns_app: dialog.finished.connect(lambda _code: (_persist_result_if_needed(), app.quit())) dialog.show() dialog.raise_() dialog.activateWindow() app.exec_() else: accepted = dialog.exec_() == QDialog.Accepted if accepted: _persist_result_if_needed() return 0
if __name__ == "__main__": raise SystemExit(main())