# Source code for pyampp.gxbox.box_view2d
from __future__ import annotations
import copy
import logging
from dataclasses import dataclass
from pathlib import Path
import threading
from types import SimpleNamespace
from typing import Iterable, Optional
import numpy as np
import astropy.units as u
import matplotlib.colors as mcolors
from astropy.io import fits
from astropy.coordinates import SkyCoord
from astropy.time import Time
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT as NavigationToolbar
from matplotlib.collections import LineCollection
from matplotlib.figure import Figure
from matplotlib.patches import Rectangle
from PyQt5.QtCore import Qt, QTimer
from PyQt5.QtGui import QFont, QIcon
from PyQt5.QtCore import QSize
from PyQt5.QtWidgets import (
QButtonGroup,
QHBoxLayout,
QLabel,
QStyle,
QSizePolicy,
QToolButton,
QVBoxLayout,
QWidget,
)
from sunpy.map import Map, make_fitswcs_header
from sunpy.coordinates import (
Heliocentric,
HeliographicCarrington,
HeliographicStonyhurst,
Helioprojective,
SphericalScreen,
sun,
)
from sunpy.visualization import colormaps as sunpy_colormaps
from pyampp.geometry import (
build_fov_box_from_red_box_world,
build_fov_box_from_user_hpc_and_red_box_world,
local_cartesian_to_world,
observer_fov_box_to_world_corners,
observer_rectangle_to_hpc_corners,
project_box_front_face_to_observer_hpc,
project_coordinate_edges_to_observer_hpc,
project_world_to_observer_hpc,
project_world_to_pixel,
)
from .box import Box
from .boxutils import load_sunpy_map_compat, map_from_data_header_compat
from .observer_restore import (
build_ephemeris_from_pb0r,
build_pb0r_metadata_from_ephemeris,
resolve_observer_parameters_from_ephemeris,
resolve_observer_with_info,
)
from .selector_api import BoxGeometrySelection, CoordMode, DisplayFovBoxSelection, DisplayFovSelection, SelectorSessionInput
# Raise the "sunpy" logger threshold to WARNING as a side effect of importing this module.
logging.getLogger("sunpy").setLevel(logging.WARNING)
# Alias table: display IDs offered for the context map -> lowercase product keys.
# NOTE(review): the lookup site is outside this excerpt — confirm which helper consumes it.
_CONTEXT_DISPLAY_MAP_ALIASES = {
"Bz": "magnetogram",
"Ic": "continuum",
"B_rho": "field",
"B_theta": "inclination",
"B_phi": "azimuth",
"disambig": "disambig",
# Backward-compatible legacy IDs now mapped to measured HPC products.
"Br": "field",
"Bp": "inclination",
"Bt": "azimuth",
}
# Alias table: display IDs offered for the bottom/base map -> lowercase keys.
# Both capitalizations of "chromo_mask" and "vert_current" are accepted.
_BOTTOM_DISPLAY_MAP_ALIASES = {
"Bx": "bx",
"By": "by",
"Bz": "bz",
"Ic": "ic",
"chromo_mask": "chromo_mask",
"Chromo_mask": "chromo_mask",
"Vert_current": "vert_current",
"vert_current": "vert_current",
}
# Key groups below are plain membership sets/tuples; how each one drives
# rendering is decided by code outside this excerpt (presumably colormap and
# normalization selection — TODO confirm).
_HMI_VECTOR_SEGMENTS = ("field", "inclination", "azimuth", "disambig")
_HMI_DISPLAY_KEYS = {"magnetogram", "continuum", "field", "inclination", "azimuth", "disambig"}
_SIGNED_MAGNETIC_KEYS = {"magnetogram", "bx", "by", "bz"}
# Intentionally empty at present: no key is classified as "transverse".
_TRANSVERSE_MAGNETIC_KEYS = set()
_VERT_CURRENT_KEYS = {"Vert_current", "vert_current"}
_CHROMO_MASK_KEYS = {"chromo_mask"}
# Channel identifiers as strings; _AIA_COLOR_KEYS holds the same nine members as a set.
_AIA_REFERENCE_IDS = ("171", "193", "211", "304", "335", "1600", "1700", "131", "94")
# Same four members as _HMI_VECTOR_SEGMENTS, kept as a set for membership tests.
_HMI_VECTOR_DISPLAY_KEYS = {"field", "inclination", "azimuth", "disambig"}
_AIA_COLOR_KEYS = {"94", "131", "1600", "1700", "171", "193", "211", "304", "335"}
_EOVSA_REFMAP_PREFIX = "EOVSA_"
# Union of the AIA channel keys and the HMI vector display keys.
_BOTTOM_OVERLAY_CONTEXT_KEYS = _AIA_COLOR_KEYS | _HMI_VECTOR_DISPLAY_KEYS
_EMBEDDED_REFMAP_FLAG = "PYEMBED"
# Twelve index pairs into an 8-corner array: two 4-edge loops (corners 0-3 and
# 4-7) plus the four edges joining corner i to corner i + 4.
_BOX_EDGE_INDEX_PAIRS = (
(0, 1), (1, 3), (3, 2), (2, 0),
(4, 5), (5, 7), (7, 6), (6, 4),
(0, 4), (1, 5), (2, 6), (3, 7),
)
# (normalized observer key, human-readable label) pairs, in presentation order.
_DISPLAY_OBSERVER_OPTIONS = (
("earth", "Earth"),
("sdo", "SDO"),
("solar orbiter", "Solar Orbiter"),
("stereo-a", "STEREO-A"),
("stereo-b", "STEREO-B"),
)
# Label lookup for every selectable observer plus the user-defined "custom" one.
_DISPLAY_OBSERVER_LABELS = {
**dict(_DISPLAY_OBSERVER_OPTIONS),
"custom": "Custom",
}
# Observer key -> body name for the three spacecraft entries.
# NOTE(review): the name suggests these are JPL Horizons identifiers — confirm at the call site.
_DISPLAY_OBSERVER_HORIZONS = {
"solar orbiter": "Solar Orbiter",
"stereo-a": "STEREO-A",
"stereo-b": "STEREO-B",
}
def _prepare_model_for_viewer(*args, **kwargs):
    """Forward every argument to ``view_h5.prepare_model_for_viewer``.

    The import is performed inside the function, so ``view_h5`` is only
    loaded when this wrapper is actually called.
    """
    from .view_h5 import prepare_model_for_viewer as _impl

    return _impl(*args, **kwargs)
def _viewer_camera_basis(*args, **kwargs):
    """Forward every argument to ``view_h5._viewer_camera_vectors``.

    The import is performed inside the function, so ``view_h5`` is only
    loaded when this wrapper is actually called.
    """
    from .view_h5 import _viewer_camera_vectors as _impl

    return _impl(*args, **kwargs)
def _generate_streamlines_from_seeds(*args, **kwargs):
    """Forward every argument to ``box_view3d.generate_streamlines_from_line_seeds``.

    The import is performed inside the function, so ``box_view3d`` is only
    loaded when this wrapper is actually called.
    """
    from .box_view3d import generate_streamlines_from_line_seeds as _impl

    return _impl(*args, **kwargs)
def _magfield_viewer_cls():
    """Return the ``MagFieldViewer`` class from ``box_view3d``, importing it on demand."""
    from .box_view3d import MagFieldViewer as _viewer_cls

    return _viewer_cls
@dataclass
class MapBoxViewState:
    """
    Reusable state container for focused map+box visualization tools.

    This is intentionally lightweight and plotting-backend agnostic so it can be
    reused by multiple future GUIs (FOV selector, box inspector, model preview, ...).

    NOTE(review): the field list was reconstructed from the keyword arguments
    that ``MapBoxDisplayWidget.initialize`` passes to this class. Names are
    taken verbatim from that call; types marked "presumably" and all defaults
    are assumptions to confirm against the upstream definition. Every field
    has a default so a bare ``MapBoxViewState()`` keeps working.
    """

    # Session description handed to ``initialize``.
    session_input: "Optional[SelectorSessionInput]" = None
    # Currently selected context / bottom map IDs (``None`` when nothing is selected).
    selected_context_id: Optional[str] = None
    selected_bottom_id: Optional[str] = None
    # Box geometry and display field-of-view selections (presumably
    # BoxGeometrySelection / DisplayFovSelection / DisplayFovBoxSelection).
    geometry: Optional[object] = None
    fov: Optional[object] = None
    fov_box: Optional[object] = None
    # Plain-dict copies made by ``initialize``; ``None`` means "not provided".
    map_files: Optional[dict] = None
    refmaps: Optional[dict] = None
    base_maps: Optional[dict] = None
    base_wcs_header: Optional[str] = None
    base_geometry: Optional[object] = None
    # ``initialize`` falls back to "auto" when the session gives no mode.
    map_source_mode: str = "auto"
    square_fov: bool = False
    # Normalized observer keys; "earth" is the fallback used throughout the widget.
    display_observer_key: str = "earth"
    geometry_definition_observer_key: str = "earth"
    fov_definition_observer_key: str = "earth"
    # User-defined ("custom") observer description.
    custom_observer_ephemeris: Optional[dict] = None
    custom_observer_label: Optional[str] = None
    custom_observer_source: Optional[str] = None
class _SquareCanvasHost(QWidget):
    """Keep the embedded plot canvas left-aligned with a fixed-width rectangular host.

    The host is pinned to a width of 600 px (minimum == maximum) while its
    height may grow; the single hosted canvas is stretched over the host's
    whole area on every resize.
    """

    def __init__(self, parent: Optional[QWidget] = None):
        super().__init__(parent)
        self._canvas = None
        # Equal minimum and maximum widths make the width effectively fixed.
        self.setMinimumWidth(600)
        self.setMaximumWidth(600)
        self.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Expanding)

    def set_canvas(self, canvas: QWidget) -> None:
        """Adopt *canvas* as the hosted child, show it, and fit it to the host."""
        self._canvas = canvas
        canvas.setParent(self)
        canvas.show()
        self._reposition_canvas()

    def resizeEvent(self, event):
        """Re-fit the hosted canvas whenever the host is resized."""
        super().resizeEvent(event)
        self._reposition_canvas()

    def _reposition_canvas(self) -> None:
        """Stretch the canvas from the top-left corner over the full host area."""
        canvas = self._canvas
        if canvas is None:
            return
        host_width = self.width()
        host_height = self.height()
        # Clamp to at least 1 px so a zero-sized geometry is never requested.
        canvas.setGeometry(0, 0, max(1, host_width), max(1, host_height))
[docs]
class MapBoxDisplayWidget(QWidget):
"""
Reusable widget shell for map display + interactive box overlays.
Current implementation provides:
- SunPy map plotting using the map's native WCS projection
- a static box-outline overlay derived from the current geometry state
- a stable API for future drag/resize interaction layers
"""
def __init__(self, parent: Optional[QWidget] = None):
    """Build the widget: state attributes, matplotlib canvas, and both toolbars.

    No session data is loaded here; the canvas shows a loading placeholder
    until a session is supplied through ``initialize``.
    """
    super().__init__(parent)

    # --- Session state and map caches ---------------------------------
    self._svg_dir = self._resolve_svg_dir()
    self._state: Optional[MapBoxViewState] = None
    self._geometry_change_callback = None
    self._map_summary_cache: dict[str, str] = {}
    self._loaded_map_cache = {}
    self._prepared_context_map_cache = {}
    self._raw_map_cache = {}
    self._cache_lock = threading.RLock()
    self._background_cache_enabled = False
    self._background_cache_generation = 0
    self._background_cache_thread = None

    # --- Current plot and overlay artists ------------------------------
    self._current_map = None
    self._current_axes = None
    self._overlay_rect = None
    self._overlay_bbox_rect = None
    self._projected_box_bbox_rect = None
    self._projected_box_fov = None
    self._overlay_center_artist = None
    self._overlay_corner_artists = []
    self._overlay_line_artists = []
    self._drag_preview_box_artist = None
    self._drag_preview_fov_artist = None
    self._drag_preview_center_artist = None
    self._drag_preview_background = None
    self._drag_preview_active = False
    self._drag_preview_geometry = None
    self._zoom_anchor_px: tuple[float, float] | None = None
    self._drag_state = None

    # --- 3D viewer hand-off --------------------------------------------
    self._entry_box_path: Optional[Path] = None
    self._viewer3d = None
    self._viewer3d_temp_h5_path: Optional[Path] = None
    # Polls the 3D viewer state every 400 ms once started.
    self._viewer3d_watchdog = QTimer(self)
    self._viewer3d_watchdog.setInterval(400)
    self._viewer3d_watchdog.timeout.connect(self._check_viewer3d_state)
    self._hidden_for_live_3d = False
    self._viewer3d_close_handled = False

    # --- Session model and field lines ---------------------------------
    self._committed_line_seeds = None
    self._session_box_template = None
    self._session_obs_time = None
    self._session_b3dtype = None
    self._session_temp_h5_path: Optional[Path] = None
    self._session_model_loaded = False
    self._fieldline_frame_hcc = None
    self._fieldline_frame_obs = None
    self._fieldline_streamlines = []
    self._fieldline_z_base = 0.0
    self._fieldline_artists = []

    # --- Callbacks and observer bookkeeping ----------------------------
    self._map_info_callback = None
    self._status_callback = None
    self._fov_change_callback = None
    self._observer_info_callback = None
    self._observer_coord_cache: dict[str, SkyCoord] = {}
    self._observer_metadata_cache: dict[tuple[str, ...], dict] = {}
    self._observer_warning_cache: set[str] = set()
    self._observer_refresh_serial = 0
    self._available_observer_keys_override: set[str] | None = None
    self._observer_availability_notice: str | None = None

    # --- Status texts and view/interaction modes -----------------------
    self._pending_launch_margin_fix = False
    self._last_map_info_text = "Map info: <uninitialized>"
    self._last_status_base_text = "Map/box display initialized"
    self._last_status_text = "Map/box display initialized"
    self._last_context_summary_text = "Context map: <uninitialized>"
    self._last_bottom_summary_text = "Base map: <uninitialized>"
    self._prep_trace_counts: dict[str, int] = {}
    self._prep_trace_order: list[str] = []
    self._view_mode = "box_fov"
    self._full_view_limits = None
    self._interaction_mode = "auto"
    self._mouse_actions_enabled = False
    self._geometry_edit_enabled = True
    self._action_state_callback = None

    # --- Matplotlib figure, canvas, and event wiring -------------------
    self._fig = Figure(figsize=(7.5, 5.0))
    self._canvas = FigureCanvas(self._fig)
    self._canvas.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
    connect = self._canvas.mpl_connect
    self._cid_press = connect("button_press_event", self._on_mouse_press)
    self._cid_move = connect("motion_notify_event", self._on_mouse_move)
    self._cid_release = connect("button_release_event", self._on_mouse_release)
    self._cid_scroll = connect("scroll_event", self._on_scroll)
    self._canvas_host = _SquareCanvasHost()
    self._canvas_host.set_canvas(self._canvas)
    # The navigation toolbar is pinned to the same 600 px width as the canvas host.
    self._nav_toolbar = NavigationToolbar(self._canvas, self)
    self._nav_toolbar.setMinimumWidth(600)
    self._nav_toolbar.setMaximumWidth(600)
    self._nav_toolbar.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Fixed)
    self.show_loading_placeholder("Preparing viewer data...\nPlease wait.")

    # --- Toolbar buttons -----------------------------------------------
    make_button = self._make_svg_button
    self._full_view_btn = make_button("expand.svg", "Full Sun View", self.show_full_sun_view)
    self._box_view_btn = make_button("shrink.svg", "Zoom canvas to image FOV", self.show_box_fov_view)
    self._recompute_fov_btn = make_button(
        "rectangle-horizontal.svg",
        "Recompute image FOV from projected 3D box",
        self.recompute_fov_from_box,
    )
    self._control_mode_label = QLabel("BOX Controls")
    self._left_btn = make_button(
        "arrow-left.svg", "Move box center left", lambda: self._nudge_primary_center("x", -1)
    )
    self._right_btn = make_button(
        "arrow-right.svg", "Move box center right", lambda: self._nudge_primary_center("x", +1)
    )
    self._down_btn = make_button(
        "arrow-down.svg", "Move box center down", lambda: self._nudge_primary_center("y", -1)
    )
    self._up_btn = make_button(
        "arrow-up.svg", "Move box center up", lambda: self._nudge_primary_center("y", +1)
    )
    self._x_minus_btn = make_button(
        "shrink-horizontal.svg", "Decrease X box size", lambda: self._nudge_primary_size("x", -1)
    )
    self._x_plus_btn = make_button(
        "expand-horizontal.svg", "Increase X box size", lambda: self._nudge_primary_size("x", +1)
    )
    self._y_minus_btn = make_button(
        "shrink-vertical.svg", "Decrease Y box size", lambda: self._nudge_primary_size("y", -1)
    )
    self._y_plus_btn = make_button(
        "expand-vertical.svg", "Increase Y box size", lambda: self._nudge_primary_size("y", +1)
    )
    self._xy_minus_btn = make_button(
        "shrink.svg", "Decrease X and Y box size together", lambda: self._nudge_primary_size_xy(-1)
    )
    self._xy_plus_btn = make_button(
        "expand.svg", "Increase X and Y box size together", lambda: self._nudge_primary_size_xy(+1)
    )
    self._zoom_in_btn = make_button(
        "zoom-in.svg", "Zoom In (centered on image FOV)", lambda: self._scale_view(1 / 1.25)
    )
    self._zoom_out_btn = make_button(
        "zoom-out.svg", "Zoom Out (centered on image FOV)", lambda: self._scale_view(1.25)
    )
    self._can_open_3d = False
    self._can_clear_lines = False

    # --- Layout --------------------------------------------------------
    def _flat_row(spacing: int) -> QHBoxLayout:
        """Return a margin-less horizontal layout with the given spacing."""
        row = QHBoxLayout()
        row.setContentsMargins(0, 0, 0, 0)
        row.setSpacing(spacing)
        return row

    zoom_label = QLabel("Zoom Controls")
    zoom_toolbar = _flat_row(4)
    zoom_toolbar.addWidget(zoom_label)
    zoom_toolbar.addSpacing(8)
    for button in (self._full_view_btn, self._box_view_btn, self._recompute_fov_btn):
        zoom_toolbar.addWidget(button)
    zoom_toolbar.addSpacing(8)
    for button in (self._zoom_in_btn, self._zoom_out_btn):
        zoom_toolbar.addWidget(button)
    zoom_toolbar.addStretch()

    control_toolbar = _flat_row(4)
    control_toolbar.addWidget(self._control_mode_label)
    control_toolbar.addSpacing(8)
    for button in (self._left_btn, self._right_btn, self._down_btn, self._up_btn):
        control_toolbar.addWidget(button)
    control_toolbar.addSpacing(8)
    for button in (
        self._x_minus_btn,
        self._x_plus_btn,
        self._y_minus_btn,
        self._y_plus_btn,
        self._xy_minus_btn,
        self._xy_plus_btn,
    ):
        control_toolbar.addWidget(button)
    control_toolbar.addStretch()

    layout = QVBoxLayout(self)
    layout.setContentsMargins(0, 0, 0, 0)
    layout.setSpacing(2)
    layout.addLayout(zoom_toolbar)
    layout.addLayout(control_toolbar)
    layout.addWidget(self._canvas_host, 0, Qt.AlignLeft)

    nav_row = _flat_row(0)
    nav_row.addWidget(self._nav_toolbar, 0, Qt.AlignLeft)
    nav_row.addStretch()
    layout.addLayout(nav_row)

    self._refresh_control_mode_ui()
[docs]
def show_loading_placeholder(self, message: str = "Preparing viewer data...") -> None:
    """Replace the figure contents with a centered, axis-less waiting message.

    *message* is drawn as the headline; a fixed, dimmer hint line is drawn
    beneath it. The canvas redraw is requested lazily.
    """
    figure = self._fig
    figure.clear()
    axes = figure.add_subplot(111)
    centered = {"ha": "center", "va": "center"}
    axes.text(0.5, 0.54, message, fontsize=13, **centered)
    axes.text(
        0.5,
        0.42,
        "The viewer will populate when maps are ready.",
        fontsize=10,
        alpha=0.7,
        **centered,
    )
    axes.axis("off")
    figure.subplots_adjust(left=0.04, right=0.96, bottom=0.06, top=0.94)
    self._canvas.draw_idle()
[docs]
def showEvent(self, event):
    """Run the base show handling, then queue a one-off margin refresh if one is pending."""
    super().showEvent(event)
    if not self._pending_launch_margin_fix:
        return
    # Zero-delay timer: run the refresh from the event loop, after this handler returns.
    QTimer.singleShot(0, self._post_show_margin_refresh)
def _post_show_margin_refresh(self) -> None:
    """Perform the pending post-show margin adjustment exactly once.

    Does nothing unless a margin fix is pending and axes exist. The pending
    flag stays set when there are no axes yet, so a later call can retry.
    """
    if not self._pending_launch_margin_fix:
        return
    axes = self._current_axes
    if axes is None:
        return
    self._pending_launch_margin_fix = False
    margins_changed = self._auto_adjust_axes_margins(axes, top=0.93, pad_px=10.0)
    self._render_fieldlines()
    # Changed margins get an immediate redraw; otherwise a lazy one is enough.
    redraw = self._canvas.draw if margins_changed else self._canvas.draw_idle
    redraw()
def _make_mode_button(self, text: str, mode: str, checked: bool = False) -> QToolButton:
    """Create a checkable tool button that switches the interaction mode.

    Parameters
    ----------
    text
        Tooltip shown for the button.
    mode
        Interaction mode passed to ``_set_interaction_mode`` on click.
    checked
        Initial checked state.

    The button shows the icon from ``_mode_button_icon``; when that icon is
    null it falls back to the text glyph returned alongside it. The button is
    added to ``self._mode_group``, which is created on first use because
    ``__init__`` does not create it.
    """
    btn = QToolButton(self)
    btn.setCheckable(True)
    btn.setAutoRaise(False)
    btn.setIconSize(QSize(32, 32))
    btn.setToolButtonStyle(Qt.ToolButtonIconOnly)
    icon, fallback = self._mode_button_icon(mode)
    btn.setIcon(icon)
    if icon.isNull():
        btn.setText(fallback)
        btn.setToolButtonStyle(Qt.ToolButtonTextOnly)
    btn.setToolTip(text)
    btn.setChecked(checked)
    # Bind ``mode`` as a default so each button keeps its own value.
    btn.clicked.connect(lambda _=False, m=mode: self._set_interaction_mode(m))
    group = getattr(self, "_mode_group", None)
    if group is None:
        group = QButtonGroup(self)
        self._mode_group = group
    group.addButton(btn)
    return btn
def _make_glyph_button(self, glyph: str, tooltip: str, callback) -> QToolButton:
    """Create a fixed 24x24 text-only tool button showing *glyph* in a 10 pt font.

    *callback* is connected to the button's ``clicked`` signal.
    """
    button = QToolButton(self)
    button.setText(glyph)
    button.setToolTip(tooltip)
    button.setToolButtonStyle(Qt.ToolButtonTextOnly)
    button.setAutoRaise(False)
    button.setFixedSize(24, 24)
    # Copy the button's font before changing the size.
    glyph_font = QFont(button.font())
    glyph_font.setPointSize(10)
    button.setFont(glyph_font)
    button.clicked.connect(callback)
    return button
def _make_text_button(self, text: str, tooltip: str, callback) -> QToolButton:
    """Create a text-only tool button at least 24 px tall, wired to *callback*."""
    button = QToolButton(self)
    button.setText(text)
    button.setToolTip(tooltip)
    button.setToolButtonStyle(Qt.ToolButtonTextOnly)
    button.setAutoRaise(False)
    button.setMinimumHeight(24)
    button.clicked.connect(callback)
    return button
def _make_svg_button(self, svg_name: str, tooltip: str, callback) -> QToolButton:
    """Create a fixed 32x32 tool button using an SVG icon from the resolved SVG directory.

    When the SVG file does not exist the button shows a "?" text glyph
    instead. *callback* is connected to the ``clicked`` signal.
    """
    button = QToolButton(self)
    button.setToolTip(tooltip)
    button.setAutoRaise(False)
    button.setFixedSize(32, 32)
    button.setIconSize(QSize(20, 20))
    svg_file = self._svg_dir / svg_name
    if not svg_file.exists():
        # Missing asset: fall back to a visible text marker.
        button.setText("?")
        button.setToolButtonStyle(Qt.ToolButtonTextOnly)
    else:
        button.setIcon(QIcon(str(svg_file)))
    button.clicked.connect(callback)
    return button
@staticmethod
def _resolve_svg_dir() -> Path:
here = Path(__file__).resolve()
candidates = [
here.parents[2] / "docs" / "svg",
here.parents[1] / "docs" / "svg",
Path.cwd() / "docs" / "svg",
]
for candidate in candidates:
try:
if candidate.exists():
return candidate
except Exception:
continue
return candidates[0]
def _make_icon_button(self, icon_names, tooltip, callback, fallback_text="") -> QToolButton:
    """Create a tool button with a themed icon, degrading to a standard icon, then to text.

    *icon_names* is tried through ``_theme_icon`` first; if that yields a
    null icon the standard fallback icon is used; if that is null too the
    button becomes text-only and shows *fallback_text*.
    """
    button = QToolButton(self)
    button.setAutoRaise(False)
    button.setIconSize(QSize(32, 32))
    button.setToolButtonStyle(Qt.ToolButtonIconOnly)
    chosen_icon = self._theme_icon(icon_names)
    if chosen_icon.isNull():
        chosen_icon = self._fallback_std_icon()
    if chosen_icon.isNull():
        button.setText(fallback_text)
        button.setToolButtonStyle(Qt.ToolButtonTextOnly)
    else:
        button.setIcon(chosen_icon)
    button.setToolTip(tooltip)
    button.clicked.connect(callback)
    return button
def _mode_button_icon(self, mode: str):
    """Return ``(icon, fallback_text)`` for an interaction-mode button.

    The icon comes from the desktop icon theme when available. If the theme
    has none, "move" and the three resize modes get a standard Qt icon;
    other modes keep a null icon so the caller can show ``fallback_text``.
    Unknown modes use the mode name itself as the fallback text.
    """
    mapping = {
        # "◎" was previously stored as the mojibake "â—Ž" (its UTF-8 bytes
        # mis-decoded as cp1252), which would have been shown verbatim.
        "auto": (["transform-move"], "◎"),
        "move": (["transform-move"], "✛"),
        "resize_xy": (["transform-scale"], "⤡"),
        "resize_x": (["object-flip-horizontal"], "↔"),
        "resize_y": (["object-flip-vertical"], "↕"),
    }
    names, fallback = mapping.get(mode, ([], mode))
    icon = self._theme_icon(names)
    if icon.isNull():
        # Use standard cursor-like fallback icons where possible.
        if mode == "move":
            icon = self.style().standardIcon(QStyle.SP_ArrowUp)
        elif mode in {"resize_x", "resize_y", "resize_xy"}:
            icon = self.style().standardIcon(QStyle.SP_TitleBarShadeButton)
    return icon, fallback
@staticmethod
def _theme_icon(names) -> QIcon:
    """Return the first non-null theme icon among *names*, or a null ``QIcon``."""
    themed_icons = (QIcon.fromTheme(name) for name in names)
    return next((icon for icon in themed_icons if not icon.isNull()), QIcon())
def _fallback_std_icon(self) -> QIcon:
    """Return the style's standard right-arrow icon, or a null icon if the lookup fails."""
    try:
        std_icon = self.style().standardIcon(QStyle.SP_ArrowRight)
    except Exception:
        return QIcon()
    return std_icon
def _set_interaction_mode(self, mode: str) -> None:
    """Store *mode* as the active interaction mode, then refresh status text and cursor."""
    self._interaction_mode = mode
    # Order matters only in that both refreshes see the new mode.
    self._refresh_status_text()
    self._update_cursor_for_mode()
@staticmethod
def _normalize_observer_key(observer_key: str | None) -> str:
raw = observer_key
if isinstance(raw, (bytes, bytearray)):
raw = raw.decode("utf-8", "ignore")
if isinstance(raw, np.ndarray) and raw.shape == ():
raw = raw.item()
if isinstance(raw, (bytes, bytearray)):
raw = raw.decode("utf-8", "ignore")
key = str(raw or "earth").strip().lower()
aliases = {
"custom": "custom",
"sdo": "sdo",
"sdo/aia": "sdo",
"sdo/hmi": "sdo",
"earth": "earth",
"solo": "solar orbiter",
"solar-orbiter": "solar orbiter",
"solarorbiter": "solar orbiter",
"solar orbiter": "solar orbiter",
"stereo a": "stereo-a",
"stereo-a": "stereo-a",
"stereoa": "stereo-a",
"stereo b": "stereo-b",
"stereo-b": "stereo-b",
"stereob": "stereo-b",
}
return aliases.get(key, "earth")
@staticmethod
def _observer_label_for_key(observer_key: str | None) -> str:
    """Return the display label for *observer_key* after normalization ("Earth" if unknown)."""
    canonical = MapBoxDisplayWidget._normalize_observer_key(observer_key)
    return _DISPLAY_OBSERVER_LABELS.get(canonical, "Earth")
def _enabled_observer_keys(self) -> set[str]:
    """Return the set of observer keys that may currently be selected.

    Without an entry box path only "earth" is offered; otherwise every
    listed display observer is. When an availability override is set, the
    result is narrowed to it, and "earth" is then always kept.
    """
    earth_only = self._entry_box_path is None
    enabled = {
        key
        for key, _label in _DISPLAY_OBSERVER_OPTIONS
        if key == "earth" or not earth_only
    }
    override = self._available_observer_keys_override
    if override is None:
        return enabled
    enabled &= set(override)
    enabled.add("earth")
    return enabled
def _observer_source_b3d(self) -> dict:
source_b3d = getattr(self._session_box_template, "b3d", None)
if isinstance(source_b3d, dict):
return source_b3d
if self._state is None:
return {}
payload: dict = {}
if isinstance(self._state.refmaps, dict) and self._state.refmaps:
payload["refmaps"] = self._state.refmaps
return payload
def _normalize_display_observer_state(self) -> None:
    """Reset the display observer to "earth" when the stored key is no longer selectable.

    The stored key is left alone when it is enabled, when it normalizes to
    "custom", or when "earth" itself is not among the enabled keys.
    """
    state = self._state
    if state is None:
        return
    enabled = self._enabled_observer_keys()
    if "earth" not in enabled:
        return
    current = state.display_observer_key
    if current in enabled:
        return
    if self._normalize_observer_key(current) == "custom":
        return
    state.display_observer_key = "earth"
[docs]
def set_display_observer_key(self, observer_key: str | None) -> None:
    """Select the observer whose viewpoint the map is displayed from.

    The key is normalized first; a non-custom key that is not currently
    enabled falls back to "earth". When the resulting key equals the current
    one, only the observer info is re-emitted. Otherwise the state is
    updated and a deferred plot refresh is scheduled.
    """
    state = self._state
    if state is None:
        return
    key = self._normalize_observer_key(observer_key)
    enabled = self._enabled_observer_keys()
    unavailable = key != "custom" and key not in enabled
    if unavailable and "earth" in enabled:
        key = "earth"

    if key == state.display_observer_key:
        self._normalize_display_observer_state()
        self._emit_observer_info()
        return

    state.display_observer_key = key
    self._normalize_display_observer_state()
    self._refresh_status_text()
    self._emit_observer_info()

    preserve_current_view = self._should_preserve_pixel_view()
    in_box_view = self._view_mode == "box_fov"
    # In box-FOV mode the view is re-aligned to the projected FOV instead of preserved.
    self._schedule_observer_refresh(
        preserve_current_view=False if in_box_view else preserve_current_view,
        align_projected_fov=in_box_view,
    )
[docs]
def set_custom_display_observer_pb0r(
    self,
    *,
    b0_deg,
    l0_deg,
    rsun_arcsec,
    obs_date=None,
    rsun_cm=None,
    label: str | None = None,
    source: str | None = None,
) -> bool:
    """Define the "custom" display observer from B0/L0/solar-radius values and select it.

    The numeric arguments are passed unchanged to
    ``build_ephemeris_from_pb0r``. *label* and *source* are stripped; an
    empty label becomes "Custom" and an empty source becomes ``None``.
    Omitting *label* keeps an existing label, or sets "Custom" if there is
    none.

    Returns ``True`` when the custom observer was installed and a refresh
    was scheduled, ``False`` when no session state exists or no ephemeris
    could be built.
    """
    state = self._state
    if state is None:
        return False
    ephemeris = build_ephemeris_from_pb0r(
        b0_deg=b0_deg,
        l0_deg=l0_deg,
        rsun_arcsec=rsun_arcsec,
        obs_date=obs_date,
        rsun_cm=rsun_cm,
    )
    if ephemeris is None:
        return False

    state.custom_observer_ephemeris = ephemeris
    if label is None:
        if not state.custom_observer_label:
            state.custom_observer_label = "Custom"
    else:
        state.custom_observer_label = str(label).strip() or "Custom"
    if source is not None:
        state.custom_observer_source = str(source).strip() or None
    state.display_observer_key = "custom"

    self._normalize_display_observer_state()
    self._refresh_status_text()
    self._emit_observer_info()

    preserve_current_view = self._should_preserve_pixel_view()
    in_box_view = self._view_mode == "box_fov"
    # In box-FOV mode the view is re-aligned to the projected FOV instead of preserved.
    self._schedule_observer_refresh(
        preserve_current_view=False if in_box_view else preserve_current_view,
        align_projected_fov=in_box_view,
    )
    return True
[docs]
def set_custom_observer_identity(self, *, label: str | None = None, source: str | None = None) -> None:
    """Update the custom observer's label and/or source without touching its ephemeris.

    Arguments left as ``None`` are ignored. Values are stripped; an empty
    label becomes "Custom" and an empty source becomes ``None``. Status text
    and observer info are refreshed only when something actually changed.
    """
    state = self._state
    if state is None:
        return
    changed = False

    if label is not None:
        new_label = str(label).strip() or "Custom"
        if (state.custom_observer_label or "") != new_label:
            state.custom_observer_label = new_label
            changed = True

    if source is not None:
        new_source = str(source).strip() or None
        if state.custom_observer_source != new_source:
            state.custom_observer_source = new_source
            changed = True

    if not changed:
        return
    self._refresh_status_text()
    self._emit_observer_info()
def _schedule_observer_refresh(self, *, preserve_current_view: bool, align_projected_fov: bool) -> None:
    """Queue a plot refresh on the event loop, superseding earlier queued refreshes.

    Each call bumps a serial number; a queued refresh runs only if its
    serial is still the latest when the timer fires, so a burst of requests
    results in a single refresh.

    Parameters
    ----------
    preserve_current_view
        Forwarded to ``_refresh_plot``.
    align_projected_fov
        When true and the widget is still in "box_fov" mode after the
        refresh, the view is re-framed on the projected FOV (10% padding).
    """
    self._observer_refresh_serial += 1
    serial = self._observer_refresh_serial

    def _run() -> None:
        if serial != self._observer_refresh_serial:
            # A newer request was queued after this one; let that one do the work.
            return
        self._refresh_plot(preserve_current_view=preserve_current_view)
        if align_projected_fov and self._view_mode == "box_fov":
            try:
                self._set_view_to_projected_fov(pad_factor=1.10)
                self._canvas.draw_idle()
            except Exception:
                # Re-framing stays best-effort, but leave a trace instead of
                # discarding the failure silently.
                logging.getLogger(__name__).debug(
                    "Could not align view to projected FOV after observer refresh",
                    exc_info=True,
                )

    QTimer.singleShot(0, _run)
@staticmethod
def _obstime_for_map(smap, fallback_iso: str | None = None):
obstime = getattr(smap, "date", None)
if obstime is not None:
return obstime
if fallback_iso:
try:
return Time(fallback_iso)
except Exception:
return None
return None
@staticmethod
def _observer_cache_number(value, digits: int = 6) -> str:
try:
number = float(value)
except Exception:
return ""
if not np.isfinite(number):
return ""
return f"{number:.{digits}f}"
def _custom_observer_metadata_token(self) -> tuple[str, ...]:
ephemeris = self._state.custom_observer_ephemeris if self._state is not None else None
if not isinstance(ephemeris, dict):
return ("", "", "", "", "")
return (
str(ephemeris.get("obs_date", ephemeris.get("obs_time", "")) or ""),
self._observer_cache_number(ephemeris.get("hgln_obs_deg")),
self._observer_cache_number(ephemeris.get("hglt_obs_deg")),
self._observer_cache_number(ephemeris.get("dsun_cm"), digits=1),
self._observer_cache_number(ephemeris.get("rsun_cm"), digits=1),
)
def _observer_metadata_cache_key(self, observer_key: str | None, obstime) -> tuple[str, ...] | None:
    """Build the metadata cache key for an observer at a given time.

    The key is ``(normalized observer key, ISO time)``; for the "custom"
    observer the ephemeris fingerprint is appended so a changed ephemeris
    gets a different key. Returns ``None`` when *obstime* is ``None``.
    """
    key = self._normalize_observer_key(observer_key)
    if obstime is None:
        return None
    when = obstime if isinstance(obstime, Time) else Time(obstime)
    base_key: tuple[str, ...] = (key, when.isot)
    if key != "custom":
        return base_key
    return base_key + self._custom_observer_metadata_token()
def _resolve_display_observer_metadata(self, observer_key: str | None, obstime) -> dict | None:
    """Resolve (and cache) the metadata dict describing a display observer.

    For the "custom" observer the metadata comes from the stored ephemeris.
    For every other observer a coordinate is taken from the per-key
    coordinate cache or resolved through ``resolve_observer_with_info``;
    for "sdo" an unresolved coordinate falls back to the observer of a
    loaded context map. A resolver warning is reported once per observer key
    through the status callback.

    The returned dict always carries ``observer_coordinate`` and a
    ``los_signature`` string. Returns ``None`` when *obstime* is ``None`` or
    no observer coordinate can be obtained.
    """
    key = self._normalize_observer_key(observer_key)
    if obstime is None:
        return None
    when = obstime if isinstance(obstime, Time) else Time(obstime)
    cache_key = self._observer_metadata_cache_key(key, when)
    if cache_key is not None and cache_key in self._observer_metadata_cache:
        return self._observer_metadata_cache[cache_key]

    def _observer_from_loaded_maps():
        """Return the Stonyhurst observer of a loaded context map, or ``None``."""
        state = self._state
        raw_map = None
        try:
            if state is not None and state.selected_context_id:
                raw_map = self._load_raw_map(
                    self._canonical_map_key(state.selected_context_id, purpose="context"),
                    purpose="context",
                )
        except Exception:
            raw_map = None
        if raw_map is None:
            try:
                raw_map = self._reference_context_map()
            except Exception:
                raw_map = None
        if raw_map is None:
            return None
        try:
            raw_observer = getattr(raw_map, "observer_coordinate", None)
            if raw_observer is not None:
                return raw_observer.transform_to(HeliographicStonyhurst(obstime=when))
        except Exception:
            pass
        return None

    metadata = None
    if key == "custom":
        ephemeris = self._state.custom_observer_ephemeris if self._state is not None else None
        metadata = resolve_observer_parameters_from_ephemeris(
            ephemeris,
            observer_key="custom",
            obs_time=when,
        )
        if metadata is None:
            return None
        custom_coord = metadata.get("observer_coordinate")
        if custom_coord is not None:
            self._observer_coord_cache[key] = custom_coord
    else:
        coord = self._observer_coord_cache.get(key)
        warning = None
        used_key = key
        if coord is None:
            source_b3d = self._observer_source_b3d()
            coord, warning, used_key = resolve_observer_with_info(
                source_b3d if isinstance(source_b3d, dict) else {},
                key,
                when,
            )
            if coord is None and key == "sdo":
                map_observer = _observer_from_loaded_maps()
                if map_observer is not None:
                    # A map-derived observer supersedes the resolver's warning.
                    coord, warning, used_key = map_observer, None, "sdo"

        if warning and key not in self._observer_warning_cache:
            # Surface each observer's warning only once.
            self._observer_warning_cache.add(key)
            self._last_status_text = warning
            if self._status_callback is not None:
                self._status_callback(warning)

        if coord is None:
            return None
        self._observer_coord_cache[key] = coord

        try:
            hgs = coord.transform_to(HeliographicStonyhurst(obstime=when))
        except Exception:
            hgs = coord
        metadata = {
            "observer_coordinate": coord,
            "observer_key": used_key,
            "obs_time": when,
            "b0_deg": float(hgs.lat.to_value(u.deg)),
            "l0_deg": float(hgs.lon.to_value(u.deg)),
            "p_deg": float(sun.P(when).to_value(u.deg)) if used_key == "earth" else None,
            "dsun_cm": float(coord.radius.to_value(u.cm)),
            "rsun_cm": None,
            "rsun_arcsec": None,
            "source": "session",
        }

    observer = metadata.get("observer_coordinate") if isinstance(metadata, dict) else None
    if observer is None:
        return None
    try:
        observer_hgs = observer.transform_to(HeliographicStonyhurst(obstime=when))
        metadata["los_signature"] = (
            f"hgs:{when.isot}:"
            f"{float(observer_hgs.lon.to_value(u.deg)):.6f}:"
            f"{float(observer_hgs.lat.to_value(u.deg)):.6f}"
        )
    except Exception:
        # Without a usable transform, fall back to the observer key itself.
        metadata["los_signature"] = key

    if cache_key is not None:
        self._observer_metadata_cache[cache_key] = metadata
    return metadata
def _resolve_display_observer_coord(self, observer_key: str | None, obstime) -> SkyCoord | None:
    """Return only the observer coordinate from the resolved metadata, or ``None``."""
    metadata = self._resolve_display_observer_metadata(observer_key, obstime)
    return None if metadata is None else metadata.get("observer_coordinate")
def _observer_context(self, observer_key: str | None, obstime):
    """Return a namespace with ``observer_coordinate`` and ``date``, or ``None`` if unresolved."""
    resolved = self._resolve_display_observer_coord(observer_key, obstime)
    if resolved is None:
        return None
    return SimpleNamespace(observer_coordinate=resolved, date=obstime)
def _resolved_observer_for_map(self, smap, observer_key: str | None = None):
    """Return the observer coordinate to use for *smap*.

    The display observer (or *observer_key* when given) is resolved at the
    map's date and preferred; if that fails, the map's own
    ``observer_coordinate`` is returned (``None`` when there is no map).
    """
    have_map = smap is not None
    obstime = getattr(smap, "date", None) if have_map else None
    key = observer_key
    if key is None and self._state is not None:
        key = self._state.display_observer_key
    context = self._observer_context(key, obstime)
    resolved = None if context is None else getattr(context, "observer_coordinate", None)
    if resolved is not None:
        return resolved
    return getattr(smap, "observer_coordinate", None) if have_map else None
def _display_observer_cache_token(self, smap, observer_key: str | None = None) -> str:
    """Return a string identifying the observer viewpoint for cache keys.

    This is the resolved metadata's ``los_signature`` when available, and
    the normalized observer key otherwise (no map, unresolved observer, or
    empty signature).
    """
    if observer_key is not None:
        requested = observer_key
    elif self._state is not None:
        requested = self._state.display_observer_key
    else:
        requested = "earth"
    key = self._normalize_observer_key(requested)
    if smap is None:
        return key
    fallback_iso = self._state.session_input.time_iso if self._state is not None else None
    obstime = self._obstime_for_map(smap, fallback_iso)
    metadata = self._resolve_display_observer_metadata(key, obstime)
    if metadata is None:
        return key
    return str(metadata.get("los_signature") or key)
def _observers_share_los(self, observer_key_a: str | None, observer_key_b: str | None, obstime) -> bool:
    """Return whether two observers have the same ``los_signature`` at *obstime*.

    When either observer cannot be resolved, the comparison falls back to
    equality of the normalized observer keys.
    """
    meta_a = self._resolve_display_observer_metadata(observer_key_a, obstime)
    meta_b = self._resolve_display_observer_metadata(observer_key_b, obstime)
    if meta_a is not None and meta_b is not None:
        signature_a = str(meta_a.get("los_signature") or "")
        signature_b = str(meta_b.get("los_signature") or "")
        return signature_a == signature_b
    return self._normalize_observer_key(observer_key_a) == self._normalize_observer_key(observer_key_b)
def _reproject_map_for_display_observer(
    self,
    smap,
    *,
    fov_override: DisplayFovSelection | None = None,
):
    """Reproject *smap* to the current display observer's viewpoint.

    Returns ``(map, coverage_fov)``. The input map is returned unchanged
    with ``None`` when there is no state, the display observer is "earth",
    the observer cannot be resolved, the map's own observer already has the
    same longitude and latitude (within 1e-6 deg), or reprojection fails.

    When a target header can be built for the selected FOV
    (*fov_override* or the current display FOV) only that region is
    reprojected and the FOV is returned as ``coverage_fov``; otherwise a
    full-frame header matching the map's shape and pixel scale is used and
    ``coverage_fov`` is ``None``.
    """
    unchanged = (smap, None)
    state = self._state
    if state is None:
        return unchanged
    display_key = self._normalize_observer_key(state.display_observer_key)
    if display_key == "earth":
        return unchanged
    obstime = self._obstime_for_map(smap, self._state.session_input.time_iso)
    observer = self._resolve_display_observer_coord(display_key, obstime)
    if observer is None:
        return unchanged

    def _deg(angle) -> float:
        """Return *angle* as a plain float in degrees."""
        return float(angle.to_value(u.deg))

    # Skip the reprojection when the map is already seen from this observer.
    try:
        map_observer = getattr(smap, "observer_coordinate", None)
        if map_observer is not None:
            lon_matches = np.isclose(_deg(map_observer.lon), _deg(observer.lon), atol=1e-6)
            lat_matches = np.isclose(_deg(map_observer.lat), _deg(observer.lat), atol=1e-6)
            if lon_matches and lat_matches:
                return unchanged
    except Exception:
        pass

    n_rows, n_cols = np.asarray(smap.data).shape[:2]
    center_x = 0.5 * max(0, n_cols - 1)
    center_y = 0.5 * max(0, n_rows - 1)
    try:
        center_world = smap.wcs.pixel_to_world(center_x, center_y)
        source_label = str(
            getattr(smap, "detector", None)
            or getattr(smap, "observatory", None)
            or getattr(smap, "nickname", None)
            or "map"
        )
        if fov_override is not None:
            target_fov = fov_override
        else:
            target_fov = self._current_display_prepare_fov(display_key, obstime=obstime)
        header = self._display_observer_reproject_header_for_selection(smap, observer, obstime, target_fov)
        if header is None:
            # No region header available: reproject the full frame instead.
            target_center = center_world.transform_to(Helioprojective(observer=observer, obstime=obstime))
            self._record_prepare_event(f"observer reproj: {source_label} -> {display_key} [full]")
            pixel_scale = u.Quantity(
                [
                    abs(smap.scale.axis1.to_value(u.arcsec / u.pix)),
                    abs(smap.scale.axis2.to_value(u.arcsec / u.pix)),
                ],
                u.arcsec / u.pix,
            )
            header = make_fitswcs_header(smap.data, target_center, scale=pixel_scale)
            try:
                header["rsun_ref"] = float(smap.rsun_meters.to_value(u.m))
            except Exception:
                pass
            coverage_fov = None
        else:
            self._record_prepare_event(f"observer reproj: {source_label} -> {display_key} [roi]")
            coverage_fov = target_fov
        reprojected = smap.reproject_to(header, algorithm="adaptive", roundtrip_coords=False)
        return reprojected, coverage_fov
    except Exception:
        return unchanged
def _is_non_earth_display_observer(self) -> bool:
if self._state is None:
return False
return self._normalize_observer_key(self._state.display_observer_key) != "earth"
[docs]
def initialize(self, session_input: SelectorSessionInput) -> None:
    """Start a new session: reset caches and build the view state from *session_input*.

    The default context/bottom map IDs are chosen first, all summary,
    observer and map caches are cleared, observer availability is read from
    the optional ``available_observer_keys`` and
    ``observer_availability_notice`` attributes, and a fresh
    ``MapBoxViewState`` is created. Status text, map info, observer info and
    FOV control enablement are refreshed afterwards. No plot refresh is
    triggered here.
    """
    selected_context_id = self._default_context_id(session_input)
    selected_bottom_id = self._default_bottom_id(session_input)

    self._clear_prepare_trace()
    self._map_summary_cache.clear()
    self._observer_coord_cache.clear()
    self._observer_metadata_cache.clear()
    self._observer_warning_cache.clear()
    self._invalidate_map_caches()

    available_keys = getattr(session_input, "available_observer_keys", None)
    self._available_observer_keys_override = (
        {self._normalize_observer_key(key) for key in available_keys}
        if available_keys
        else None
    )
    # ``or ""`` keeps an attribute that is present but None from being
    # stringified into the literal notice "None".
    self._observer_availability_notice = (
        str(getattr(session_input, "observer_availability_notice", "") or "").strip() or None
    )

    # Only a dict ephemeris is accepted; it is deep-copied so later edits to
    # the session input do not leak into the view state.
    custom_ephemeris = getattr(session_input, "custom_observer_ephemeris", None)
    if isinstance(custom_ephemeris, dict):
        custom_ephemeris = copy.deepcopy(custom_ephemeris)
    else:
        custom_ephemeris = None

    self._state = MapBoxViewState(
        session_input=session_input,
        selected_context_id=selected_context_id,
        selected_bottom_id=selected_bottom_id,
        geometry=session_input.geometry,
        fov=session_input.fov,
        fov_box=session_input.fov_box,
        map_files=dict(session_input.map_files or {}),
        refmaps=dict(session_input.refmaps or {}),
        base_maps=dict(session_input.base_maps or {}),
        base_wcs_header=str(session_input.base_wcs_header) if session_input.base_wcs_header else None,
        base_geometry=session_input.base_geometry,
        map_source_mode=str(session_input.map_source_mode or "auto"),
        square_fov=bool(session_input.square_fov),
        display_observer_key=self._normalize_observer_key(
            getattr(session_input, "display_observer_key", "earth")
        ),
        geometry_definition_observer_key="earth",
        fov_definition_observer_key=self._normalize_observer_key(
            getattr(session_input.fov_box, "observer_key", "earth")
        ),
        custom_observer_ephemeris=custom_ephemeris,
        custom_observer_label=str(getattr(session_input, "custom_observer_label", "") or "").strip() or None,
        custom_observer_source=str(getattr(session_input, "custom_observer_source", "") or "").strip() or None,
    )

    self._normalize_display_observer_state()
    self._refresh_status_text()
    self._refresh_map_info()
    self._emit_observer_info()
    self._update_fov_control_enabled_state()
[docs]
def refresh_session_view(self) -> None:
    """Redraw the view and bring session-dependent overlays up to date.

    Runs, in this order: the plot refresh, the lazy session-model load,
    the field-line rebuild from the committed seeds, and the control-state
    refresh.
    """
    self._refresh_plot()
    self._ensure_session_model_loaded()
    self._refresh_fieldlines_from_committed_seeds()
    self._update_fov_control_enabled_state()
[docs]
def set_available_maps(self, map_ids: Iterable[str]) -> None:
    """Replace the session's map-id list and repair stale selections.

    Does nothing until a view state exists. A context or bottom selection
    that is missing from the new list is replaced by the matching default
    for the session input; status text, map info and the plot are then
    refreshed.
    """
    state = self._state
    if state is None:
        return

    available = list(map_ids)
    state.session_input.map_ids = available

    if state.selected_context_id not in available:
        state.selected_context_id = self._default_context_id(state.session_input)
    if state.selected_bottom_id not in available:
        state.selected_bottom_id = self._default_bottom_id(state.session_input)

    self._refresh_status_text()
    self._refresh_map_info()
    self._refresh_plot()
[docs]
def set_context_map_id(self, map_id: Optional[str]) -> None:
    """Select the context map and redraw without preserving the current view.

    A no-op when no view state exists or when ``map_id`` is already the
    selected context map.
    """
    state = self._state
    if state is None or state.selected_context_id == map_id:
        return

    state.selected_context_id = map_id
    self._refresh_status_text()
    self._refresh_map_info()
    self._refresh_plot(preserve_current_view=False)
[docs]
def set_bottom_map_id(self, map_id: Optional[str]) -> None:
    """Select the bottom (base) map and redraw.

    A no-op when no view state exists or when ``map_id`` is already the
    selected bottom map. The redraw keeps the current view whenever
    ``_should_preserve_pixel_view`` says so.
    """
    state = self._state
    if state is None or state.selected_bottom_id == map_id:
        return

    state.selected_bottom_id = map_id
    self._refresh_status_text()
    self._refresh_map_info()
    self._refresh_plot(preserve_current_view=self._should_preserve_pixel_view())
[docs]
def set_map_file_paths(self, map_files: dict[str, str]) -> None:
    """Store a copy of the map-file mapping and reload map-derived output.

    A no-op when no view state exists or when the mapping equals the one
    already stored. Otherwise the summary cache and map caches are dropped
    before map info and the plot are refreshed.
    """
    state = self._state
    if state is None:
        return

    requested = dict(map_files or {})
    current = dict(state.map_files or {})
    if current == requested:
        return

    state.map_files = requested
    self._map_summary_cache.clear()
    self._invalidate_map_caches()
    self._refresh_map_info()
    self._refresh_plot()
[docs]
def set_map_source_mode(self, mode: str) -> None:
    """Switch the map source mode and reload everything derived from maps.

    ``mode`` is lower-cased; an empty value or anything other than
    ``"auto"``, ``"filesystem"`` or ``"embedded"`` becomes ``"auto"``.
    A no-op when no view state exists or the mode is unchanged.
    """
    state = self._state
    if state is None:
        return

    normalized = str(mode or "auto").lower()
    if normalized not in ("auto", "filesystem", "embedded"):
        normalized = "auto"
    if state.map_source_mode == normalized:
        return

    state.map_source_mode = normalized
    self._map_summary_cache.clear()
    self._invalidate_map_caches()
    self._refresh_status_text()
    self._refresh_map_info()
    self._refresh_plot(preserve_current_view=self._should_preserve_pixel_view())
[docs]
def set_geometry_edit_enabled(self, enabled: bool) -> None:
    """Store the geometry-edit flag and refresh the control UI and status text.

    The flag is what ``_control_target_mode`` reads to decide whether the
    shared controls act on the box or on the FOV.
    """
    self._geometry_edit_enabled = bool(enabled)
    self._refresh_control_mode_ui()
    self._refresh_status_text()
[docs]
def set_entry_box_path(self, entry_box_path: Optional[str | Path], *, load_session_model: bool = True) -> None:
    """Point the view at a new entry box file and reset the session model.

    The path is user-expanded and resolved; a falsy value clears it. All
    cached session-model fields and the observer caches are reset. When
    ``load_session_model`` is true the model is loaded immediately and the
    field lines are rebuilt afterwards; otherwise loading is left to a
    later ``_ensure_session_model_loaded`` call.
    """
    if entry_box_path:
        resolved = Path(entry_box_path).expanduser().resolve()
    else:
        resolved = None
    self._entry_box_path = resolved

    # Forget everything derived from the previous entry box.
    self._session_box_template = None
    self._session_obs_time = None
    self._session_b3dtype = None
    self._session_temp_h5_path = None
    self._committed_line_seeds = None
    self._session_model_loaded = False
    self._observer_coord_cache.clear()
    self._observer_metadata_cache.clear()
    self._observer_warning_cache.clear()

    if load_session_model:
        self._load_session_model_from_entry()

    self._normalize_display_observer_state()
    self._emit_observer_info()
    self._refresh_open_3d_state()
    self._emit_action_state()

    if load_session_model:
        self._refresh_fieldlines_from_committed_seeds()
[docs]
def set_geometry_selection(self, selection: BoxGeometrySelection) -> None:
    """Adopt a new box geometry, redraw, and notify the geometry listener.

    A no-op when no view state exists. Geometry-dependent display maps are
    invalidated before the status text and plot are refreshed; the
    registered geometry-change callback, if any, is called last with the
    same ``selection``.
    """
    state = self._state
    if state is None:
        return

    state.geometry = selection
    self._invalidate_geometry_dependent_display_maps()
    self._refresh_status_text()
    self._refresh_plot(preserve_current_view=self._should_preserve_pixel_view())

    listener = self._geometry_change_callback
    if listener is not None:
        listener(selection)
[docs]
def set_fov_selection(self, selection: DisplayFovSelection) -> None:
    """Adopt a new display FOV, redraw, and notify the FOV listener.

    A no-op when no view state exists. While square-FOV mode is on, the
    selection is rebuilt with its height forced equal to its width. The
    FOV box is then synced to the selection, geometry-dependent display
    maps are invalidated, and the registered FOV-change callback (if any)
    receives the selection that was actually stored.
    """
    state = self._state
    if state is None:
        return

    if state.square_fov:
        side = selection.width_arcsec
        selection = DisplayFovSelection(
            center_x_arcsec=selection.center_x_arcsec,
            center_y_arcsec=selection.center_y_arcsec,
            width_arcsec=side,
            height_arcsec=side,
        )

    state.fov = selection
    self._sync_fov_box_to_selection()
    self._invalidate_geometry_dependent_display_maps()
    self._refresh_status_text()
    self._refresh_plot(preserve_current_view=self._should_preserve_pixel_view())

    listener = self._fov_change_callback
    if listener is not None:
        listener(selection)
[docs]
def set_square_fov(self, enabled: bool, *, refresh: bool = True) -> None:
    """Toggle square-FOV mode and, when enabling, square the current FOV.

    A no-op when no view state exists. When ``enabled`` is truthy and an
    FOV is set, the FOV height is forced to its width. With ``refresh``
    the squared FOV goes through ``set_fov_selection`` (full redraw and
    callback); without it, only the stored FOV and FOV box are updated.
    """
    state = self._state
    if state is None:
        return

    state.square_fov = bool(enabled)
    self._update_fov_control_enabled_state()

    current = state.fov
    if not enabled or current is None:
        return

    side = current.width_arcsec
    squared = DisplayFovSelection(
        center_x_arcsec=current.center_x_arcsec,
        center_y_arcsec=current.center_y_arcsec,
        width_arcsec=side,
        height_arcsec=side,
    )
    if refresh:
        self.set_fov_selection(squared)
        return

    state.fov = squared
    self._sync_fov_box_to_selection()
def _update_fov_control_enabled_state(self) -> None:
    """Refresh control enablement; currently delegates to ``_refresh_control_mode_ui``."""
    self._refresh_control_mode_ui()
[docs]
def set_action_state_callback(self, callback) -> None:
    """Register the action-state listener and immediately push the current state.

    ``callback`` is invoked by ``_emit_action_state`` with two positional
    arguments: ``_can_open_3d`` and ``_can_clear_lines``. ``None`` removes
    the listener.
    """
    self._action_state_callback = callback
    self._emit_action_state()
def _emit_action_state(self) -> None:
    """Send ``(_can_open_3d, _can_clear_lines)`` to the action-state listener, if any."""
    listener = self._action_state_callback
    if listener is None:
        return
    listener(self._can_open_3d, self._can_clear_lines)
def _refresh_open_3d_state(self) -> None:
    """Recompute ``_can_open_3d``: true only with no live viewer and an entry box path."""
    no_live_viewer = self._viewer3d is None
    self._can_open_3d = no_live_viewer and self._entry_box_path is not None
def _load_session_model_from_entry(self) -> None:
    """Load the session model from the entry box path into the cached fields.

    Does nothing when no entry box path is set. On any loading failure all
    cached model fields are cleared; in both the success and the failure
    case the model is marked as loaded, so the load is not retried by
    ``_ensure_session_model_loaded``.
    """
    entry_path = self._entry_box_path
    if entry_path is None:
        return

    try:
        model_box, model_time, model_type, temp_path = _prepare_model_for_viewer(entry_path)
    except Exception:
        # Best effort: a model that cannot be prepared leaves an empty session.
        self._session_box_template = None
        self._session_obs_time = None
        self._session_b3dtype = None
        self._session_temp_h5_path = None
        self._committed_line_seeds = None
        self._session_model_loaded = True
        return

    self._session_box_template = model_box
    self._session_obs_time = model_time
    self._session_b3dtype = model_type
    self._session_temp_h5_path = temp_path

    stored_seeds = model_box.b3d.get("line_seeds")
    if isinstance(stored_seeds, dict):
        self._committed_line_seeds = copy.deepcopy(stored_seeds)
    else:
        self._committed_line_seeds = None
    self._session_model_loaded = True
def _ensure_session_model_loaded(self) -> None:
    """Load the session model on first use and refresh the action state.

    Does nothing once ``_session_model_loaded`` is set.
    """
    if not self._session_model_loaded:
        self._load_session_model_from_entry()
        self._refresh_open_3d_state()
        self._emit_action_state()
def _clone_session_model(self):
    """Return ``(box, obs_time, b3dtype)`` with ``box`` a deep copy of the template.

    Returns ``(None, None, None)`` when no session box template is loaded.
    """
    template = self._session_box_template
    if template is None:
        return None, None, None
    return copy.deepcopy(template), self._session_obs_time, self._session_b3dtype
def _apply_live_session_state(self, box, *, update_frame_obs: bool = True) -> None:
    """Write the live 2D session state into ``box.b3d`` and, optionally, its frame.

    ``box.b3d`` receives the committed line seeds (or loses its
    ``"line_seeds"`` entry when none are committed) and an ``"observer"``
    entry carrying the display observer name/label/source, the current FOV
    and FOV box, and -- only when a custom observer is involved and an
    ephemeris is stored -- ``"ephemeris"`` and ``"pb0r"`` metadata.

    When ``update_frame_obs`` is true, ``box._frame_obs`` is additionally
    replaced by a ``Helioprojective`` frame for the display observer at the
    frame's existing ``obstime``. That step is best effort: failures are
    logged at debug level and otherwise ignored.

    Nothing is modified when ``box.b3d`` is not a dict. A missing view
    state is treated as the ``"earth"`` observer with no custom ephemeris.
    """
    b3d = box.b3d
    # Guard first: every statement below mutates ``b3d`` as a dict.
    if not isinstance(b3d, dict):
        return

    if isinstance(self._committed_line_seeds, dict):
        b3d["line_seeds"] = copy.deepcopy(self._committed_line_seeds)
    else:
        b3d.pop("line_seeds", None)

    observer_meta = b3d.get("observer", {})
    if not isinstance(observer_meta, dict):
        observer_meta = {}

    state = self._state
    if state is not None:
        if state.fov is not None:
            observer_meta["fov"] = {
                "frame": "helioprojective",
                "xc_arcsec": float(state.fov.center_x_arcsec),
                "yc_arcsec": float(state.fov.center_y_arcsec),
                "xsize_arcsec": float(state.fov.width_arcsec),
                "ysize_arcsec": float(state.fov.height_arcsec),
                "square": bool(state.square_fov),
            }
        if state.fov_box is not None:
            observer_meta["fov_box"] = state.fov_box.as_observer_metadata(
                square=bool(state.square_fov)
            )

    display_key = self._normalize_observer_key(
        state.display_observer_key if state is not None else "earth"
    )
    observer_meta["name"] = str(display_key)
    is_custom = self._normalize_observer_key(observer_meta.get("name")) == "custom"
    if state is not None and is_custom:
        observer_meta["label"] = str(state.custom_observer_label or "Custom")
        if state.custom_observer_source:
            observer_meta["source"] = str(state.custom_observer_source)
        else:
            observer_meta.pop("source", None)
    else:
        observer_meta["label"] = self._observer_label_for_key(observer_meta.get("name"))
        observer_meta.pop("source", None)

    # Custom ephemeris is persisted only when some part of the state
    # (display observer, FOV definition, or FOV box) refers to "custom".
    ephemeris = {}
    needs_custom_ephemeris = False
    if state is not None:
        ephemeris = copy.deepcopy(state.custom_observer_ephemeris or {})
        needs_custom_ephemeris = (
            self._normalize_observer_key(state.display_observer_key) == "custom"
            or self._normalize_observer_key(state.fov_definition_observer_key) == "custom"
            or (
                state.fov_box is not None
                and self._normalize_observer_key(getattr(state.fov_box, "observer_key", None)) == "custom"
            )
        )
    if needs_custom_ephemeris and ephemeris:
        observer_meta["ephemeris"] = ephemeris
        pb0r = build_pb0r_metadata_from_ephemeris(
            ephemeris,
            observer_key="custom",
            obs_time=ephemeris.get("obs_date", self._session_obs_time),
        )
        if pb0r:
            observer_meta["pb0r"] = pb0r
    else:
        observer_meta.pop("ephemeris", None)
        observer_meta.pop("pb0r", None)
    b3d["observer"] = observer_meta

    # Keep the live 3D frame observer aligned with the active 2D observer
    # context so LOS camera and FOV overlay are evaluated in one frame.
    if not update_frame_obs:
        return
    try:
        frame_obs = getattr(box, "_frame_obs", None)
        obs_time = getattr(frame_obs, "obstime", None)
        if obs_time is None:
            return
        desired_observer = self._resolve_display_observer_coord(display_key, obs_time)
        if desired_observer is not None:
            box._frame_obs = Helioprojective(observer=desired_observer, obstime=obs_time)
    except Exception:
        # Deliberately non-fatal, but leave a trace for debugging.
        logging.getLogger(__name__).debug(
            "Could not align the box frame observer with the display observer.",
            exc_info=True,
        )
def _refresh_live_3d_viewer_state(self) -> None:
    """Push the current 2D session state into the live 3D viewer, best effort.

    Does nothing when no viewer is attached. Every viewer hook is probed
    with ``hasattr`` before use, and any exception is swallowed so a viewer
    problem cannot break the 2D view.
    """
    viewer = self._viewer3d
    if viewer is None:
        return
    try:
        self._apply_live_session_state(viewer.box)
        if hasattr(viewer, "_update_los_scene_label"):
            viewer._update_los_scene_label()
        if hasattr(viewer, "previous_params"):
            # presumably forces the viewer to treat all parameters as changed -- TODO confirm
            viewer.previous_params = {}
        if hasattr(viewer, "update_plot"):
            viewer.update_plot(init=True)
        elif hasattr(viewer, "update_fov_box"):
            viewer.update_fov_box(getattr(viewer, "fov_box_visible", True), do_render=False)
            # NOTE(review): nesting of this render call under the elif is reconstructed
            # from flattened source (it follows a do_render=False update) -- confirm.
            if hasattr(viewer, "render"):
                viewer.render()
    except Exception:
        pass
def _on_viewer3d_closed(self, *_args) -> None:
    """Clean up after the live 3D viewer has gone away.

    If neither ``commit_live_3d_edits`` nor ``cancel_live_3d_edits`` has
    handled the close yet, the viewer's current seeds and streamlines are
    committed on a best-effort basis. The viewer references and watchdog
    are then released, the host window is restored if it is still hidden,
    and the action state is re-emitted. Positional arguments are ignored.
    """
    # Snapshot the flag: commit_live_3d_edits() below sets it as a side effect.
    close_was_handled = self._viewer3d_close_handled
    if not close_was_handled and self._viewer3d is not None:
        try:
            self.commit_live_3d_edits(
                self._viewer3d._collect_line_seeds_snapshot(),
                self._viewer3d._collect_streamlines(),
                z_base=self._viewer3d.grid_zbase,
            )
            close_was_handled = True
        except Exception:
            pass
    self._viewer3d = None
    self._viewer3d_temp_h5_path = None
    self._viewer3d_watchdog.stop()
    # Reached only when the implicit commit above did not run or failed:
    # commit/cancel restore the host window themselves.
    if self._hidden_for_live_3d and not self._viewer3d_close_handled:
        host = self.window()
        host.show()
        host.raise_()
        host.activateWindow()
        self._hidden_for_live_3d = False
    # Re-arm the flag for the next viewer session.
    self._viewer3d_close_handled = False
    self._refresh_open_3d_state()
    self._emit_action_state()
    if not close_was_handled:
        self._set_runtime_status("Live 3D viewer closed.")
[docs]
def committed_line_seeds(self):
    """Return a deep copy of the committed line seeds, or ``None`` if none are stored as a dict."""
    seeds = self._committed_line_seeds
    if not isinstance(seeds, dict):
        return None
    return copy.deepcopy(seeds)
def _refresh_fieldlines_from_committed_seeds(self) -> None:
    """Rebuild and draw field lines from the committed seeds.

    Requires a current map and axes; otherwise returns immediately. Field
    lines are cleared when there is no session model, no committed seeds,
    or no streamlines result. Any failure while rebuilding is reported via
    the runtime status instead of being raised.
    """
    if self._current_map is None or self._current_axes is None:
        return

    # Session models are loaded lazily in dialog startup; ensure seeds are
    # available before attempting to rebuild and project field lines.
    self._ensure_session_model_loaded()
    if self._session_box_template is None or not isinstance(self._committed_line_seeds, dict):
        self.clear_fieldlines()
        return

    try:
        model_box, _unused_time, model_type = self._clone_session_model()
        if model_box is None:
            self.clear_fieldlines()
            return
        self._apply_live_session_state(model_box)
        center = getattr(model_box, "_center", None)
        self._fieldline_frame_hcc = getattr(center, "frame", None)
        self._fieldline_frame_obs = getattr(model_box, "_frame_obs", None)
        lines, base_height = _generate_streamlines_from_seeds(
            model_box, model_type, self._committed_line_seeds
        )
        if lines:
            self.plot_fieldlines(lines, z_base=base_height)
        else:
            self.clear_fieldlines()
    except Exception as exc:
        self._set_runtime_status(f"Failed to restore saved field lines: {exc}")
[docs]
def commit_live_3d_edits(self, line_seeds, streamlines, z_base=0.0) -> None:
    """Accept seed edits from the live 3D viewer into the 2D session.

    Stores a deep copy of ``line_seeds`` (or ``None`` when it is not a
    dict), marks the viewer close as handled, restores the host window if
    it was hidden for the 3D session, draws ``streamlines`` and reports the
    acceptance through the runtime status.
    """
    if isinstance(line_seeds, dict):
        self._committed_line_seeds = copy.deepcopy(line_seeds)
    else:
        self._committed_line_seeds = None
    self._viewer3d_close_handled = True

    if self._hidden_for_live_3d:
        host_window = self.window()
        host_window.show()
        host_window.raise_()
        host_window.activateWindow()
        self._hidden_for_live_3d = False

    self.plot_fieldlines(streamlines, z_base=z_base)
    self._set_runtime_status("Accepted 3D seed edits into the 2D session model.")
[docs]
def cancel_live_3d_edits(self) -> None:
    """Discard edits from the live 3D viewer.

    Marks the viewer close as handled, restores the host window if it was
    hidden for the 3D session, and reports the cancellation through the
    runtime status. The committed seeds are left untouched.
    """
    self._viewer3d_close_handled = True

    if self._hidden_for_live_3d:
        host_window = self.window()
        host_window.show()
        host_window.raise_()
        host_window.activateWindow()
        self._hidden_for_live_3d = False

    self._set_runtime_status("Canceled 3D seed edits; kept the 2D session model unchanged.")
def _check_viewer3d_state(self) -> None:
    """Watchdog tick: detect that the live 3D viewer window has disappeared.

    Stops the watchdog when no viewer is attached. Otherwise the viewer's
    ``app_window`` (or the viewer itself when it has none) is checked for
    visibility; an invisible window, or any error while checking, is
    treated as the viewer having been closed.
    """
    viewer = self._viewer3d
    if viewer is None:
        self._viewer3d_watchdog.stop()
        return

    try:
        window = getattr(viewer, "app_window", viewer)
        if not window.isVisible():
            self._on_viewer3d_closed()
    except Exception:
        self._on_viewer3d_closed()
def _control_target_mode(self) -> str:
    """Return ``"box"`` while geometry editing is enabled, otherwise ``"fov"``."""
    if self._geometry_edit_enabled:
        return "box"
    return "fov"
def _refresh_control_mode_ui(self) -> None:
    """Relabel and enable the shared nudge controls for the active target.

    In box mode the controls describe box edits, all resize buttons are
    enabled, and FOV recomputation is offered only without an entry box
    path. In FOV mode the controls describe FOV edits and the Y-only
    resize buttons are disabled while square-FOV mode is on. Recomputing
    the FOV always requires a projected box FOV.
    """
    mode = self._control_target_mode()
    can_recompute_fov = self._projected_box_fov is not None

    if mode == "box":
        title = "BOX Controls"
        tooltips = (
            "Move box center left",
            "Move box center right",
            "Move box center down",
            "Move box center up",
            "Decrease X box size",
            "Increase X box size",
            "Decrease Y box size",
            "Increase Y box size",
            "Decrease X and Y box size together",
            "Increase X and Y box size together",
        )
        recompute_enabled = bool(self._entry_box_path is None and can_recompute_fov)
        y_resize_enabled = True
    else:
        square = bool(self._state.square_fov) if self._state is not None else False
        title = "FOV Controls"
        tooltips = (
            "Move FOV center left",
            "Move FOV center right",
            "Move FOV center down",
            "Move FOV center up",
            "Decrease FOV X size",
            "Increase FOV X size",
            "Decrease FOV Y size",
            "Increase FOV Y size",
            "Decrease FOV X and Y size together",
            "Increase FOV X and Y size together",
        )
        recompute_enabled = can_recompute_fov
        y_resize_enabled = not square

    self._control_mode_label.setText(title)
    # Button order matches the tooltip order above.
    buttons = (
        self._left_btn,
        self._right_btn,
        self._down_btn,
        self._up_btn,
        self._x_minus_btn,
        self._x_plus_btn,
        self._y_minus_btn,
        self._y_plus_btn,
        self._xy_minus_btn,
        self._xy_plus_btn,
    )
    for button, tip in zip(buttons, tooltips):
        button.setToolTip(tip)

    self._recompute_fov_btn.setEnabled(recompute_enabled)
    self._y_minus_btn.setEnabled(y_resize_enabled)
    self._y_plus_btn.setEnabled(y_resize_enabled)
    self._xy_minus_btn.setEnabled(True)
    self._xy_plus_btn.setEnabled(True)
def _nudge_primary_center(self, axis: str, sign: int) -> None:
    """Nudge the center of the active control target (box or FOV) along ``axis``."""
    in_box_mode = self._control_target_mode() == "box"
    nudge = self._nudge_box_center if in_box_mode else self._nudge_fov_center
    nudge(axis, sign)
def _nudge_primary_size(self, axis: str, sign: int) -> None:
    """Nudge the size of the active control target (box or FOV) along ``axis``."""
    in_box_mode = self._control_target_mode() == "box"
    nudge = self._nudge_box_size if in_box_mode else self._nudge_fov_size
    nudge(axis, sign)
def _nudge_primary_size_xy(self, sign: int) -> None:
    """Nudge X and Y size together on the active control target (box or FOV)."""
    in_box_mode = self._control_target_mode() == "box"
    nudge = self._nudge_box_size_xy if in_box_mode else self._nudge_fov_size_xy
    nudge(sign)
[docs]
def current_geometry_selection(self) -> Optional[BoxGeometrySelection]:
    """Return the stored box geometry, or ``None`` before a view state exists."""
    state = self._state
    return None if state is None else state.geometry
[docs]
def current_fov_selection(self) -> Optional[DisplayFovSelection]:
    """Return the stored display FOV, or ``None`` before a view state exists."""
    state = self._state
    return None if state is None else state.fov
[docs]
def current_fov_box_selection(self) -> Optional[DisplayFovBoxSelection]:
    """Return the stored FOV box, or ``None`` before a view state exists."""
    state = self._state
    return None if state is None else state.fov_box
[docs]
def set_geometry_change_callback(self, callback) -> None:
    """Register the listener that ``set_geometry_selection`` calls with each new selection.

    Unlike the other callback setters, the listener is not invoked on
    registration. ``None`` removes it.
    """
    self._geometry_change_callback = callback
[docs]
def set_map_info_callback(self, callback) -> None:
    """Register the map-info listener and replay the last map-info text to it.

    ``None`` removes the listener; nothing is replayed in that case.
    """
    self._map_info_callback = callback
    if callback is None:
        return
    callback(self._last_map_info_text)
[docs]
def set_status_callback(self, callback) -> None:
    """Register the status listener and replay the last status text to it.

    The replay happens only when a listener is given and a view state
    already exists.
    """
    self._status_callback = callback
    if callback is None or self._state is None:
        return
    callback(self._last_status_text)
[docs]
def set_observer_info_callback(self, callback) -> None:
    """Register the observer-info listener and send it the current observer info.

    The initial call happens only when a listener is given and a view
    state already exists.
    """
    self._observer_info_callback = callback
    if callback is None or self._state is None:
        return
    callback(self.current_observer_info())
[docs]
def set_fov_change_callback(self, callback) -> None:
    """Register the FOV listener and send it the current FOV, if there is one.

    The initial call happens only when a listener is given, a view state
    exists, and that state holds an FOV.
    """
    self._fov_change_callback = callback
    state = self._state
    if callback is None or state is None or state.fov is None:
        return
    callback(state.fov)
[docs]
def observer_options(self) -> tuple[tuple[str, str], ...]:
    """Return the module-level display observer options as a tuple."""
    return tuple(_DISPLAY_OBSERVER_OPTIONS)
[docs]
def set_available_observer_keys(
    self,
    observer_keys: Iterable[str] | None,
    *,
    notice: str | None = None,
) -> None:
    """Restrict the selectable observers and store an availability notice.

    ``observer_keys`` may be any iterable, including a one-shot iterator.
    The keys are normalized into a set; ``None`` or an iterable that
    yields nothing removes the restriction (override is ``None``).
    ``notice`` is stripped, and an empty notice is stored as ``None``.

    The override and notice are stored even before a view state exists;
    the dependent refreshes run only once one does.
    """
    # Materialize before testing emptiness: a generator object is always
    # truthy, so testing it directly would turn an empty iterator into an
    # empty override set instead of "no override".
    keys = list(observer_keys) if observer_keys else []
    self._available_observer_keys_override = (
        {self._normalize_observer_key(key) for key in keys}
        if keys
        else None
    )
    self._observer_availability_notice = str(notice or "").strip() or None
    if self._state is None:
        return
    self._normalize_display_observer_state()
    self._refresh_status_text()
    self._emit_observer_info()
    self._update_fov_control_enabled_state()
[docs]
def current_display_observer_key(self) -> str:
    """Return the normalized display observer key, or ``"earth"`` before a view state exists."""
    state = self._state
    return "earth" if state is None else self._normalize_observer_key(state.display_observer_key)
[docs]
def current_observer_persistence_state(self) -> dict[str, object]:
    """Return the observer settings that need to be persisted with a session.

    The result always has the keys ``display_observer_key``,
    ``custom_observer_ephemeris``, ``custom_observer_label``,
    ``custom_observer_source`` and ``fov_definition_observer_key``.
    Before a view state exists, Earth defaults with no custom observer are
    returned. The ephemeris is a deep copy, so callers may mutate it.
    """
    # The return annotation uses ``object`` because ``typing.Any`` is not
    # imported by this module and would be an unresolved name.
    state = self._state
    if state is None:
        return {
            "display_observer_key": "earth",
            "custom_observer_ephemeris": None,
            "custom_observer_label": "",
            "custom_observer_source": "",
            "fov_definition_observer_key": "earth",
        }
    return {
        "display_observer_key": self._normalize_observer_key(state.display_observer_key),
        "custom_observer_ephemeris": copy.deepcopy(state.custom_observer_ephemeris),
        "custom_observer_label": str(state.custom_observer_label or ""),
        "custom_observer_source": str(state.custom_observer_source or ""),
        "fov_definition_observer_key": self._normalize_observer_key(state.fov_definition_observer_key),
    }
[docs]
def current_observer_info(self) -> dict[str, str]:
    """Describe the active display observer as a flat dict of strings.

    The returned dict always contains the same keys; values that cannot be
    determined stay empty strings. Resolution failures never raise: the
    partially filled dict is returned instead.

    Two paths exist. For a custom observer, the stored custom ephemeris is
    resolved via ``_resolve_display_observer_metadata``. For any other
    observer, the observer coordinate is resolved (from the source ``b3d``
    metadata when available) and the derived parameters come from
    ``resolve_observer_parameters_from_ephemeris``.
    """
    info = {
        "name": "",
        "label": "",
        "source": "",
        "model_time": "",
        "obs_date": "",
        "b0_deg": "",
        "l0_deg": "",
        "rsun_arcsec": "",
        "p_deg": "",
        "hgln_obs_deg": "",
        "hglt_obs_deg": "",
        "dsun_cm": "",
        "rsun_cm": "",
    }
    if self._state is None:
        return info
    info["model_time"] = str(self._state.session_input.time_iso or "")
    info["name"] = self._observer_label_for_key(self._state.display_observer_key)
    info["label"] = info["name"]
    # --- Custom observer: everything comes from the stored custom ephemeris.
    if self._normalize_observer_key(self._state.display_observer_key) == "custom":
        ephemeris = self._state.custom_observer_ephemeris or {}
        try:
            # Fall back to the model time when the ephemeris has no obs_date.
            when = ephemeris.get("obs_date", self._state.session_input.time_iso)
            when = when if isinstance(when, Time) else Time(when)
        except Exception:
            return info
        params = self._resolve_display_observer_metadata("custom", when)
        if params is None:
            return info
        info["name"] = "Custom"
        info["label"] = str(self._state.custom_observer_label or "Custom")
        info["source"] = str(self._state.custom_observer_source or "")
        info["obs_date"] = when.isot
        observer = params.get("observer_coordinate")
        if observer is not None:
            try:
                hgs = observer.transform_to(HeliographicStonyhurst(obstime=when))
                info["hgln_obs_deg"] = f"{float(hgs.lon.to_value(u.deg)):.6f}"
                info["hglt_obs_deg"] = f"{float(hgs.lat.to_value(u.deg)):.6f}"
            except Exception:
                pass
        for key, digits in (("b0_deg", 6), ("l0_deg", 6), ("p_deg", 6), ("rsun_arcsec", 2)):
            value = params.get(key)
            if value is None:
                continue
            try:
                info[key] = f"{float(value):.{digits}f}"
            except Exception:
                pass
        # NOTE(review): this path formats with 6 significant decimals and drops
        # unformattable values, while the non-custom path below uses 3 decimals
        # and falls back to str(value) -- confirm the difference is intended.
        for key in ("dsun_cm", "rsun_cm"):
            value = params.get(key)
            if value is None:
                continue
            try:
                info[key] = f"{float(value):.6e}"
            except Exception:
                pass
        return info
    # --- Any other observer: resolve the coordinate, then derive parameters.
    source_b3d = self._observer_source_b3d()
    observer_meta = source_b3d.get("observer", {}) if isinstance(source_b3d, dict) else {}
    ephemeris = observer_meta.get("ephemeris", {}) if isinstance(observer_meta, dict) else {}
    # Observation time precedence: model time, overridden by the stored
    # ephemeris time, then passed through _obstime_for_map when a map is shown.
    obs_time = self._state.session_input.time_iso
    if isinstance(ephemeris, dict):
        obs_time = ephemeris.get("obs_date", ephemeris.get("obs_time", obs_time))
    smap = self._current_map
    if smap is not None:
        obs_time = self._obstime_for_map(smap, obs_time)
    try:
        when = obs_time if isinstance(obs_time, Time) else Time(obs_time)
    except Exception:
        return info
    if isinstance(source_b3d, dict):
        # used_key may differ from the requested key; the label below follows it.
        observer, _warning, used_key = resolve_observer_with_info(
            source_b3d,
            self._state.display_observer_key,
            when,
        )
    else:
        observer = self._resolve_display_observer_coord(self._state.display_observer_key, when)
        used_key = self._normalize_observer_key(self._state.display_observer_key)
    if observer is None:
        return info
    info["name"] = self._observer_label_for_key(used_key)
    info["label"] = info["name"]
    try:
        hgs = observer.transform_to(HeliographicStonyhurst(obstime=when))
    except Exception:
        # Use the coordinate as-is when it cannot be transformed.
        hgs = observer
    # Solar radius: prefer the stored ephemeris value, else the current map's.
    rsun_cm = None
    if isinstance(ephemeris, dict) and ephemeris.get("rsun_cm") is not None:
        try:
            rsun_cm = float(ephemeris.get("rsun_cm"))
        except Exception:
            rsun_cm = None
    elif self._current_map is not None and getattr(self._current_map, "rsun_meters", None) is not None:
        try:
            rsun_cm = float(u.Quantity(self._current_map.rsun_meters).to_value(u.cm))
        except Exception:
            rsun_cm = None
    params = resolve_observer_parameters_from_ephemeris(
        {
            "hgln_obs_deg": float(hgs.lon.to_value(u.deg)),
            "hglt_obs_deg": float(hgs.lat.to_value(u.deg)),
            "dsun_cm": float(hgs.radius.to_value(u.cm)),
            "rsun_cm": rsun_cm,
            "obs_date": when.isot,
        },
        observer_key=used_key,
        obs_time=when,
    )
    if params is None:
        return info
    info["obs_date"] = when.isot
    info["hgln_obs_deg"] = f"{float(hgs.lon.to_value(u.deg)):.6f}"
    info["hglt_obs_deg"] = f"{float(hgs.lat.to_value(u.deg)):.6f}"
    for key, digits in (("b0_deg", 6), ("l0_deg", 6), ("p_deg", 6), ("rsun_arcsec", 2)):
        value = params.get(key)
        if value is None:
            continue
        try:
            info[key] = f"{float(value):.{digits}f}"
        except Exception:
            info[key] = str(value)
    for key in ("dsun_cm", "rsun_cm"):
        value = params.get(key)
        if value is None:
            continue
        try:
            info[key] = f"{float(value):.3e}"
        except Exception:
            info[key] = str(value)
    return info
def _sync_fov_box_to_selection(self) -> None:
    """Bring the stored FOV box in line with the stored FOV selection.

    Prefers a box recomputed from the current selection. If that is not
    available, the existing box is rebuilt with the selection's center and
    size while keeping its z range and observer key. Does nothing without
    a view state, an FOV, or (in the fallback case) an existing FOV box.
    """
    state = self._state
    if state is None or state.fov is None:
        return

    recomputed = self._compute_fov_box_from_current_selection()
    if recomputed is not None:
        state.fov_box = recomputed
        return

    previous = state.fov_box
    if previous is None:
        return

    # Fallback path: preserve existing z extent if geometry recomputation fails.
    fov = state.fov
    state.fov_box = DisplayFovBoxSelection(
        center_x_arcsec=float(fov.center_x_arcsec),
        center_y_arcsec=float(fov.center_y_arcsec),
        width_arcsec=float(fov.width_arcsec),
        height_arcsec=float(fov.height_arcsec),
        z_min_mm=float(previous.z_min_mm),
        z_max_mm=float(previous.z_max_mm),
        observer_key=str(previous.observer_key or state.fov_definition_observer_key),
    )
def _compute_fov_box_from_current_selection(self) -> Optional[DisplayFovBoxSelection]:
    """Derive an FOV box from the current FOV selection and model box.

    Builds the legacy box in the geometry-definition observer context
    (falling back to the current map), takes its world-frame corners, and
    combines them with the user FOV as seen by the display observer.

    Returns ``None`` when state, FOV, map, box or corners are missing, or
    when the FOV box construction fails for any reason.
    """
    state = self._state
    if state is None or state.fov is None or self._current_map is None:
        return None

    obstime = getattr(self._current_map, "date", None)
    geometry_key = state.geometry_definition_observer_key
    context_map = self._observer_context(geometry_key, obstime)
    legacy_box = self._build_legacy_box(
        context_map or self._current_map,
        geometry_observer_key=geometry_key,
    )
    if legacy_box is None:
        return None

    world_corners = legacy_box.model_box_corners_world()
    if world_corners is None:
        return None

    resolved = self._resolved_observer_for_map(self._current_map, state.display_observer_key)
    observer = resolved or "earth"
    try:
        computed = build_fov_box_from_user_hpc_and_red_box_world(
            world_corners,
            xc_arcsec=float(state.fov.center_x_arcsec),
            yc_arcsec=float(state.fov.center_y_arcsec),
            xsize_arcsec=float(state.fov.width_arcsec),
            ysize_arcsec=float(state.fov.height_arcsec),
            observer=observer,
            obstime=obstime,
        )
        if computed is None:
            return None
        return DisplayFovBoxSelection(
            center_x_arcsec=float(computed["xc_arcsec"]),
            center_y_arcsec=float(computed["yc_arcsec"]),
            width_arcsec=float(computed["xsize_arcsec"]),
            height_arcsec=float(computed["ysize_arcsec"]),
            z_min_mm=float(computed["zmin_mm"]),
            z_max_mm=float(computed["zmax_mm"]),
            observer_key=self._normalize_observer_key(state.display_observer_key),
        )
    except Exception:
        return None
def _compute_fov_box_local_corners(
    self,
    fov_box: DisplayFovBoxSelection | None = None,
) -> tuple[tuple[float, float, float], ...] | None:
    """Return the FOV box corners as tuples of floats, via the legacy box.

    ``fov_box`` defaults to the FOV box stored in the view state. The
    legacy box is built in the observer context named by the FOV box
    (falling back to the current map) and asked for the corners through
    ``fov_box_corners_local_mm``.

    Returns ``None`` when state, map, FOV box, legacy box or corners are
    missing.
    """
    state = self._state
    if state is None or self._current_map is None:
        return None

    selected = fov_box or state.fov_box
    if selected is None:
        return None

    context_map = self._observer_context(
        getattr(selected, "observer_key", None),
        getattr(self._current_map, "date", None),
    )
    legacy_box = self._build_legacy_box(
        context_map or self._current_map,
        geometry_observer_key=state.geometry_definition_observer_key,
    )
    if legacy_box is None:
        return None

    metadata = selected.as_observer_metadata(square=bool(state.square_fov))
    corners = legacy_box.fov_box_corners_local_mm(metadata)
    if corners is None:
        return None

    rows = np.asarray(corners, dtype=float)
    return tuple(tuple(float(value) for value in row) for row in rows)
def _should_preserve_pixel_view(self) -> bool:
    """Return whether a redraw should keep the current view, i.e. whether axes already exist."""
    return self._current_axes is not None
def _status_text_with_prepare_trace(self, base_text: str) -> str:
    """Append a ``prep_trace`` section to ``base_text`` when events were recorded.

    At most the 12 most recently first-seen labels are listed, one
    ``"<count>x <label>"`` line each; labels with a non-positive count are
    skipped. Without any listed label, ``base_text`` is returned as a
    string unchanged (``None`` becomes ``""``).
    """
    text = str(base_text or "")
    order = self._prep_trace_order
    if not order:
        return text

    counts = self._prep_trace_counts
    recent = ((label, counts.get(label, 0)) for label in order[-12:])
    lines = [f"{count}x {label}" for label, count in recent if count > 0]
    if not lines:
        return text
    return f"{text}\n\nprep_trace:\n" + "\n".join(lines)
def _emit_status_text(self) -> None:
    """Recompose the full status text, remember it, and send it to the status listener."""
    composed = self._status_text_with_prepare_trace(self._last_status_base_text)
    self._last_status_text = composed
    listener = self._status_callback
    if listener is not None:
        listener(self._last_status_text)
def _clear_prepare_trace(self) -> None:
    """Forget all recorded prepare events (counts and first-seen order).

    The status text is not re-emitted here.
    """
    self._prep_trace_counts.clear()
    self._prep_trace_order.clear()
def _record_prepare_event(self, label: str) -> None:
    """Count one occurrence of a prepare event and re-emit the status text.

    ``label`` is stripped; blank labels are ignored. A label seen for the
    first time is appended to the order list; once that list exceeds 20
    entries, the oldest label and its count are dropped.
    """
    key = str(label or "").strip()
    if not key:
        return

    counts = self._prep_trace_counts
    order = self._prep_trace_order
    if key not in counts:
        order.append(key)
        if len(order) > 20:
            evicted = order.pop(0)
            counts.pop(evicted, None)
        counts[key] = 0
    counts[key] += 1
    self._emit_status_text()
def _refresh_status_text(self) -> None:
    """Rebuild the base status text from the current view state and emit it.

    Without a view state a fixed placeholder is emitted. Otherwise the text
    lists the interaction flags, the observer frames, the selected maps,
    the map source mode, the square-FOV flag, the geometry and the FOV,
    followed by optional lines for the availability notice, the model time,
    the observer time and the custom observer label/source.
    """
    if self._state is None:
        self._last_status_base_text = "Map/box display placeholder (uninitialized)"
        self._emit_status_text()
        return
    geom = self._state.geometry
    if geom is None:
        geom_text = "geometry: <none>"
    else:
        geom_text = (
            f"geometry: {geom.coord_mode.value} "
            f"({geom.coord_x:.3f}, {geom.coord_y:.3f}), "
            f"dims={geom.grid_x}x{geom.grid_y}x{geom.grid_z}, dx={geom.dx_km:.3f} km"
        )
    if self._state.fov is None:
        fov_text = "fov: <auto>"
    else:
        fov = self._state.fov
        fov_text = (
            f"fov: center=({fov.center_x_arcsec:.2f}, {fov.center_y_arcsec:.2f}) arcsec, "
            f"size={fov.width_arcsec:.2f}x{fov.height_arcsec:.2f} arcsec"
        )
    base_text = (
        "Map/box selector interaction\n"
        f"mouse_actions={'on' if self._mouse_actions_enabled else 'off'}\n"
        f"geometry_edit={'on' if self._geometry_edit_enabled else 'off'}\n"
        f"display_observer={self._observer_label_for_key(self._state.display_observer_key)}\n"
        f"geometry_frame={self._observer_label_for_key(self._state.geometry_definition_observer_key)}\n"
        f"fov_frame={self._observer_label_for_key(self._state.fov_definition_observer_key)}\n"
        f"context={self._display_map_label(self._state.selected_context_id, bottom=False)!r}, "
        f"base={self._display_map_label(self._state.selected_bottom_id, bottom=True)!r}\n"
        f"map_source={self._state.map_source_mode}\n"
        f"square_fov={'on' if self._state.square_fov else 'off'}\n"
        f"{geom_text}\n{fov_text}"
    )
    if self._observer_availability_notice:
        base_text = f"{base_text}\n\n{self._observer_availability_notice}"
    model_time = str(self._state.session_input.time_iso or "")
    if model_time:
        base_text = f"{base_text}\nmodel_time={model_time}"
    observer_time = ""
    # NOTE(review): the pairing of the `else` below with the outer custom check
    # (non-custom observers use the model time) is reconstructed from flattened
    # source -- confirm against the original indentation.
    if self._normalize_observer_key(self._state.display_observer_key) == "custom":
        if isinstance(self._state.custom_observer_ephemeris, dict):
            observer_time = str(
                self._state.custom_observer_ephemeris.get("obs_date")
                or self._state.custom_observer_ephemeris.get("obs_time")
                or ""
            )
    else:
        observer_time = model_time
    if observer_time:
        base_text = f"{base_text}\nobserver_time={observer_time}"
    if self._normalize_observer_key(self._state.display_observer_key) == "custom":
        base_text = f"{base_text}\ncustom_label={self._state.custom_observer_label or 'Custom'}"
        if self._state.custom_observer_source:
            base_text = f"{base_text}\ncustom_source={self._state.custom_observer_source}"
    self._last_status_base_text = base_text
    self._emit_status_text()
def _refresh_map_info(self) -> None:
    """Publish the map-info text: context summary, blank line, base summary.

    Without a view state a fixed ``<uninitialized>`` text is published.
    """
    state = self._state
    if state is None:
        self._set_map_info_text("Map info: <uninitialized>")
        return

    context_part = self._single_map_summary(state.selected_context_id, role="Context", bottom=False)
    base_part = self._single_map_summary(state.selected_bottom_id, role="Base", bottom=True)
    self._set_map_info_text(f"{context_part}\n\n{base_part}")
def _set_map_info_text(self, text: str) -> None:
    """Remember ``text`` as the last map-info text and forward it to the listener, if any."""
    self._last_map_info_text = text
    listener = self._map_info_callback
    if listener is None:
        return
    listener(text)
def _auto_adjust_axes_margins(self, ax, *, top: float = 0.93, pad_px: float = 8.0) -> bool:
    """Expand subplot margins after rendering if WCS labels are clipped.

    The figure is drawn once with the requested ``top`` margin, then the
    tight bounding box of ``ax`` is compared against the figure box shrunk
    by ``pad_px`` pixels. Overflow on the left, right or bottom is
    converted to figure fractions and added to the matching margin, capped
    at 0.30 (left), 0.70 (right) and 0.22 (bottom).

    Returns ``True`` only when a margin actually changed (by more than
    1e-4); ``False`` when nothing changed, no tight box is available, or
    any step raised.
    """
    try:
        figure = self._fig
        figure.subplots_adjust(top=top)
        self._canvas.draw()
        tight = ax.get_tightbbox(self._canvas.get_renderer())
        if tight is None:
            return False

        frame = figure.bbox
        width = max(float(frame.width), 1.0)
        height = max(float(frame.height), 1.0)
        pars = figure.subplotpars
        current = (float(pars.left), float(pars.right), float(pars.bottom))
        left, right, bottom = current

        # Pixels by which the labels stick out past the padded figure edge.
        left_over = max(0.0, (frame.x0 + pad_px) - float(tight.x0))
        right_over = max(0.0, float(tight.x1) + pad_px - frame.x1)
        bottom_over = max(0.0, (frame.y0 + pad_px) - float(tight.y0))

        proposed = (
            min(0.30, left + (left_over / width)),
            max(0.70, right - (right_over / width)),
            min(0.22, bottom + (bottom_over / height)),
        )
        if any(abs(new - old) > 1e-4 for new, old in zip(proposed, current)):
            new_left, new_right, new_bottom = proposed
            figure.subplots_adjust(left=new_left, right=new_right, bottom=new_bottom, top=top)
            return True
    except Exception:
        return False
    return False
def _emit_observer_info(self) -> None:
    """Send the current observer info to the observer-info listener, if any."""
    listener = self._observer_info_callback
    if listener is None:
        return
    listener(self.current_observer_info())
def _single_map_summary(self, map_id: Optional[str], role: str, bottom: bool) -> str:
    """Return a cached, human-readable summary of the selected map.

    ``bottom`` chooses between the selected bottom map and the selected
    context map; ``role`` is only used for the text and the cache key.
    The summary lists the source label, array shape, finite-value count,
    min/max/mean over the finite values, and the map's ``date``.

    An empty ``map_id`` yields a ``<none>`` line (not cached). Unavailable
    maps and load failures produce a short explanatory text, which is
    cached like a successful summary.
    """
    if not map_id:
        return f"{role} map: <none>"

    cache = self._map_summary_cache
    cache_key = f"{role}:{map_id}"
    if cache_key in cache:
        return cache[cache_key]

    purpose = "bottom" if bottom else "context"
    try:
        smap = self._selected_bottom_map() if bottom else self._selected_context_map()
        if smap is None:
            summary = f"{role} map ({self._display_map_label(map_id, bottom)}): unavailable"
            cache[cache_key] = summary
            return summary

        data = np.asarray(smap.data)
        finite_mask = np.isfinite(data)
        n_finite = int(finite_mask.sum())
        if n_finite > 0:
            values = data[finite_mask]
            stats = (
                f"min={float(np.nanmin(values)):.3g}, "
                f"max={float(np.nanmax(values)):.3g}, "
                f"mean={float(np.nanmean(values)):.3g}"
            )
        else:
            stats = "all non-finite"

        obs_time = getattr(smap, "date", None)
        summary = (
            f"{role} map ({self._display_map_label(map_id, bottom)})\n"
            f"source={self._map_source_label(map_id, purpose=purpose)}\n"
            f"shape={tuple(data.shape)}, finite={n_finite}/{data.size}\n"
            f"{stats}\n"
            f"obs_time={obs_time}"
        )
    except Exception as exc:
        summary = f"{role} map ({map_id}) load failed:\n{self._map_source_label(map_id, purpose=purpose)}\n{exc}"

    cache[cache_key] = summary
    return summary
@staticmethod
def _display_map_label(map_id: Optional[str], bottom: bool) -> Optional[str]:
if map_id is None:
return None
if not bottom and map_id == "Bz":
return "Blos"
return map_id
def _selected_context_map(self):
    """Return the map for the selected context id, or ``None`` without state or selection."""
    state = self._state
    map_id = state.selected_context_id if state is not None else None
    if not map_id:
        return None
    return self._map_for_id(map_id, purpose="context")
def _selected_bottom_map(self):
if self._state is None:
return None
map_id = self._state.selected_bottom_id
if not map_id:
return None
return self._map_for_id(map_id, purpose="bottom")
def _map_for_id(self, map_id: str, purpose: str):
canonical_key = self._canonical_map_key(map_id, purpose=purpose)
if purpose == "context":
return self._context_map_for_id(map_id, canonical_key)
smap = self._load_raw_map(canonical_key, purpose=purpose)
if smap is None:
return None
observer_token = self._display_observer_cache_token(smap)
view_key = str(self._view_mode or "box_fov")
display_key = f"__{purpose}__:{observer_token}:{view_key}:{canonical_key}"
alias_key = f"__{purpose}__:{observer_token}:{view_key}:{map_id}"
with self._cache_lock:
if alias_key in self._loaded_map_cache:
return self._loaded_map_cache[alias_key]
if display_key in self._loaded_map_cache:
smap = self._loaded_map_cache[display_key]
self._loaded_map_cache[alias_key] = smap
return smap
smap = self._prepare_bottom_map(canonical_key, smap)
with self._cache_lock:
self._loaded_map_cache[display_key] = smap
self._loaded_map_cache[alias_key] = smap
return smap
# Return the context-purpose display map for ``canonical_key``.
#
# Steps: load the raw map, look up (or rebuild) the prepared map kept in
# ``_prepared_context_map_cache``, then, in "box_fov" view mode, crop the
# prepared map to the display window. Returns ``None`` when the raw map cannot
# be loaded. ``map_id`` is accepted but not referenced in the body.
def _context_map_for_id(self, map_id: str, canonical_key: str):
smap = self._load_raw_map(canonical_key, purpose="context")
if smap is None:
return None
# The display observer defaults to "earth" when no state is attached.
observer_key = self._normalize_observer_key(
self._state.display_observer_key if self._state is not None else "earth"
)
observer_token = self._display_observer_cache_token(smap, observer_key)
# Prepared maps are cached per observer token and canonical key.
prepared_key = f"__context_prepared__:{observer_token}:{canonical_key}"
desired_prepare_fov = self._current_display_prepare_fov(
observer_key,
obstime=self._obstime_for_map(
smap,
self._state.session_input.time_iso if self._state is not None else None,
),
)
with self._cache_lock:
cache_entry = self._prepared_context_map_cache.get(prepared_key)
prepared_map = None
coverage_fov = None
# Cache entries written below are dicts with "map" and "coverage_fov"; any
# non-dict entry is treated as the prepared map itself.
if isinstance(cache_entry, dict):
prepared_map = cache_entry.get("map")
coverage_fov = cache_entry.get("coverage_fov")
else:
prepared_map = cache_entry
# Re-prepare when nothing is cached, when no FOV is now desired but the cached
# map has a recorded coverage FOV, or when the cached coverage does not contain
# the desired FOV.
if prepared_map is None or (desired_prepare_fov is None and coverage_fov is not None) or (
desired_prepare_fov is not None and coverage_fov is not None and not self._fov_contains(coverage_fov, desired_prepare_fov)
):
self._record_prepare_event(f"context prepare: {canonical_key} @ {observer_key}")
prepared_map, coverage_fov = self._prepare_context_map(canonical_key, smap, target_fov=desired_prepare_fov)
with self._cache_lock:
self._prepared_context_map_cache[prepared_key] = {
"map": prepared_map,
"coverage_fov": coverage_fov,
}
# Crop for the box view; the crops are tried in this order: pixel bounds,
# explicit FOV selection, then the legacy box bounds.
if self._view_mode == "box_fov":
display_bounds = self._display_window_pixel_bounds(prepared_map)
if display_bounds is not None:
return self._submap_to_pixel_bounds(prepared_map, display_bounds)
display_fov = self._display_window_fov_selection(prepared_map)
if display_fov is not None:
return self._submap_to_explicit_fov(prepared_map, fov_override=display_fov, pad_factor=1.0)
box = self._build_legacy_box(prepared_map)
if box is not None:
return self._submap_to_box_bounds(prepared_map, box)
return prepared_map
def _invalidate_map_caches(self) -> None:
with self._cache_lock:
self._loaded_map_cache.clear()
self._prepared_context_map_cache.clear()
self._raw_map_cache.clear()
self._background_cache_generation += 1
def _invalidate_display_map_cache(self) -> None:
with self._cache_lock:
self._loaded_map_cache.clear()
self._background_cache_generation += 1
def _invalidate_geometry_dependent_display_maps(self) -> None:
with self._cache_lock:
keys_to_drop = [
key for key in self._loaded_map_cache.keys()
if key.startswith("__bottom__:")
]
if self._is_non_earth_display_observer() and self._view_mode == "box_fov":
keys_to_drop.extend(
key for key in self._loaded_map_cache.keys()
if key.startswith("__context__:")
)
for key in set(keys_to_drop):
self._loaded_map_cache.pop(key, None)
self._background_cache_generation += 1
def _start_background_cache_build(self) -> None:
if not self._background_cache_enabled:
return
if self._state is None:
return
map_ids = tuple(
m for m in (self._state.session_input.map_ids or ())
if m in {"Bz", "Ic", "B_rho", "B_theta", "B_phi", "disambig", "Br", "Bp", "Bt"}
)
if not map_ids:
return
with self._cache_lock:
generation = self._background_cache_generation
thread = self._background_cache_thread
if thread is not None and thread.is_alive():
return
self._background_cache_thread = threading.Thread(
target=self._background_cache_worker,
args=(generation, map_ids),
daemon=True,
)
self._background_cache_thread.start()
def _background_cache_worker(self, generation: int, map_ids: tuple[str, ...]) -> None:
for map_id in map_ids:
with self._cache_lock:
if generation != self._background_cache_generation:
return
try:
self._map_for_id(map_id, purpose="context")
self._map_for_id(map_id, purpose="bottom")
except Exception:
continue
@staticmethod
def _canonical_map_key(map_id: str, *, purpose: str = "context") -> str:
    """Translate a user-facing map id into its canonical key.

    The bottom alias table is consulted when ``purpose`` is ``"bottom"``;
    otherwise the context alias table is used. Ids without an alias are
    returned unchanged.
    """
    alias_table = (
        _BOTTOM_DISPLAY_MAP_ALIASES if purpose == "bottom" else _CONTEXT_DISPLAY_MAP_ALIASES
    )
    return alias_table.get(map_id, map_id)
def _map_source_label(self, map_id: str, *, purpose: str = "context") -> str:
canonical_key = self._canonical_map_key(map_id, purpose=purpose)
if canonical_key in {"field", "inclination", "azimuth", "disambig"}:
path = self._filesystem_path_for_key(canonical_key, purpose=purpose)
if path:
return Path(path).name
return canonical_key
path = self._filesystem_path_for_key(canonical_key, purpose=purpose)
if path:
return Path(path).name
if self._embedded_base_key_for_map(canonical_key) and self._embedded_base_array(canonical_key, purpose=purpose) is not None:
return f"embedded:base.{self._embedded_base_key_for_map(canonical_key)}"
ref_key = self._embedded_refmap_key(canonical_key)
if ref_key and self._embedded_payload_for_key(ref_key, purpose=purpose):
return f"embedded:{ref_key}"
return canonical_key
def _filesystem_enabled(self, purpose: str = "context") -> bool:
if purpose == "bottom":
return False
return self._state is not None and self._state.map_source_mode in {"auto", "filesystem"}
def _embedded_enabled(self, purpose: str = "context") -> bool:
if self._state is None:
return False
if purpose == "bottom":
return bool(self._state.base_maps or self._state.refmaps)
# In "filesystem" mode, prefer on-disk files when they exist, but still
# allow fallback to embedded products for map types that have no
# filesystem representation (e.g. Vert_current in saved HDF5 models).
return bool(self._state.base_maps or self._state.refmaps)
def _filesystem_path_for_key(self, map_key: str, purpose: str = "context") -> str | None:
if not self._filesystem_enabled(purpose=purpose):
return None
return (self._state.map_files or {}).get(map_key) if self._state is not None else None
def _embedded_payload_for_key(self, ref_key: str, purpose: str = "context"):
if not self._embedded_enabled(purpose=purpose) or self._state is None:
return None
return (self._state.refmaps or {}).get(ref_key)
@staticmethod
def _embedded_base_key_for_map(map_key: str) -> str | None:
key = str(map_key)
key_l = key.lower()
if key_l in {"bx", "by", "bz"}:
return key_l
if map_key == "magnetogram":
return "bz"
if key_l in {"continuum", "ic"}:
return "ic"
if key_l == "chromo_mask":
return "chromo_mask"
if key_l == "vert_current":
return "vert_current"
return None
def _embedded_base_array(self, map_key: str, purpose: str = "context"):
if not self._embedded_enabled(purpose=purpose) or self._state is None:
return None
base_key = self._embedded_base_key_for_map(map_key)
if not base_key:
return None
base_maps = self._state.base_maps or {}
if base_key not in base_maps:
# Backward/forward compatibility for case variants in persisted keys.
folded = {str(k).lower(): k for k in base_maps.keys()}
if str(base_key).lower() not in folded:
return None
base_key = folded[str(base_key).lower()]
arr = np.asarray(base_maps[base_key])
if arr.ndim != 2:
return None
return arr
def _load_embedded_base_map(self, map_key: str, purpose: str = "context"):
if self._state is None:
return None
with self._cache_lock:
cache_key = f"__base__:{purpose}:{map_key}"
if cache_key in self._raw_map_cache:
return self._raw_map_cache[cache_key]
data = self._embedded_base_array(map_key, purpose=purpose)
if data is None:
return None
try:
ref_map = self._reference_context_map()
if ref_map is None:
return None
base_geom = self._state.base_geometry or self._state.geometry
box = self._build_legacy_box(ref_map, geom=base_geom)
if box is None:
return None
header = box.bottom_cea_header
self._copy_observer_cards_from_map(header, ref_map)
smap = map_from_data_header_compat(np.asarray(data), header)
except Exception:
return None
with self._cache_lock:
self._raw_map_cache[cache_key] = smap
return smap
@staticmethod
def _header_text_from_value(value) -> str:
if value is None:
return ""
if isinstance(value, (bytes, bytearray)):
return value.decode("utf-8", "ignore")
if isinstance(value, np.ndarray) and value.shape == ():
return MapBoxDisplayWidget._header_text_from_value(value.item())
return str(value)
@staticmethod
def _normalize_embedded_header_text(header_text: str) -> str:
text = str(header_text or "")
# Embedded box files may persist FITS headers with literal "\\n"
# separators instead of real newlines.
if "\\n" in text and "\n" not in text:
text = text.replace("\\n", "\n")
return text
@staticmethod
def _copy_observer_cards_from_map(header, smap) -> None:
if header is None or smap is None:
return
meta = getattr(smap, "meta", None)
if meta is None:
return
for src_key, dst_key in (
("hgln_obs", "HGLN_OBS"),
("hglt_obs", "HGLT_OBS"),
("dsun_obs", "DSUN_OBS"),
("crln_obs", "CRLN_OBS"),
("crlt_obs", "CRLT_OBS"),
("rsun_ref", "RSUN_REF"),
("date-obs", "DATE-OBS"),
("date_obs", "DATE_OBS"),
):
value = meta.get(src_key)
if value is None:
value = meta.get(dst_key)
if value is not None:
header[dst_key] = value
@staticmethod
def _embedded_refmap_key(map_key: str) -> str | None:
key = str(map_key)
key_l = key.lower()
if key == "magnetogram":
return "Bz_reference"
if key == "continuum":
return "Ic_reference"
if key_l == "vert_current":
return "Vert_current"
if key.isdigit():
return f"AIA_{key}"
return key
def _load_embedded_refmap(self, ref_key: str, purpose: str = "context"):
if self._state is None:
return None
with self._cache_lock:
cache_key = f"__embedded__:{purpose}:{ref_key}"
if cache_key in self._raw_map_cache:
return self._raw_map_cache[cache_key]
payload = self._embedded_payload_for_key(ref_key, purpose=purpose)
if not isinstance(payload, dict):
return None
data = payload.get("data")
header_text = self._normalize_embedded_header_text(
self._header_text_from_value(payload.get("wcs_header"))
)
if data is None or not header_text.strip():
return None
try:
header = fits.Header.fromstring(header_text, sep="\n")
ref_map = self._reference_context_map()
self._copy_observer_cards_from_map(header, ref_map)
header[_EMBEDDED_REFMAP_FLAG] = True
smap = map_from_data_header_compat(np.asarray(data), header)
except Exception:
return None
with self._cache_lock:
self._raw_map_cache[cache_key] = smap
return smap
# Load the unprepared ("raw") map for ``map_key``, using ``_raw_map_cache``.
#
# Sources are consulted in this order: an on-disk file (when
# ``_filesystem_path_for_key`` yields a path), an embedded base array, and an
# embedded reference map. Returns ``None`` when no source yields a map.
def _load_raw_map(self, map_key: str, purpose: str = "context"):
with self._cache_lock:
# Raw maps are cached per purpose and canonical key.
raw_cache_key = f"__rawmap__:{purpose}:{map_key}"
if raw_cache_key in self._raw_map_cache:
return self._raw_map_cache[raw_cache_key]
path = self._filesystem_path_for_key(map_key, purpose=purpose)
if path:
smap = load_sunpy_map_compat(path)
# Maps whose key is listed in _HMI_VECTOR_SEGMENTS are cropped to the
# geometry field of view as soon as they are loaded.
if map_key in _HMI_VECTOR_SEGMENTS:
smap = self._submap_to_geometry_fov(smap)
else:
# No file available: try the embedded base array first ...
smap = self._load_embedded_base_map(map_key, purpose=purpose)
if smap is not None:
with self._cache_lock:
self._raw_map_cache[raw_cache_key] = smap
return smap
# ... then the embedded reference map.
ref_key = self._embedded_refmap_key(map_key)
smap = self._load_embedded_refmap(ref_key, purpose=purpose) if ref_key else None
if smap is None:
return None
with self._cache_lock:
self._raw_map_cache[raw_cache_key] = smap
return smap
def _prepare_context_map(
    self,
    map_key: str,
    smap,
    *,
    target_fov: DisplayFovSelection | None = None,
):
    """Turn a raw context map into its display-ready form.

    HMI display maps are rotated first; the map is then reprojected onto the
    reference context map's WCS (after matching solar radii), reprojected for
    the display observer, and given display scaling. The rotation and the
    reference reprojection are best-effort: when either raises, processing
    continues with whatever map was obtained so far.

    Returns:
        ``(display_map, coverage_fov)`` where ``coverage_fov`` is the second
        value returned by ``_reproject_map_for_display_observer``.
    """
    prepared = smap

    if map_key in _HMI_DISPLAY_KEYS:
        try:
            prepared = prepared.rotate(order=3)
        except Exception:
            pass

    reference = self._reference_context_map()
    if reference is not None:
        try:
            self._record_prepare_event(f"context reproj: {map_key} -> ref_wcs")
            prepared = self._with_matching_rsun(prepared, reference)
            prepared = prepared.reproject_to(reference.wcs)
        except Exception:
            pass

    prepared, coverage_fov = self._reproject_map_for_display_observer(
        prepared,
        fov_override=target_fov,
    )
    self._apply_display_scaling(prepared, map_key)
    return prepared, coverage_fov
# Turn a raw bottom map into its display-ready form.
#
# When a legacy box can be built for the map, the map is cropped for the
# "box_fov" view (pixel bounds, else explicit FOV, else box bounds) and, for
# an Earth display observer in "box_fov" mode, reprojected onto the box's
# bottom CEA header. Display scaling is applied to the result before it is
# returned.
def _prepare_bottom_map(self, map_key: str, smap):
display_map = smap
box = self._build_legacy_box(display_map)
if box is not None:
if self._view_mode == "box_fov":
display_bounds = self._display_window_pixel_bounds(display_map)
if display_bounds is not None:
display_map = self._submap_to_pixel_bounds(display_map, display_bounds)
else:
display_fov = self._display_window_fov_selection(display_map)
if display_fov is not None:
display_map = self._submap_to_explicit_fov(display_map, fov_override=display_fov, pad_factor=1.0)
else:
display_map = self._submap_to_box_bounds(display_map, box)
if not self._is_non_earth_display_observer() and self._view_mode == "box_fov":
# Best-effort: if the reprojection raises, the map obtained so far is kept.
try:
self._record_prepare_event(f"bottom reproj: {map_key} -> base_cea")
display_map = self._with_matching_rsun(display_map, box.bottom_cea_header)
display_map = display_map.reproject_to(
box.bottom_cea_header,
algorithm="adaptive",
roundtrip_coords=False,
)
except Exception:
pass
# Keep bottom maps in their native/base WCS and let SunPy autoalign
# handle observer-frame plotting on the current axes.
self._apply_display_scaling(display_map, map_key)
return display_map
def _reference_context_map(self):
if self._state is None:
return None
for map_id in _AIA_REFERENCE_IDS:
path = self._filesystem_path_for_key(map_id)
cache_key = f"__ref__:{map_id}"
with self._cache_lock:
if cache_key in self._raw_map_cache:
return self._raw_map_cache[cache_key]
try:
if path:
ref_map = load_sunpy_map_compat(path)
else:
ref_map = self._load_embedded_refmap(f"AIA_{map_id}")
if ref_map is None:
continue
with self._cache_lock:
self._raw_map_cache[cache_key] = ref_map
return ref_map
except Exception:
continue
return None
def _geometry_anchor_coord(self, geom: BoxGeometrySelection, smap, observer_key: str | None = None):
    """Build the sky coordinate that anchors the box geometry.

    The coordinate uses the map's date as observation time. The observer is
    taken from the observer context for ``observer_key`` (defaulting to the
    state's geometry-definition observer), falling back to the observer
    resolved for the map and finally to ``"earth"``.

    ``geom.coord_x`` / ``geom.coord_y`` are interpreted according to
    ``geom.coord_mode``: helioprojective arcseconds for ``CoordMode.HPC``,
    Carrington degrees for ``CoordMode.HGC``, Stonyhurst degrees otherwise.
    The heliographic variants are placed at a radius of 696 Mm.
    """
    obstime = getattr(smap, "date", None)
    if observer_key is None and self._state is not None:
        observer_key = self._state.geometry_definition_observer_key

    context = self._observer_context(observer_key, obstime)
    observer = getattr(context, "observer_coordinate", None)
    if observer is None:
        observer = self._resolved_observer_for_map(smap, observer_key) or "earth"

    if geom.coord_mode == CoordMode.HPC:
        return SkyCoord(
            Tx=geom.coord_x * u.arcsec,
            Ty=geom.coord_y * u.arcsec,
            obstime=obstime,
            observer=observer,
            frame=Helioprojective,
        )

    if geom.coord_mode == CoordMode.HGC:
        heliographic_frame = HeliographicCarrington
    else:
        heliographic_frame = HeliographicStonyhurst
    return SkyCoord(
        lon=geom.coord_x * u.deg,
        lat=geom.coord_y * u.deg,
        radius=696 * u.Mm,
        obstime=obstime,
        observer=observer,
        frame=heliographic_frame,
    )
def _build_legacy_box(
self,
smap,
geom: BoxGeometrySelection | None = None,
*,
geometry_observer_key: str | None = None,
):
if self._state is None:
return None
geom = geom or self._state.geometry
if geom is None:
return None
box_dims = u.Quantity([geom.grid_x, geom.grid_y, geom.grid_z], u.pix)
box_res = geom.dx_km * u.km
box_origin = self._geometry_anchor_coord(geom, smap, observer_key=geometry_observer_key)
observer = self._resolved_observer_for_map(
smap,
geometry_observer_key if geometry_observer_key is not None else (
self._state.display_observer_key if self._state is not None else "earth"
),
) or "earth"
obstime = getattr(smap, "date", None)
frame_obs = Helioprojective(observer=observer, obstime=obstime)
frame_hcc = Heliocentric(observer=box_origin, obstime=obstime)
box_dimensions = box_dims / u.pix * box_res
box_center = box_origin.transform_to(frame_hcc)
box_center = SkyCoord(
x=box_center.x,
y=box_center.y,
z=box_center.z + box_dimensions[2] / 2,
frame=box_center.frame,
)
box = Box(frame_obs, box_origin, box_center, box_dims, box_res)
if self._session_box_template is not None and isinstance(getattr(self._session_box_template, "b3d", None), dict):
box.b3d = copy.deepcopy(self._session_box_template.b3d)
self._apply_live_session_state(box, update_frame_obs=False)
return box
def _submap_to_box_bounds(self, smap, box, pad_frac: float | None = None):
if box is None:
return smap
if pad_frac is None:
pad_frac = float(self._state.session_input.pad_frac or 0.10) if self._state is not None else 0.10
try:
fov = box.bounds_coords_bl_tr(pad_frac=pad_frac)
return smap.submap(fov[0], top_right=fov[1])
except Exception:
return smap
def _submap_to_geometry_fov(self, smap):
geometry_observer_key = self._state.geometry_definition_observer_key if self._state is not None else None
return self._submap_to_box_bounds(
smap,
self._build_legacy_box(smap, geometry_observer_key=geometry_observer_key),
)
def _display_window_pixel_bounds(
self,
smap,
*,
pad_factor: float = 1.10,
) -> tuple[float, float, float, float] | None:
if self._view_mode != "box_fov":
return None
projected_edges = self._fov_box_projected_edges(smap)
projected_bbox = self._edge_pixel_bounds(smap, projected_edges) if projected_edges else None
if projected_bbox is not None:
x0, x1, y0, y1 = projected_bbox
elif self._state is not None and self._state.fov is not None:
rect = self._fov_selection_to_pixel_rect(smap, self._state.fov)
x0 = rect.get_x()
x1 = x0 + rect.get_width()
y0 = rect.get_y()
y1 = y0 + rect.get_height()
else:
geometry_observer_key = self._state.geometry_definition_observer_key if self._state is not None else None
box = self._build_legacy_box(smap, geometry_observer_key=geometry_observer_key)
if box is None:
return None
fov = self._box_bounds_to_fov_selection(box, smap)
rect = self._fov_selection_to_pixel_rect(smap, fov)
x0 = rect.get_x()
x1 = x0 + rect.get_width()
y0 = rect.get_y()
y1 = y0 + rect.get_height()
cx = 0.5 * (x0 + x1)
cy = 0.5 * (y0 + y1)
half_w = 0.5 * max(abs(x1 - x0), 1e-6) * float(pad_factor)
half_h = 0.5 * max(abs(y1 - y0), 1e-6) * float(pad_factor)
return (
float(cx - half_w),
float(cx + half_w),
float(cy - half_h),
float(cy + half_h),
)
def _display_window_fov_selection(self, smap) -> DisplayFovSelection | None:
pixel_bounds = self._display_window_pixel_bounds(smap, pad_factor=1.10)
if pixel_bounds is not None:
x0, x1, y0, y1 = pixel_bounds
rect = Rectangle(
(x0, y0),
max(1e-6, x1 - x0),
max(1e-6, y1 - y0),
visible=False,
)
fov = self._pixel_rect_to_fov_selection(smap, rect)
return DisplayFovSelection(
center_x_arcsec=float(fov.center_x_arcsec),
center_y_arcsec=float(fov.center_y_arcsec),
width_arcsec=float(max(fov.width_arcsec, 1e-3)),
height_arcsec=float(max(fov.height_arcsec, 1e-3)),
)
geometry_observer_key = self._state.geometry_definition_observer_key if self._state is not None else None
box = self._build_legacy_box(smap, geometry_observer_key=geometry_observer_key)
if box is None:
return None
fov = self._box_bounds_to_fov_selection(box, smap)
return DisplayFovSelection(
center_x_arcsec=float(fov.center_x_arcsec),
center_y_arcsec=float(fov.center_y_arcsec),
width_arcsec=float(max(fov.width_arcsec, 1e-3) * 1.10),
height_arcsec=float(max(fov.height_arcsec, 1e-3) * 1.10),
)
def _submap_to_explicit_fov(
self,
smap,
pad_factor: float = 1.10,
*,
fov_override: DisplayFovSelection | None = None,
):
fov = fov_override if fov_override is not None else (self._state.fov if (self._state and self._state.fov) else None)
if fov is None:
geometry_observer_key = self._state.geometry_definition_observer_key if self._state is not None else None
box = self._build_legacy_box(smap, geometry_observer_key=geometry_observer_key)
if box is None:
return smap
fov = self._box_bounds_to_fov_selection(box, smap)
half_w = 0.5 * max(float(fov.width_arcsec), 1e-3) * float(pad_factor)
half_h = 0.5 * max(float(fov.height_arcsec), 1e-3) * float(pad_factor)
observer = getattr(smap, "observer_coordinate", None) or "earth"
obstime = getattr(smap, "date", None)
bottom_left = SkyCoord(
Tx=(float(fov.center_x_arcsec) - half_w) * u.arcsec,
Ty=(float(fov.center_y_arcsec) - half_h) * u.arcsec,
frame=Helioprojective(observer=observer, obstime=obstime),
)
top_right = SkyCoord(
Tx=(float(fov.center_x_arcsec) + half_w) * u.arcsec,
Ty=(float(fov.center_y_arcsec) + half_h) * u.arcsec,
frame=Helioprojective(observer=observer, obstime=obstime),
)
try:
return smap.submap(bottom_left, top_right=top_right)
except Exception:
return smap
def _submap_to_pixel_bounds(
self,
smap,
bounds: tuple[float, float, float, float],
*,
margin_pixels: float = 2.0,
):
x0, x1, y0, y1 = bounds
if not all(np.isfinite(v) for v in (x0, x1, y0, y1)):
return smap
x_lo = min(float(x0), float(x1)) - float(margin_pixels)
x_hi = max(float(x0), float(x1)) + float(margin_pixels)
y_lo = min(float(y0), float(y1)) - float(margin_pixels)
y_hi = max(float(y0), float(y1)) + float(margin_pixels)
try:
bottom_left = smap.wcs.pixel_to_world(x_lo, y_lo)
top_right = smap.wcs.pixel_to_world(x_hi, y_hi)
return smap.submap(bottom_left, top_right=top_right)
except Exception:
return smap
@staticmethod
def _target_rsun_meters(target) -> float | None:
try:
if hasattr(target, "rsun_meters"):
return float(target.rsun_meters.to_value(u.m))
except Exception:
pass
try:
if isinstance(target, dict):
value = target.get("rsun_ref")
if value is not None:
return float(value)
except Exception:
pass
return None
@staticmethod
def _with_matching_rsun(smap, target):
    """Return ``smap`` with its ``rsun_ref`` aligned to that of ``target``.

    The map is returned unchanged when ``target`` provides no positive solar
    radius, when the map's radius already agrees to within 1e-3 m, or when
    building the adjusted copy raises. Otherwise a new map instance is
    created from the same data and plot settings with ``rsun_ref`` replaced
    in a copy of the metadata.
    """
    wanted_rsun_m = MapBoxDisplayWidget._target_rsun_meters(target)
    if not wanted_rsun_m or wanted_rsun_m <= 0:
        return smap

    try:
        present_rsun_m = float(smap.rsun_meters.to_value(u.m))
    except Exception:
        present_rsun_m = None
    already_matching = present_rsun_m is not None and abs(present_rsun_m - wanted_rsun_m) < 1e-3
    if already_matching:
        return smap

    try:
        patched_meta = smap.meta.copy()
        patched_meta["rsun_ref"] = wanted_rsun_m
        return smap._new_instance(smap.data, patched_meta, plot_settings=smap.plot_settings)
    except Exception:
        return smap
# Set colormap and normalisation in ``smap.plot_settings`` according to the
# kind of map identified by ``map_key``.
#
# The map is modified in place and nothing is returned. Maps without any
# finite data are left untouched, and any error raised while choosing the
# scaling is swallowed so that rendering can continue.
@staticmethod
def _apply_display_scaling(smap, map_key: str) -> None:
data = np.asarray(smap.data)
finite = np.isfinite(data)
if not finite.any():
return
# All percentiles below are computed on finite values only.
vals = data[finite]
try:
# AIA channels: the SunPy "sdoaia<key>" colormap and a 0.5-99.8 percentile
# stretch; the embedded-reference-map flag in the map's metadata is checked.
if map_key in _AIA_COLOR_KEYS:
if bool(getattr(smap, "meta", {}).get(_EMBEDDED_REFMAP_FLAG, False)):
cmap = sunpy_colormaps.cm.cmlist.get(f"sdoaia{map_key}")
if cmap is not None:
smap.plot_settings["cmap"] = cmap
lo = float(np.nanpercentile(vals, 0.5))
hi = float(np.nanpercentile(vals, 99.8))
if np.isfinite(lo) and np.isfinite(hi) and hi > lo:
smap.plot_settings["norm"] = mcolors.Normalize(vmin=lo, vmax=hi)
# Signed magnetic maps: gray colormap, symmetric about zero. The percentile
# of |value| used as the limit depends on the component.
if map_key in _SIGNED_MAGNETIC_KEYS:
if map_key in _TRANSVERSE_MAGNETIC_KEYS:
pct = 92.5
elif map_key == "br":
pct = 97.5
else:
pct = 99.0
hi = float(np.nanpercentile(np.abs(vals), pct))
if hi > 0:
smap.plot_settings["cmap"] = "gray"
smap.plot_settings["norm"] = mcolors.TwoSlopeNorm(vmin=-hi, vcenter=0.0, vmax=hi)
# Vertical current: diverging colormap, symmetric about zero at the 99th
# percentile of |value|.
elif map_key in _VERT_CURRENT_KEYS:
hi = float(np.nanpercentile(np.abs(vals), 99.0))
if hi > 0:
smap.plot_settings["cmap"] = "RdBu_r"
smap.plot_settings["norm"] = mcolors.TwoSlopeNorm(vmin=-hi, vcenter=0.0, vmax=hi)
# EOVSA reference maps (identified by key prefix): "hot" colormap with a
# 0.5-99.5 percentile stretch.
elif str(map_key).startswith(_EOVSA_REFMAP_PREFIX):
lo = float(np.nanpercentile(vals, 0.5))
hi = float(np.nanpercentile(vals, 99.5))
if np.isfinite(lo) and np.isfinite(hi) and hi > lo:
smap.plot_settings["cmap"] = "hot"
smap.plot_settings["norm"] = mcolors.Normalize(vmin=lo, vmax=hi)
# Chromosphere mask: nine discrete colours with unit-wide bins whose edges
# run from -0.5 to 8.5.
elif map_key in _CHROMO_MASK_KEYS:
cmap = mcolors.ListedColormap([
"#000000", "#1f77b4", "#ff7f0e", "#2ca02c",
"#d62728", "#9467bd", "#8c564b", "#e377c2",
"#7f7f7f",
])
smap.plot_settings["cmap"] = cmap
smap.plot_settings["norm"] = mcolors.BoundaryNorm(np.arange(-0.5, 9.5, 1.0), cmap.N)
# Continuum: gray colormap with a 1-99.5 percentile stretch.
elif map_key == "continuum":
lo = float(np.nanpercentile(vals, 1.0))
hi = float(np.nanpercentile(vals, 99.5))
if np.isfinite(lo) and np.isfinite(hi) and hi > lo:
smap.plot_settings["norm"] = mcolors.Normalize(vmin=lo, vmax=hi)
smap.plot_settings["cmap"] = "gray"
# Field strength: "magma" colormap with a 1-99 percentile stretch.
elif map_key == "field":
lo = float(np.nanpercentile(vals, 1.0))
hi = float(np.nanpercentile(vals, 99.0))
if np.isfinite(lo) and np.isfinite(hi) and hi > lo:
smap.plot_settings["norm"] = mcolors.Normalize(vmin=lo, vmax=hi)
smap.plot_settings["cmap"] = "magma"
# Angle maps: cyclic "twilight" colormap with a 0.5-99.5 percentile stretch.
elif map_key in {"inclination", "azimuth"}:
lo = float(np.nanpercentile(vals, 0.5))
hi = float(np.nanpercentile(vals, 99.5))
if np.isfinite(lo) and np.isfinite(hi) and hi > lo:
smap.plot_settings["norm"] = mcolors.Normalize(vmin=lo, vmax=hi)
smap.plot_settings["cmap"] = "twilight"
# Disambiguation map: "viridis" colormap over the full finite data range.
elif map_key == "disambig":
lo = float(np.nanmin(vals))
hi = float(np.nanmax(vals))
if np.isfinite(lo) and np.isfinite(hi) and hi > lo:
smap.plot_settings["norm"] = mcolors.Normalize(vmin=lo, vmax=hi)
smap.plot_settings["cmap"] = "viridis"
except Exception:
# Display scaling should not break rendering.
pass
# Redraw the whole figure from the current state.
#
# The figure is cleared, the selected context map is plotted (with the bottom
# map overlaid when appropriate), grid, limb and box outline are drawn, and the
# view limits are restored or reset. When ``preserve_current_view`` is true and
# axes already exist, the axis limits in effect before the redraw are captured
# and restored afterwards. Failures are reported as text inside the figure
# instead of being raised.
def _refresh_plot(self, preserve_current_view: bool = False) -> None:
prev_xlim = prev_ylim = None
if preserve_current_view and self._current_axes is not None:
try:
prev_xlim = self._current_axes.get_xlim()
prev_ylim = self._current_axes.get_ylim()
except Exception:
prev_xlim = prev_ylim = None
# Start from a clean slate: drop drag previews, clear the figure and forget
# every artist/limit cached from the previous draw.
self._clear_drag_preview_artists()
self._fig.clear()
self._current_map = None
self._current_axes = None
self._overlay_rect = None
self._overlay_bbox_rect = None
self._projected_box_bbox_rect = None
self._projected_box_fov = None
self._overlay_center_artist = None
self._overlay_corner_artists = []
self._overlay_line_artists = []
self._zoom_anchor_px = None
self._full_view_limits = None
smap = overlay_map = None
# Load both maps; on failure show the error text on blank axes and stop.
try:
smap = self._selected_context_map()
overlay_map = self._selected_bottom_map()
except Exception as exc:
ax = self._fig.add_subplot(111)
ax.text(0.5, 0.5, f"Map load failed:\n{exc}", ha="center", va="center")
ax.axis("off")
self._canvas.draw_idle()
return
# Without a context map, fall back to the reference map reprojected for the
# display observer; if that is unavailable too, show a message and stop.
if smap is None:
ref_map = self._reference_context_map()
if ref_map is not None:
smap, _coverage_fov = self._reproject_map_for_display_observer(ref_map)
else:
ax = self._fig.add_subplot(111)
ax.text(0.5, 0.5, "No local map available for selected map ID", ha="center", va="center")
ax.axis("off")
self._fig.subplots_adjust(left=0.12, right=0.985, bottom=0.10, top=0.96)
self._canvas.draw_idle()
return
try:
# The map itself is passed as the axes projection.
ax = self._fig.add_subplot(111, projection=smap)
self._current_map = smap
self._current_axes = ax
self._emit_observer_info()
ax.set_facecolor("black")
try:
ax.set_box_aspect(1)
except Exception:
pass
try:
ax.set_aspect("equal", adjustable="box")
except Exception:
pass
try:
if self._state is not None and self._state.selected_context_id is None:
# Keep correct context-map extents even when context display is hidden.
smap.plot(axes=ax, annotate=False, alpha=0.0)
else:
smap.plot(axes=ax, annotate=False)
except TypeError:
# NOTE(review): presumably a fallback for plot() implementations that reject
# the ``annotate`` keyword - confirm.
smap.plot(axes=ax)
# Remember the limits of the full context map before overlays change them.
try:
context_xlim = ax.get_xlim()
context_ylim = ax.get_ylim()
except Exception:
context_xlim = context_ylim = None
try:
ax.set_title("")
except Exception:
pass
context_key = None
bottom_key = None
if self._state is not None:
context_key = self._canonical_map_key(self._state.selected_context_id, purpose="context")
bottom_key = self._canonical_map_key(self._state.selected_bottom_id, purpose="bottom")
# Overlay the bottom map only for context/bottom combinations accepted by
# ``_should_plot_bottom_overlay``; overlay failures are ignored.
if overlay_map is not None and self._should_plot_bottom_overlay(context_key, bottom_key):
try:
overlay_map.plot(axes=ax, autoalign=True, alpha=0.95, zorder=5)
except Exception:
pass
# These often improve readability and mimic the legacy gxbox style.
try:
smap.draw_grid(axes=ax, color="w", lw=0.5, annotate=False)
except Exception:
pass
try:
smap.draw_limb(axes=ax, color="w", lw=0.8)
except Exception:
pass
self._plot_box_outline(ax, smap)
try:
ax.set_title("")
except Exception:
pass
# The title is drawn as figure text in the top-left corner rather than as an
# axes title: context label | bottom label | observer label | map date.
title = (
f"{self._display_map_label(self._state.selected_context_id, bottom=False)} | "
f"{self._display_map_label(self._state.selected_bottom_id, bottom=True)} | "
f"{self._observer_label_for_key(self._state.display_observer_key)} | {getattr(smap, 'date', '')}"
)
self._fig.text(0.02, 0.992, title, ha="left", va="top", fontsize=10)
# Record the full-view limits and apply them in "full_sun" mode.
if context_xlim is not None and context_ylim is not None:
self._full_view_limits = (context_xlim, context_ylim)
if self._view_mode == "full_sun":
ax.set_xlim(context_xlim)
ax.set_ylim(context_ylim)
else:
self._full_view_limits = (ax.get_xlim(), ax.get_ylim())
# Either restore the limits captured before the redraw, or zoom to the
# projected FOV when in "box_fov" mode.
if preserve_current_view and prev_xlim is not None and prev_ylim is not None:
self._restore_preserved_view(prev_xlim, prev_ylim)
elif self._view_mode == "box_fov":
self._set_view_to_projected_fov(pad_factor=1.10)
except Exception as exc:
# Any plotting error is shown as text on a fresh set of axes.
ax = self._fig.add_subplot(111)
ax.text(0.5, 0.5, f"Plot failed:\n{exc}", ha="center", va="center")
ax.axis("off")
self._fig.subplots_adjust(left=0.12, right=0.985, bottom=0.12, top=0.93)
adjusted = self._auto_adjust_axes_margins(ax, top=0.93, pad_px=10.0)
self._render_fieldlines()
# Draw synchronously when the margins were adjusted, otherwise schedule an
# idle draw.
if adjusted:
self._canvas.draw()
else:
self._canvas.draw_idle()
self._pending_launch_margin_fix = True
self._update_cursor_for_mode()
[docs]
# Show the live 3D viewer for the current session model.
#
# An already-open viewer is refreshed and brought to the front. Otherwise a new
# viewer is created from a clone of the in-memory session model, the host
# window is hidden, and the viewer is shown. Problems are reported through
# ``_set_runtime_status`` rather than raised.
def open_live_3d_viewer(self) -> None:
self._check_viewer3d_state()
# Reuse path: a viewer already exists.
if self._viewer3d is not None:
try:
self._refresh_live_3d_viewer_state()
self._viewer3d.show()
if hasattr(self._viewer3d, "app_window"):
self._viewer3d.app_window.show()
self._viewer3d.app_window.showNormal()
if hasattr(self._viewer3d, "ensure_window_visible"):
self._viewer3d.ensure_window_visible()
self._viewer3d.app_window.raise_()
self._viewer3d.app_window.activateWindow()
if hasattr(self._viewer3d, "schedule_startup_los_view"):
self._viewer3d.schedule_startup_los_view()
return
except Exception:
# The existing viewer could not be re-shown: forget it and fall through to
# the code below that creates a new one.
self._viewer3d = None
self._viewer3d_temp_h5_path = None
self._refresh_open_3d_state()
self._emit_action_state()
# A new viewer can only be created when an entry box path is attached.
if self._entry_box_path is None:
self._set_runtime_status("3D viewer unavailable: no entry box is attached to this selector.")
return
try:
self._ensure_session_model_loaded()
box, obs_time, b3dtype = self._clone_session_model()
if box is None:
raise RuntimeError("No in-memory session model is available for the embedded 3D viewer.")
self._apply_live_session_state(box)
box_norm_direction, box_view_up = _viewer_camera_basis(box, obs_time)
# Keep the box frames on the widget; ``_render_fieldlines`` reads
# ``_fieldline_frame_hcc`` when projecting field lines.
self._fieldline_frame_hcc = getattr(getattr(box, "_center", None), "frame", None)
self._fieldline_frame_obs = getattr(box, "_frame_obs", None)
self._viewer3d_close_handled = False
self._viewer3d = _magfield_viewer_cls()(
box,
time=obs_time,
b3dtype=b3dtype,
parent=self,
box_norm_direction=box_norm_direction,
box_view_up=box_view_up,
session_mode="embedded",
source_model_path=self._entry_box_path,
)
self._viewer3d_temp_h5_path = self._session_temp_h5_path
# Connect the close handler to whichever object represents the viewer
# window.
if hasattr(self._viewer3d, "app_window"):
self._viewer3d.app_window.setWindowTitle(f"GxBox 3D viewer - {self._entry_box_path.name}")
self._viewer3d.app_window.destroyed.connect(self._on_viewer3d_closed)
else:
self._viewer3d.destroyed.connect(self._on_viewer3d_closed)
self._refresh_open_3d_state()
self._emit_action_state()
self._viewer3d_watchdog.start()
# Hide the host window while the live 3D viewer is open.
host = self.window()
host.hide()
self._hidden_for_live_3d = True
self._viewer3d.show()
if hasattr(self._viewer3d, "app_window"):
self._viewer3d.app_window.show()
self._viewer3d.app_window.showNormal()
if hasattr(self._viewer3d, "ensure_window_visible"):
self._viewer3d.ensure_window_visible()
self._viewer3d.app_window.raise_()
self._viewer3d.app_window.activateWindow()
if hasattr(self._viewer3d, "schedule_startup_los_view"):
self._viewer3d.schedule_startup_los_view()
self._set_runtime_status(f"Opened live 3D viewer for: {self._entry_box_path}")
except Exception as exc:
# Launch failed: reset the viewer bookkeeping and report the error.
self._viewer3d = None
self._viewer3d_temp_h5_path = None
self._refresh_open_3d_state()
self._emit_action_state()
self._set_runtime_status(f"3D viewer launch failed: {exc}")
[docs]
def clear_fieldlines(self) -> None:
    """Remove every field-line overlay and forget the stored streamlines.

    Artists are removed from the most recently added to the oldest; an artist
    that fails to remove itself is skipped. Afterwards the action state is
    re-emitted and a canvas redraw is scheduled.
    """
    self._fieldline_streamlines = []
    self._fieldline_z_base = 0.0

    artists = self._fieldline_artists
    while artists:
        stale_artist = artists.pop()
        try:
            stale_artist.remove()
        except Exception:
            pass

    self._can_clear_lines = False
    self._emit_action_state()
    self._canvas.draw_idle()
@staticmethod
def _should_plot_bottom_overlay(context_key: str | None, bottom_key: str | None) -> bool:
context_key = str(context_key or "")
bottom_key = str(bottom_key or "")
if not context_key or not bottom_key:
return False
if context_key == bottom_key:
return False
# Bottom overlay is useful for image-like context maps, but it obscures
# signed diagnostic maps such as Vert_current almost completely.
if context_key.startswith(_EOVSA_REFMAP_PREFIX):
return True
return context_key in _BOTTOM_OVERLAY_CONTEXT_KEYS
[docs]
def plot_fieldlines(self, streamlines, z_base=0.0) -> None:
    """Store field-line bundles, render them, and report the outcome.

    ``streamlines`` is copied into a list (``None`` becomes an empty list)
    and ``z_base`` is stored as a float. After rendering, the "can clear
    lines" flag mirrors whether any bundle is stored, the action state is
    emitted and a redraw is scheduled. A status message is only written
    when at least one bundle is stored.
    """
    self._fieldline_streamlines = list(streamlines or [])
    self._fieldline_z_base = float(z_base)
    rendered = self._render_fieldlines()
    self._can_clear_lines = bool(self._fieldline_streamlines)
    self._emit_action_state()
    self._canvas.draw_idle()
    if not self._fieldline_streamlines:
        return
    bundle_count = len(self._fieldline_streamlines)
    if rendered > 0:
        message = (
            f"Received {bundle_count} field-line bundle(s) from 3D viewer; "
            f"rendered {rendered} line(s)."
        )
    else:
        message = (
            f"Received {bundle_count} field-line bundle(s) from 3D viewer, "
            "but no line segments projected into the current 2D view."
        )
    self._set_runtime_status(message)
def _render_fieldlines(self) -> int:
    """Redraw the stored field-line bundles on the current axes.

    Previously drawn field-line artists are removed first. Each stored
    streamline is then converted to world coordinates, projected to the
    observer's helioprojective view and to pixel coordinates of
    ``self._current_map``, and added to the axes as one ``LineCollection``
    whose segments are colored by field magnitude.

    Returns:
        The number of line collections added. ``0`` is returned when there
        is nothing to draw, when the HCC frame or the map observer is
        missing, or when any exception is raised while rendering (a status
        message is set in the last two cases).
    """
    # Drop the artists of the previous render; removal errors are ignored.
    while self._fieldline_artists:
        artist = self._fieldline_artists.pop()
        try:
            artist.remove()
        except Exception:
            pass
    if not self._fieldline_streamlines or self._current_axes is None or self._current_map is None:
        return 0
    rendered_count = 0
    try:
        frame_hcc = self._fieldline_frame_hcc
        current_observer = getattr(self._current_map, "observer_coordinate", None)
        current_obstime = getattr(self._current_map, "date", None)
        if frame_hcc is None or current_observer is None:
            self._set_runtime_status(
                "Field-line overlay unavailable: no legacy-equivalent 3D viewer frames are attached."
            )
            return 0
        # NOTE(review): frame_obs is not referenced again in this method.
        frame_obs = Helioprojective(observer=current_observer, obstime=current_obstime)
        cmap = mcolors.LinearSegmentedColormap.from_list(
            "selector_fieldlines",
            ["#4c9aff", "#f6c945", "#e85d3f"],
            N=256,
        )
        # Magnitudes are mapped onto the colormap over the fixed range 0..1000.
        norm = mcolors.Normalize(vmin=0.0, vmax=1000.0)
        for streamlines_subset in self._fieldline_streamlines:
            for coord, field in self._extract_streamlines(streamlines_subset):
                # Mirror the legacy GxBox field-line overlay behavior:
                # convert streamline coords from HCC to observer HPC, project to
                # map pixels, then render pixel-space LineCollection segments.
                coord_world = local_cartesian_to_world(
                    coord,
                    frame=frame_hcc,
                    z_base_mm=self._fieldline_z_base,
                )
                coord_hpc = project_world_to_observer_hpc(
                    coord_world,
                    observer=current_observer,
                    obstime=current_obstime,
                )
                projected = project_world_to_pixel(coord_hpc, self._current_map)
                if projected is None:
                    continue
                x, y = projected
                magnitude = np.asarray(field["magnitude"], dtype=float)
                # A line needs at least two points (and two samples) to form a segment.
                if x.size < 2 or y.size < 2 or magnitude.size < 2:
                    continue
                finite = np.isfinite(x) & np.isfinite(y)
                if np.count_nonzero(finite) < 2:
                    continue
                segments = []
                colors = []
                for i in range(len(x) - 1):
                    # Skip segments with a non-finite endpoint.
                    if not (finite[i] and finite[i + 1]):
                        continue
                    segments.append(((x[i], y[i]), (x[i + 1], y[i + 1])))
                    # Clamp the index in case there are fewer samples than points.
                    color_idx = min(i, magnitude.size - 1)
                    colors.append(cmap(norm(magnitude[color_idx])))
                if not segments:
                    continue
                lc = LineCollection(segments, colors=colors, linewidths=0.7, alpha=0.7)
                lc.set_zorder(20)
                self._current_axes.add_collection(lc)
                self._fieldline_artists.append(lc)
                rendered_count += 1
    except Exception as exc:
        # Collections added before the failure stay tracked in
        # self._fieldline_artists, but the reported count is 0.
        self._set_runtime_status(f"Field-line overlay failed: {exc}")
        return 0
    return rendered_count
@staticmethod
def _extract_streamlines(streamlines) -> list[tuple[np.ndarray, dict[str, np.ndarray]]]:
out = []
lines_arr = np.asarray(streamlines.lines)
points = np.asarray(streamlines.points)
i = 0
n_lines = int(lines_arr.shape[0])
while i < n_lines:
num_points = int(lines_arr[i])
start_idx = int(lines_arr[i + 1])
end_idx = start_idx + num_points
coord = points[start_idx:end_idx]
bx = np.asarray(streamlines["bx"][start_idx:end_idx])
by = np.asarray(streamlines["by"][start_idx:end_idx])
bz = np.asarray(streamlines["bz"][start_idx:end_idx])
out.append(
(
coord,
{
"bx": bx,
"by": by,
"bz": bz,
"magnitude": np.sqrt(bx ** 2 + by ** 2 + bz ** 2),
},
)
)
i += num_points + 1
return out
def _set_runtime_status(self, message: str) -> None:
self._last_status_base_text = message
self._emit_status_text()
@staticmethod
def _padded_fov_selection(
    fov: DisplayFovSelection,
    pad_factor: float,
) -> DisplayFovSelection:
    """Return a copy of ``fov`` with width and height scaled by ``pad_factor``.

    The center is kept; each padded dimension is floored at ``1e-3`` arcsec.
    """
    padded_width = max(float(fov.width_arcsec) * float(pad_factor), 1e-3)
    padded_height = max(float(fov.height_arcsec) * float(pad_factor), 1e-3)
    return DisplayFovSelection(
        center_x_arcsec=float(fov.center_x_arcsec),
        center_y_arcsec=float(fov.center_y_arcsec),
        width_arcsec=padded_width,
        height_arcsec=padded_height,
    )
@staticmethod
def _fov_contains(
outer: DisplayFovSelection | None,
inner: DisplayFovSelection | None,
*,
margin_arcsec: float = 2.0,
) -> bool:
if outer is None or inner is None:
return False
outer_half_w = 0.5 * float(outer.width_arcsec)
outer_half_h = 0.5 * float(outer.height_arcsec)
inner_half_w = 0.5 * float(inner.width_arcsec)
inner_half_h = 0.5 * float(inner.height_arcsec)
return (
float(inner.center_x_arcsec) - inner_half_w >= float(outer.center_x_arcsec) - outer_half_w + margin_arcsec
and float(inner.center_x_arcsec) + inner_half_w <= float(outer.center_x_arcsec) + outer_half_w - margin_arcsec
and float(inner.center_y_arcsec) - inner_half_h >= float(outer.center_y_arcsec) - outer_half_h + margin_arcsec
and float(inner.center_y_arcsec) + inner_half_h <= float(outer.center_y_arcsec) + outer_half_h - margin_arcsec
)
def _current_display_prepare_fov(
self,
observer_key: str,
*,
obstime=None,
pad_factor: float = 1.6,
) -> DisplayFovSelection | None:
if self._state is None or self._view_mode != "box_fov" or self._state.fov is None:
return None
compare_time = obstime if obstime is not None else self._state.session_input.time_iso
if not self._observers_share_los(self._state.fov_definition_observer_key, observer_key, compare_time):
return None
return self._padded_fov_selection(self._state.fov, pad_factor)
def _display_observer_reproject_header_for_fov(self, smap, observer, obstime, pad_factor: float = 1.10):
if self._state is None or self._state.fov is None:
return None
display_key = self._normalize_observer_key(self._state.display_observer_key)
fov_key = self._normalize_observer_key(self._state.fov_definition_observer_key)
if not self._observers_share_los(display_key, fov_key, obstime):
return None
fov = self._padded_fov_selection(self._state.fov, pad_factor)
return self._display_observer_reproject_header_for_selection(smap, observer, obstime, fov)
def _display_observer_reproject_header_for_selection(self, smap, observer, obstime, fov: DisplayFovSelection | None):
if fov is None:
return None
try:
scale_x = abs(float(smap.scale.axis1.to_value(u.arcsec / u.pix)))
scale_y = abs(float(smap.scale.axis2.to_value(u.arcsec / u.pix)))
except Exception:
return None
if not (np.isfinite(scale_x) and np.isfinite(scale_y) and scale_x > 0 and scale_y > 0):
return None
width = max(float(fov.width_arcsec), 4.0)
height = max(float(fov.height_arcsec), 4.0)
nx = max(32, int(np.ceil(width / scale_x)))
ny = max(32, int(np.ceil(height / scale_y)))
try:
target_center = SkyCoord(
Tx=float(fov.center_x_arcsec) * u.arcsec,
Ty=float(fov.center_y_arcsec) * u.arcsec,
frame=Helioprojective(observer=observer, obstime=obstime),
)
header = make_fitswcs_header(
np.empty((ny, nx), dtype=np.float32),
target_center,
scale=u.Quantity([scale_x, scale_y], u.arcsec / u.pix),
)
try:
header["rsun_ref"] = float(smap.rsun_meters.to_value(u.m))
except Exception:
pass
return header
except Exception:
return None
[docs]
def save_current_plot(self, output_path: str) -> None:
    """Save the widget's figure to ``output_path`` at 150 dpi with a tight bounding box."""
    export_options = {"dpi": 150, "bbox_inches": "tight"}
    self._fig.savefig(output_path, **export_options)
def _restore_preserved_view(self, prev_xlim, prev_ylim) -> None:
if self._current_axes is None:
return
prev_width = abs(float(prev_xlim[1] - prev_xlim[0]))
prev_height = abs(float(prev_ylim[1] - prev_ylim[0]))
if prev_width <= 0 or prev_height <= 0:
return
if self._state is not None and self._state.fov is not None and self._current_map is not None:
try:
observer = getattr(self._current_map, "observer_coordinate", None) or "earth"
obstime = getattr(self._current_map, "date", None)
fov = self._state.fov
center_world = SkyCoord(
Tx=float(fov.center_x_arcsec) * u.arcsec,
Ty=float(fov.center_y_arcsec) * u.arcsec,
frame=Helioprojective(observer=observer, obstime=obstime),
)
cpx, cpy = self._current_map.wcs.world_to_pixel(center_world)
if np.isfinite(cpx) and np.isfinite(cpy):
self._set_view_window(float(cpx), float(cpy), prev_width, prev_height)
return
except Exception:
pass
self._current_axes.set_xlim(prev_xlim)
self._current_axes.set_ylim(prev_ylim)
@staticmethod
def _default_context_id(session_input: SelectorSessionInput) -> Optional[str]:
map_ids = list(session_input.map_ids or [])
preferred = [
"171", "193", "211", "304", "335", "1600",
"Bz", "Ic", "B_rho", "B_theta", "B_phi", "disambig",
# Backward-compatible legacy labels.
"Br", "Bp", "Bt",
]
for key in preferred:
if key in map_ids:
return key
return map_ids[0] if map_ids else None
@staticmethod
def _default_bottom_id(session_input: SelectorSessionInput) -> Optional[str]:
base_maps = dict(session_input.base_maps or {})
if "bz" in base_maps:
return "Bz"
if "ic" in base_maps:
return "Ic"
if "bx" in base_maps:
return "Bx"
if "by" in base_maps:
return "By"
if "vert_current" in base_maps:
return "Vert_current"
if "chromo_mask" in base_maps:
return "chromo_mask"
return None
def _edge_pixel_bounds(self, smap, edges) -> tuple[float, float, float, float] | None:
xs: list[float] = []
ys: list[float] = []
for edge in edges:
try:
px, py = smap.wcs.world_to_pixel(edge)
except Exception:
continue
px = np.asarray(px, dtype=float).ravel()
py = np.asarray(py, dtype=float).ravel()
finite = np.isfinite(px) & np.isfinite(py)
if np.any(finite):
xs.extend(px[finite].tolist())
ys.extend(py[finite].tolist())
if not xs or not ys:
return None
return (
float(np.nanmin(xs)),
float(np.nanmax(xs)),
float(np.nanmin(ys)),
float(np.nanmax(ys)),
)
def _plot_box_outline(self, ax, smap) -> None:
    """Draw the model box and FOV-box overlays on ``ax`` and refresh overlay state.

    Does nothing without a session state and geometry. Otherwise the box
    edges are plotted in red (bottom edges dashed), the projected FOV-box
    edges and face in blue, and a yellow ``+`` at the geometry anchor. As a
    side effect this updates the projected-box FOV and its pixel rectangle,
    the invisible extent rectangles, the zoom anchor, and the list of
    overlay artists. It also initializes ``self._state.fov`` and
    ``self._state.fov_box`` when they are unset.

    Any exception aborts the overlay silently so the map itself still shows.

    NOTE(review): block nesting was reconstructed from a source listing
    without indentation; confirm against the repository file.
    """
    if self._state is None or self._state.geometry is None:
        return
    try:
        box = self._build_legacy_box(
            smap,
            geometry_observer_key=self._state.geometry_definition_observer_key,
        )
        if box is None:
            return
        # Start a fresh artist list for this draw.
        self._overlay_line_artists = []
        for edge in box.bottom_edges:
            self._overlay_line_artists.extend(
                ax.plot_coord(edge, color="tab:red", ls="--", marker="", lw=1.0, zorder=20)
            )
        for edge in box.non_bottom_edges:
            self._overlay_line_artists.extend(
                ax.plot_coord(edge, color="tab:red", ls="-", marker="", lw=1.0, zorder=20)
            )
        full_bounds = self._edge_pixel_bounds(smap, list(box.bottom_edges) + list(box.non_bottom_edges))
        bottom_bounds = self._edge_pixel_bounds(smap, list(box.bottom_edges))
        if full_bounds is None or bottom_bounds is None:
            return
        # NOTE(review): x0, x1, y0, y1 are not referenced again in this method.
        x0, x1, y0, y1 = full_bounds
        bx0, bx1, by0, by1 = bottom_bounds
        self._projected_box_fov = self._box_bounds_to_fov_selection(box, smap)
        self._projected_box_bbox_rect = self._fov_selection_to_pixel_rect(smap, self._projected_box_fov)
        if self._state.fov is None:
            # No FOV chosen yet: adopt the projected box FOV, record the
            # display observer as its defining observer, and notify the listener.
            self._state.fov = DisplayFovSelection(
                center_x_arcsec=self._projected_box_fov.center_x_arcsec,
                center_y_arcsec=self._projected_box_fov.center_y_arcsec,
                width_arcsec=self._projected_box_fov.width_arcsec,
                height_arcsec=self._projected_box_fov.height_arcsec,
            )
            self._state.fov_definition_observer_key = self._normalize_observer_key(
                self._state.display_observer_key
            )
            if self._fov_change_callback is not None:
                self._fov_change_callback(self._state.fov)
        if self._state.fov_box is None:
            self._state.fov_box = self._compute_fov_box_from_geometry()
        # Default FOV extent in pixels, taken from the 2D FOV selection.
        fov_rect = self._fov_selection_to_pixel_rect(smap, self._state.fov)
        fx0, fy0 = fov_rect.get_x(), fov_rect.get_y()
        fw, fh = fov_rect.get_width(), fov_rect.get_height()
        projected_edges = self._fov_box_projected_edges(smap)
        for edge in projected_edges:
            try:
                self._overlay_line_artists.extend(
                    ax.plot_coord(edge, color="deepskyblue", ls="-", marker="", lw=0.9, zorder=21)
                )
            except Exception:
                continue
        projected_face = self._fov_box_projected_face(smap)
        if projected_face is not None:
            try:
                self._overlay_line_artists.extend(
                    ax.plot_coord(projected_face, color="deepskyblue", ls="-", marker="", lw=1.6, zorder=22)
                )
            except Exception:
                pass
        # When the projected FOV-box edges have pixel bounds, they replace
        # the extent derived from the 2D FOV selection.
        projected_bbox = self._edge_pixel_bounds(smap, projected_edges) if projected_edges else None
        if projected_bbox is not None:
            pfx0, pfx1, pfy0, pfy1 = projected_bbox
            fx0, fy0 = pfx0, pfy0
            fw, fh = max(1e-6, pfx1 - pfx0), max(1e-6, pfy1 - pfy0)
        # Invisible rectangles retained as the internal geometry extents for
        # button-driven manipulations and Box-FOV calculations.
        self._overlay_rect = Rectangle(
            (bx0, by0),
            max(1e-6, bx1 - bx0),
            max(1e-6, by1 - by0),
            visible=False,
        )
        self._overlay_bbox_rect = Rectangle(
            (fx0, fy0),
            max(1e-6, fw),
            max(1e-6, fh),
            visible=False,
        )
        # Zoom anchor is the center of the FOV extent rectangle.
        self._zoom_anchor_px = (
            float(fx0 + 0.5 * max(1e-6, fw)),
            float(fy0 + 0.5 * max(1e-6, fh)),
        )
        anchor = self._geometry_anchor_coord(self._state.geometry, smap).transform_to(box._frame_obs)
        cpx, cpy = smap.wcs.world_to_pixel(anchor)
        self._overlay_center_artist = ax.plot(
            [float(cpx)], [float(cpy)],
            marker="+", color="yellow", ms=10, mew=1.5,
            transform=ax.get_transform("pixel"),
        )[0]
        self._overlay_line_artists.append(self._overlay_center_artist)
    except Exception:
        # Overlay failure should not break map display.
        return
def _geometry_center_hpc_for_map(self, geom: BoxGeometrySelection, smap):
    """Return the geometry anchor of ``geom`` as a helioprojective coordinate.

    The target frame uses the observer resolved for ``smap`` with the
    geometry-definition observer key (``"earth"`` without a state, and as a
    fallback when no observer resolves) and ``smap``'s date as obstime.
    """
    anchor = self._geometry_anchor_coord(geom, smap)
    if self._state is not None:
        observer_key = self._state.geometry_definition_observer_key
    else:
        observer_key = "earth"
    observer = self._resolved_observer_for_map(smap, observer_key) or "earth"
    target_frame = Helioprojective(
        observer=observer,
        obstime=getattr(smap, "date", None),
    )
    return anchor.transform_to(target_frame)
def _box_half_extent_arcsec(self, n_pix: int, dx_km: float, smap) -> float:
# Approximate small-angle conversion using observer distance.
dsun_km = None
try:
observer = self._resolved_observer_for_map(smap)
if observer is not None:
dsun_km = observer.radius.to_value(u.km)
except Exception:
try:
dsun_km = smap.dsun.to_value(u.km)
except Exception:
try:
dsun_obs = smap.meta.get("dsun_obs")
if dsun_obs is not None:
dsun_km = (float(dsun_obs) * u.m).to_value(u.km)
except Exception:
dsun_km = None
if not dsun_km or dsun_km <= 0:
dsun_km = 1.496e8 # fallback ~1 AU
half_size_km = 0.5 * float(n_pix) * float(dx_km)
return half_size_km / dsun_km * 206265.0
def _geometry_pixel_arcsec(self, geom: BoxGeometrySelection, smap) -> float:
dsun_km = self._dsun_km_from_map(smap)
return float(max(geom.dx_km, 1e-6) / max(dsun_km, 1e-6) * 206265.0)
def _geometry_from_world_center(self, geom: BoxGeometrySelection, world_center) -> BoxGeometrySelection:
    """Return a copy of ``geom`` re-centered on ``world_center``.

    ``world_center`` is transformed into the frame matching
    ``geom.coord_mode`` (helioprojective, Carrington, or Stonyhurst for any
    other mode) and the resulting coordinates replace ``coord_x`` and
    ``coord_y``. Grid sizes and ``dx_km`` are copied unchanged.

    If the transformation fails, the copy keeps the original center.

    The geometry-definition observer key is resolved before branching on
    the coordinate mode, so the Carrington branch can use it too.
    """
    out = BoxGeometrySelection(
        coord_mode=geom.coord_mode,
        coord_x=geom.coord_x,
        coord_y=geom.coord_y,
        grid_x=geom.grid_x,
        grid_y=geom.grid_y,
        grid_z=geom.grid_z,
        dx_km=geom.dx_km,
    )
    try:
        # Needed by both the HPC and the HGC branch.
        observer_key = self._state.geometry_definition_observer_key if self._state is not None else "earth"
        map_obstime = getattr(self._current_map, "date", None)
        if geom.coord_mode == CoordMode.HPC:
            source_context = self._observer_context(observer_key, map_obstime)
            c = world_center.transform_to(
                Helioprojective(
                    obstime=getattr(source_context, "date", None) or getattr(self._current_map, "date", None),
                    observer=getattr(source_context, "observer_coordinate", None)
                    or self._resolved_observer_for_map(self._current_map, observer_key)
                    or "earth",
                )
            )
            out.coord_x = float(c.Tx.to_value(u.arcsec))
            out.coord_y = float(c.Ty.to_value(u.arcsec))
        elif geom.coord_mode == CoordMode.HGC:
            c = world_center.transform_to(
                HeliographicCarrington(
                    obstime=map_obstime,
                    observer=self._resolved_observer_for_map(self._current_map, observer_key) or "earth",
                )
            )
            out.coord_x = float(c.lon.to_value(u.deg))
            out.coord_y = float(c.lat.to_value(u.deg))
        else:
            c = world_center.transform_to(HeliographicStonyhurst(obstime=map_obstime))
            out.coord_x = float(c.lon.to_value(u.deg))
            out.coord_y = float(c.lat.to_value(u.deg))
    except Exception:
        # Best effort: on failure keep the original center in the copy.
        pass
    return out
[docs]
def show_box_fov_view(self, pad_factor: float | None = None) -> None:
    """Switch the widget to the ``"box_fov"`` view mode and refresh the plot.

    ``pad_factor`` is accepted but not used here.
    """
    # `pad_factor` is handled by the display-crop helper; keep the method
    # signature stable for existing button hookups.
    self._view_mode = "box_fov"
    self._refresh_plot()
def _set_view_window(self, cx: float, cy: float, width: float, height: float) -> None:
if self._current_axes is None:
return
ax = self._current_axes
xlim = ax.get_xlim()
ylim = ax.get_ylim()
x_dir = 1.0 if xlim[1] >= xlim[0] else -1.0
y_dir = 1.0 if ylim[1] >= ylim[0] else -1.0
half_w = 0.5 * max(width, 4.0)
half_h = 0.5 * max(height, 4.0)
ax.set_xlim((cx - half_w, cx + half_w) if x_dir > 0 else (cx + half_w, cx - half_w))
ax.set_ylim((cy - half_h, cy + half_h) if y_dir > 0 else (cy + half_h, cy - half_h))
self._canvas.draw_idle()
def _set_view_to_projected_fov(self, pad_factor: float = 1.10) -> None:
if self._current_axes is None:
return
rect = self._overlay_bbox_rect or self._projected_box_bbox_rect
if rect is None:
return
x0 = float(rect.get_x())
y0 = float(rect.get_y())
width = float(rect.get_width())
height = float(rect.get_height())
if not (np.isfinite(x0) and np.isfinite(y0) and np.isfinite(width) and np.isfinite(height)):
return
width = max(width * float(pad_factor), 4.0)
height = max(height * float(pad_factor), 4.0)
self._set_view_window(
cx=x0 + 0.5 * float(rect.get_width()),
cy=y0 + 0.5 * float(rect.get_height()),
width=width,
height=height,
)
def _set_view_window_hpc(
self,
center_x_arcsec: float,
center_y_arcsec: float,
width_arcsec: float,
height_arcsec: float,
) -> None:
if self._current_axes is None or self._current_map is None:
return
half_w = 0.5 * max(float(width_arcsec), 1e-3)
half_h = 0.5 * max(float(height_arcsec), 1e-3)
observer = self._resolved_observer_for_map(self._current_map, self._state.display_observer_key) or "earth"
obstime = getattr(self._current_map, "date", None)
bottom_left = SkyCoord(
Tx=(center_x_arcsec - half_w) * u.arcsec,
Ty=(center_y_arcsec - half_h) * u.arcsec,
frame=Helioprojective(observer=observer, obstime=obstime),
)
top_right = SkyCoord(
Tx=(center_x_arcsec + half_w) * u.arcsec,
Ty=(center_y_arcsec + half_h) * u.arcsec,
frame=Helioprojective(observer=observer, obstime=obstime),
)
try:
corners = SkyCoord([bottom_left, top_right])
px, py = self._current_map.wcs.world_to_pixel(corners)
px = np.asarray(px, dtype=float).ravel()
py = np.asarray(py, dtype=float).ravel()
finite = np.isfinite(px) & np.isfinite(py)
if not np.any(finite):
return
px = px[finite]
py = py[finite]
x0, x1 = float(np.nanmin(px)), float(np.nanmax(px))
y0, y1 = float(np.nanmin(py)), float(np.nanmax(py))
self._set_view_window(
cx=0.5 * (x0 + x1),
cy=0.5 * (y0 + y1),
width=max(x1 - x0, 4.0),
height=max(y1 - y0, 4.0),
)
except Exception:
return
def _scale_view(self, factor: float, center_px: tuple[float, float] | None = None) -> None:
if self._current_axes is None:
return
ax = self._current_axes
x0, x1 = ax.get_xlim()
y0, y1 = ax.get_ylim()
if self._zoom_anchor_px is not None:
try:
cx, cy = self._zoom_anchor_px
if not (np.isfinite(cx) and np.isfinite(cy)):
raise ValueError("non-finite zoom center")
except Exception:
cx = 0.5 * (x0 + x1)
cy = 0.5 * (y0 + y1)
else:
rect = self._overlay_bbox_rect or self._projected_box_bbox_rect
if rect is not None:
try:
cx = float(rect.get_x()) + 0.5 * float(rect.get_width())
cy = float(rect.get_y()) + 0.5 * float(rect.get_height())
if not (np.isfinite(cx) and np.isfinite(cy)):
raise ValueError("non-finite zoom center")
except Exception:
cx = 0.5 * (x0 + x1)
cy = 0.5 * (y0 + y1)
elif center_px is not None:
cx, cy = center_px
else:
cx = 0.5 * (x0 + x1)
cy = 0.5 * (y0 + y1)
width = abs(x1 - x0) * float(factor)
height = abs(y1 - y0) * float(factor)
self._set_view_window(cx, cy, max(width, 4.0), max(height, 4.0))
def _nudge_box_size(self, axis: str, sign: int) -> None:
if not self._geometry_edit_enabled or self._state is None or self._state.geometry is None:
return
geom = self._state.geometry
step = self._coarse_box_grid_step(axis)
new_geom = BoxGeometrySelection(
coord_mode=geom.coord_mode,
coord_x=geom.coord_x,
coord_y=geom.coord_y,
grid_x=geom.grid_x,
grid_y=geom.grid_y,
grid_z=geom.grid_z,
dx_km=geom.dx_km,
)
if axis == "x":
new_geom.grid_x = max(1, geom.grid_x + int(sign) * step)
elif axis == "y":
new_geom.grid_y = max(1, geom.grid_y + int(sign) * step)
else:
return
self.set_geometry_selection(new_geom)
def _nudge_box_size_xy(self, sign: int) -> None:
if not self._geometry_edit_enabled or self._state is None or self._state.geometry is None:
return
geom = self._state.geometry
step_x = self._coarse_box_grid_step("x")
step_y = self._coarse_box_grid_step("y")
new_geom = BoxGeometrySelection(
coord_mode=geom.coord_mode,
coord_x=geom.coord_x,
coord_y=geom.coord_y,
grid_x=max(1, geom.grid_x + int(sign) * step_x),
grid_y=max(1, geom.grid_y + int(sign) * step_y),
grid_z=geom.grid_z,
dx_km=geom.dx_km,
)
self.set_geometry_selection(new_geom)
def _nudge_fov_size(self, axis: str, sign: int) -> None:
if self._state is None or self._state.fov is None:
return
fov = self._state.fov
step = self._coarse_fov_step(axis)
width = float(fov.width_arcsec)
height = float(fov.height_arcsec)
if axis == "x":
width = max(step, width + int(sign) * step)
if self._state.square_fov:
height = width
elif axis == "y":
if self._state.square_fov:
return
height = max(step, height + int(sign) * step)
else:
return
self.set_fov_selection(
DisplayFovSelection(
center_x_arcsec=fov.center_x_arcsec,
center_y_arcsec=fov.center_y_arcsec,
width_arcsec=width,
height_arcsec=height,
)
)
def _nudge_fov_size_xy(self, sign: int) -> None:
if self._state is None or self._state.fov is None:
return
fov = self._state.fov
step_x = self._coarse_fov_step("x")
step_y = self._coarse_fov_step("y")
width = max(step_x, float(fov.width_arcsec) + int(sign) * step_x)
height = max(step_y, float(fov.height_arcsec) + int(sign) * step_y)
if self._state.square_fov:
height = width
self.set_fov_selection(
DisplayFovSelection(
center_x_arcsec=fov.center_x_arcsec,
center_y_arcsec=fov.center_y_arcsec,
width_arcsec=width,
height_arcsec=height,
)
)
def _nudge_fov_center(self, axis: str, sign: int) -> None:
if self._state is None or self._state.fov is None:
return
fov = self._state.fov
step = self._coarse_fov_step(axis)
cx = float(fov.center_x_arcsec)
cy = float(fov.center_y_arcsec)
if axis == "x":
cx += float(sign) * step
elif axis == "y":
cy += float(sign) * step
else:
return
self.set_fov_selection(
DisplayFovSelection(
center_x_arcsec=cx,
center_y_arcsec=cy,
width_arcsec=fov.width_arcsec,
height_arcsec=fov.height_arcsec,
)
)
def _nudge_box_center(self, axis: str, sign: int) -> None:
if not self._geometry_edit_enabled or self._state is None or self._state.geometry is None or self._current_map is None:
return
geom = self._state.geometry
step_arcsec = self._coarse_box_center_step_arcsec(axis)
if not np.isfinite(step_arcsec) or step_arcsec <= 0:
return
center_hpc = self._geometry_center_hpc_for_map(geom, self._current_map)
tx = float(center_hpc.Tx.to_value(u.arcsec))
ty = float(center_hpc.Ty.to_value(u.arcsec))
if axis == "x":
tx += float(sign) * step_arcsec
elif axis == "y":
ty += float(sign) * step_arcsec
else:
return
nudged_center = SkyCoord(
Tx=tx * u.arcsec,
Ty=ty * u.arcsec,
frame=Helioprojective(
observer=getattr(self._current_map, "observer_coordinate", None) or "earth",
obstime=getattr(self._current_map, "date", None),
),
)
new_geom = self._geometry_from_world_center(geom, nudged_center)
self.set_geometry_selection(new_geom)
def _coarse_box_grid_step(self, axis: str) -> int:
if self._state is None or self._state.geometry is None:
return 1
geom = self._state.geometry
dim = geom.grid_x if axis == "x" else geom.grid_y if axis == "y" else geom.grid_z
return max(1, int(round(float(dim) * 0.10)))
def _coarse_fov_step(self, axis: str) -> float:
if self._state is None or self._state.fov is None:
return 1.0
fov = self._state.fov
span = float(fov.width_arcsec) if axis == "x" else float(fov.height_arcsec)
base_step = 1.0
if self._state.geometry is not None and self._current_map is not None:
try:
base_step = max(1e-3, self._geometry_pixel_arcsec(self._state.geometry, self._current_map))
except Exception:
base_step = 1.0
return max(base_step, abs(span) * 0.10)
def _coarse_box_center_step_arcsec(self, axis: str) -> float:
if self._state is None or self._state.geometry is None or self._current_map is None:
return 0.0
geom = self._state.geometry
step_arcsec = self._geometry_pixel_arcsec(geom, self._current_map)
if not np.isfinite(step_arcsec) or step_arcsec <= 0:
return 0.0
grid_step = self._coarse_box_grid_step(axis)
return float(step_arcsec) * float(grid_step)
def _compute_fov_box_from_geometry(self) -> Optional[DisplayFovBoxSelection]:
if self._state is None or self._state.fov is None or self._current_map is None:
return None
obstime = getattr(self._current_map, "date", None)
geometry_observer_key = self._state.geometry_definition_observer_key
source_map = self._observer_context(geometry_observer_key, obstime) or self._current_map
box = self._build_legacy_box(
source_map,
geometry_observer_key=geometry_observer_key,
)
if box is None:
return None
observer = self._resolved_observer_for_map(self._current_map, self._state.display_observer_key) or "earth"
try:
world = box.model_box_corners_world()
if world is None:
return None
fov_box = build_fov_box_from_red_box_world(world, observer=observer, obstime=obstime)
if fov_box is None:
return None
return DisplayFovBoxSelection(
center_x_arcsec=float(fov_box["xc_arcsec"]),
center_y_arcsec=float(fov_box["yc_arcsec"]),
width_arcsec=float(fov_box["xsize_arcsec"]),
height_arcsec=float(fov_box["ysize_arcsec"]),
z_min_mm=float(fov_box["zmin_mm"]),
z_max_mm=float(fov_box["zmax_mm"]),
observer_key=self._normalize_observer_key(self._state.display_observer_key),
)
except Exception:
return None
def _fov_box_world_corners(self, smap, fov_box: DisplayFovBoxSelection) -> SkyCoord | None:
if self._state is None:
return None
obstime = getattr(smap, "date", None)
source_observer_key = self._normalize_observer_key(getattr(fov_box, "observer_key", None))
source_context = self._observer_context(source_observer_key, obstime)
source_map = source_context or smap
box = self._build_legacy_box(
source_map,
geometry_observer_key=self._state.geometry_definition_observer_key,
)
if box is None:
return None
box_frame = getattr(getattr(box, "_center", None), "frame", None)
if box_frame is None:
return None
source_observer = self._resolved_observer_for_map(source_map, source_observer_key) or "earth"
meta = fov_box.as_observer_metadata(square=bool(self._state.square_fov))
try:
return observer_fov_box_to_world_corners(
xc_arcsec=float(meta["xc_arcsec"]),
yc_arcsec=float(meta["yc_arcsec"]),
xsize_arcsec=float(meta["xsize_arcsec"]),
ysize_arcsec=float(meta["ysize_arcsec"]),
zmin_mm=float(meta["zmin_mm"]),
zmax_mm=float(meta["zmax_mm"]),
observer=source_observer,
obstime=getattr(source_map, "date", None),
target_frame=box_frame,
)
except Exception:
return None
[docs]
def recompute_fov_from_box(self) -> None:
    """Reset the FOV so it matches the current model box.

    The display observer becomes the FOV-definition observer and the FOV
    box is recomputed from the geometry. The new FOV takes its center and
    size from that FOV box, or from the projected box FOV when no FOV box
    could be computed. With a square FOV, both sides are set to the larger
    of width and height in either case, so the square always covers the
    source extent.

    Does nothing without a state or a projected box FOV.
    """
    if self._state is None or self._projected_box_fov is None:
        return
    self._state.fov_definition_observer_key = self._normalize_observer_key(self._state.display_observer_key)
    self._state.fov_box = self._compute_fov_box_from_geometry()
    # Prefer the freshly computed FOV box; fall back to the projected box FOV.
    source = self._state.fov_box if self._state.fov_box is not None else self._projected_box_fov
    width = float(source.width_arcsec)
    height = float(source.height_arcsec)
    if self._state.square_fov:
        # Use the larger side so the square never crops the source extent.
        side = max(width, height)
        width = side
        height = side
    selection = DisplayFovSelection(
        center_x_arcsec=float(source.center_x_arcsec),
        center_y_arcsec=float(source.center_y_arcsec),
        width_arcsec=width,
        height_arcsec=height,
    )
    self.set_fov_selection(selection)
    self._refresh_status_text()
def _on_scroll(self, event) -> None:
if self._current_axes is None or event.inaxes is not self._current_axes:
return
if event.xdata is None or event.ydata is None:
return
if getattr(event, "button", None) == "up":
self._scale_view(1 / 1.12, center_px=(event.xdata, event.ydata))
elif getattr(event, "button", None) == "down":
self._scale_view(1.12, center_px=(event.xdata, event.ydata))
def _overlay_rect_bounds(self):
if self._overlay_rect is None:
return None
x0, y0 = self._overlay_rect.get_x(), self._overlay_rect.get_y()
w, h = self._overlay_rect.get_width(), self._overlay_rect.get_height()
return x0, y0, x0 + w, y0 + h
def _set_static_overlay_visible(self, visible: bool) -> None:
for artist in self._overlay_line_artists:
try:
artist.set_visible(bool(visible))
except Exception:
continue
def _clear_drag_preview_artists(self) -> None:
for artist_name in (
"_drag_preview_box_artist",
"_drag_preview_fov_artist",
"_drag_preview_center_artist",
):
artist = getattr(self, artist_name, None)
if artist is not None:
try:
artist.remove()
except Exception:
pass
setattr(self, artist_name, None)
self._drag_preview_background = None
self._drag_preview_active = False
def _ensure_drag_preview(self) -> bool:
if self._current_axes is None or self._current_map is None:
return False
if self._drag_preview_active:
return True
self._set_static_overlay_visible(False)
self._canvas.draw()
try:
self._drag_preview_background = self._canvas.copy_from_bbox(self._current_axes.bbox)
except Exception:
self._set_static_overlay_visible(True)
self._canvas.draw_idle()
return False
pixel_transform = self._current_axes.get_transform("pixel")
self._drag_preview_box_artist = Rectangle(
(0.0, 0.0), 1.0, 1.0,
fill=False, ec="tab:red", ls="--", lw=1.2,
transform=pixel_transform, animated=True, visible=True,
)
self._drag_preview_fov_artist = Rectangle(
(0.0, 0.0), 1.0, 1.0,
fill=False, ec="deepskyblue", ls="-", lw=0.9,
transform=pixel_transform, animated=True, visible=True,
)
self._drag_preview_center_artist = self._current_axes.plot(
[0.0], [0.0],
marker="+", color="yellow", ms=10, mew=1.5,
transform=pixel_transform,
animated=True,
)[0]
self._current_axes.add_patch(self._drag_preview_box_artist)
self._current_axes.add_patch(self._drag_preview_fov_artist)
self._drag_preview_active = True
return True
def _update_drag_preview(self, box_rect: Rectangle, center_px: tuple[float, float]) -> None:
if not self._ensure_drag_preview():
return
fov_rect = self._overlay_bbox_rect
try:
self._drag_preview_box_artist.set_bounds(
float(box_rect.get_x()),
float(box_rect.get_y()),
float(box_rect.get_width()),
float(box_rect.get_height()),
)
if fov_rect is not None:
self._drag_preview_fov_artist.set_bounds(
float(fov_rect.get_x()),
float(fov_rect.get_y()),
float(fov_rect.get_width()),
float(fov_rect.get_height()),
)
self._drag_preview_fov_artist.set_visible(True)
else:
self._drag_preview_fov_artist.set_visible(False)
self._drag_preview_center_artist.set_data([float(center_px[0])], [float(center_px[1])])
self._canvas.restore_region(self._drag_preview_background)
self._current_axes.draw_artist(self._drag_preview_box_artist)
if self._drag_preview_fov_artist.get_visible():
self._current_axes.draw_artist(self._drag_preview_fov_artist)
self._current_axes.draw_artist(self._drag_preview_center_artist)
self._canvas.blit(self._current_axes.bbox)
except Exception:
self._end_drag_preview(restore_static=True)
def _end_drag_preview(self, *, restore_static: bool) -> None:
self._clear_drag_preview_artists()
if restore_static:
self._set_static_overlay_visible(True)
self._canvas.draw_idle()
def _geometry_preview_overlay(self, geom: BoxGeometrySelection) -> tuple[Rectangle, tuple[float, float]] | None:
if self._current_map is None or self._state is None:
return None
box = self._build_legacy_box(
self._current_map,
geom=geom,
geometry_observer_key=self._state.geometry_definition_observer_key,
)
if box is None:
return None
bottom_bounds = self._edge_pixel_bounds(self._current_map, list(box.bottom_edges))
if bottom_bounds is None:
return None
bx0, bx1, by0, by1 = bottom_bounds
anchor = self._geometry_anchor_coord(geom, self._current_map).transform_to(box._frame_obs)
cpx, cpy = self._current_map.wcs.world_to_pixel(anchor)
return (
Rectangle(
(bx0, by0),
max(1e-6, bx1 - bx0),
max(1e-6, by1 - by0),
visible=False,
),
(float(cpx), float(cpy)),
)
def _fov_selection_to_pixel_rect(self, smap, fov: DisplayFovSelection | None) -> Rectangle:
    """Convert an arcsecond FOV selection into an invisible pixel-space rectangle on ``smap``.

    The bottom-left and top-right corners of ``fov`` are built in the
    helioprojective frame of the FOV-definition observer and projected
    through the map WCS; the rectangle spans the finite projected pixels.
    When ``fov`` is ``None``, when projection raises, or when no projected
    pixel is finite, a copy of the projected-box bounding rectangle is
    returned if one exists, otherwise a 10x10 rectangle at the origin.
    """

    def _fallback_rect() -> Rectangle:
        projected = self._projected_box_bbox_rect
        if projected is None:
            return Rectangle((0.0, 0.0), 10.0, 10.0, visible=False)
        return Rectangle(
            (projected.get_x(), projected.get_y()),
            projected.get_width(),
            projected.get_height(),
            visible=False,
        )

    if fov is None:
        return _fallback_rect()

    observer_key = "earth" if self._state is None else self._state.fov_definition_observer_key
    context = self._observer_context(observer_key, getattr(smap, "date", None))
    observer = getattr(context, "observer_coordinate", None) or "earth"
    obstime = getattr(context, "date", None) or getattr(smap, "date", None)

    # Width and height are floored at 1e-3 arcsec so the two corners never coincide.
    half_w = 0.5 * max(float(fov.width_arcsec), 1e-3)
    half_h = 0.5 * max(float(fov.height_arcsec), 1e-3)
    corner_pair = [
        SkyCoord(
            Tx=(fov.center_x_arcsec + off_x) * u.arcsec,
            Ty=(fov.center_y_arcsec + off_y) * u.arcsec,
            frame=Helioprojective(observer=observer, obstime=obstime),
        )
        for off_x, off_y in ((-half_w, -half_h), (half_w, half_h))
    ]

    try:
        cols, rows = smap.wcs.world_to_pixel(SkyCoord(corner_pair))
        cols = np.asarray(cols, dtype=float).ravel()
        rows = np.asarray(rows, dtype=float).ravel()
        usable = np.isfinite(cols) & np.isfinite(rows)
        if np.any(usable):
            cols, rows = cols[usable], rows[usable]
            left, right = float(np.nanmin(cols)), float(np.nanmax(cols))
            bottom, top = float(np.nanmin(rows)), float(np.nanmax(rows))
            return Rectangle(
                (left, bottom),
                max(1e-6, right - left),
                max(1e-6, top - bottom),
                visible=False,
            )
    except Exception:
        pass

    # Fallback to projected-box bounds if corner projection fails.
    return _fallback_rect()
def _fov_box_projected_edges(self, smap) -> list[SkyCoord]:
    """Return the FOV outline as edges projected into the display observer's HPC frame.

    A FOV box that resolves to eight world corners is projected edge by
    edge. Otherwise the flat FOV (``state.fov``, or the box's own
    center/size fields) is turned into four corners in the FOV-definition
    observer's frame; those corners are duplicated so the same eight-corner
    edge table can be reused. Returns an empty list when there is no state,
    no FOV of either kind, or the corners cannot be built.
    """
    state = self._state
    if state is None:
        return []
    box_selection, rect_selection = state.fov_box, state.fov
    if box_selection is None and rect_selection is None:
        return []

    display_observer = self._resolved_observer_for_map(smap, state.display_observer_key) or "earth"
    map_time = getattr(smap, "date", None)
    display_frame = Helioprojective(observer=display_observer, obstime=map_time)

    if box_selection is not None:
        world_corners = self._fov_box_world_corners(smap, box_selection)
        if world_corners is not None and len(world_corners) == 8:
            box_edges = project_coordinate_edges_to_observer_hpc(
                world_corners,
                edge_pairs=_BOX_EDGE_INDEX_PAIRS,
                frame_obs=display_frame,
            )
            if box_edges is not None:
                return box_edges

    selection = rect_selection or box_selection
    if selection is None:
        return []

    context = self._observer_context(state.fov_definition_observer_key, map_time)
    definition_observer = getattr(context, "observer_coordinate", None) or display_observer
    definition_time = getattr(context, "date", None) or map_time
    source_frame = Helioprojective(observer=definition_observer, obstime=definition_time)

    half_w = 0.5 * max(float(selection.width_arcsec), 1e-3)
    half_h = 0.5 * max(float(selection.height_arcsec), 1e-3)
    flat_corners = observer_rectangle_to_hpc_corners(
        xc_arcsec=float(selection.center_x_arcsec),
        yc_arcsec=float(selection.center_y_arcsec),
        xsize_arcsec=2.0 * half_w,
        ysize_arcsec=2.0 * half_h,
        observer=definition_observer,
        obstime=definition_time,
    )
    if flat_corners is None or len(flat_corners) != 4:
        return []

    # Repeat the four corners to obtain the eight entries the edge table indexes.
    doubled = SkyCoord(list(flat_corners) + list(flat_corners))
    flat_edges = project_coordinate_edges_to_observer_hpc(
        doubled,
        edge_pairs=_BOX_EDGE_INDEX_PAIRS,
        frame_obs=display_frame,
    )
    return flat_edges or []
def _fov_box_projected_face(self, smap) -> SkyCoord | None:
    """Project the state's FOV box through ``project_box_front_face_to_observer_hpc``.

    The target frame is the helioprojective frame of the display observer at
    the map date. Returns ``None`` when there is no state, no FOV box, or the
    box does not resolve to exactly eight world corners.
    """
    state = self._state
    selection = None if state is None else state.fov_box
    if selection is None:
        return None

    display_observer = self._resolved_observer_for_map(smap, state.display_observer_key) or "earth"
    display_frame = Helioprojective(observer=display_observer, obstime=getattr(smap, "date", None))

    world_corners = self._fov_box_world_corners(smap, selection)
    if world_corners is None or len(world_corners) != 8:
        return None
    return project_box_front_face_to_observer_hpc(world_corners, frame_obs=display_frame)
def _box_bounds_to_fov_selection(self, box, smap) -> DisplayFovSelection:
    """Derive a display FOV that encloses ``box.bounds_coords``.

    The bounds are transformed into the helioprojective frame of the display
    observer at the map date, and the FOV is the axis-aligned extent of the
    finite ``Tx``/``Ty`` values (each side at least ``1e-3`` arcsec). When no
    bound is finite a 10x10 arcsec selection centered on the origin is
    returned.

    A missing ``self._state`` no longer raises ``AttributeError``: like the
    neighbouring FOV helpers, the observer key then falls back to ``"earth"``.
    """
    observer_key = self._state.display_observer_key if self._state is not None else "earth"
    display_frame = Helioprojective(
        observer=self._resolved_observer_for_map(smap, observer_key) or "earth",
        obstime=getattr(smap, "date", None),
    )
    bounds = box.bounds_coords.transform_to(display_frame)

    tx = np.asarray(bounds.Tx.to_value(u.arcsec), dtype=float).ravel()
    ty = np.asarray(bounds.Ty.to_value(u.arcsec), dtype=float).ravel()
    finite = np.isfinite(tx) & np.isfinite(ty)
    if not np.any(finite):
        return DisplayFovSelection(0.0, 0.0, 10.0, 10.0)

    tx = tx[finite]
    ty = ty[finite]
    xmin, xmax = float(np.nanmin(tx)), float(np.nanmax(tx))
    ymin, ymax = float(np.nanmin(ty)), float(np.nanmax(ty))
    return DisplayFovSelection(
        center_x_arcsec=0.5 * (xmin + xmax),
        center_y_arcsec=0.5 * (ymin + ymax),
        width_arcsec=max(1e-3, xmax - xmin),
        height_arcsec=max(1e-3, ymax - ymin),
    )
def _pixel_rect_to_fov_selection(self, smap, rect: Rectangle) -> DisplayFovSelection:
    """Convert a pixel-space rectangle on ``smap`` into an arcsecond FOV selection.

    The rectangle center is mapped through the WCS and transformed into the
    helioprojective frame of the FOV-definition observer. Width and height
    are the absolute pixel extents multiplied by the map's per-axis pixel
    scale.
    """
    width_px, height_px = rect.get_width(), rect.get_height()
    center_col = rect.get_x() + 0.5 * width_px
    center_row = rect.get_y() + 0.5 * height_px
    center_world = smap.wcs.pixel_to_world(center_col, center_row)

    observer_key = "earth" if self._state is None else self._state.fov_definition_observer_key
    context = self._observer_context(observer_key, getattr(smap, "date", None))
    # Preference order: context observer, then the observer resolved for the map, then "earth".
    observer = (
        getattr(context, "observer_coordinate", None)
        or self._resolved_observer_for_map(smap, observer_key)
        or "earth"
    )
    obstime = getattr(context, "date", None) or getattr(smap, "date", None)
    center_hpc = center_world.transform_to(Helioprojective(observer=observer, obstime=obstime))

    arcsec_per_px_x = self._map_pixel_scale_arcsec(smap, axis=0)
    arcsec_per_px_y = self._map_pixel_scale_arcsec(smap, axis=1)
    return DisplayFovSelection(
        center_x_arcsec=float(center_hpc.Tx.to_value(u.arcsec)),
        center_y_arcsec=float(center_hpc.Ty.to_value(u.arcsec)),
        width_arcsec=float(abs(width_px) * arcsec_per_px_x),
        height_arcsec=float(abs(height_px) * arcsec_per_px_y),
    )
@staticmethod
def _map_pixel_scale_arcsec(smap, axis: int) -> float:
    """Return the absolute pixel scale of ``smap`` in arcsec per pixel.

    ``axis == 0`` reads ``smap.scale.axis1``; any other value reads
    ``smap.scale.axis2``. If the scale cannot be read or converted, ``0.6``
    is returned instead.
    """
    try:
        per_axis = smap.scale.axis1 if axis == 0 else smap.scale.axis2
        return abs(float(per_axis.to_value(u.arcsec / u.pix)))
    except Exception:
        return 0.6
def _hit_test_overlay(self, ex: float, ey: float):
    """Classify the point ``(ex, ey)`` relative to the overlay rectangle.

    Checks run in priority order and the first match wins: the center handle
    (``"move"``), the four corners (``"resize_corner"``), the left/right
    edges (``"resize_x"``), the bottom/top edges (``"resize_y"``), and
    finally the interior (``"inside"``). The grab tolerance is the larger of
    6 and 3% of the rectangle's longer side. Returns a dict describing the
    hit, or ``None`` when there are no overlay bounds or nothing was hit.
    """
    rect_bounds = self._overlay_rect_bounds()
    if rect_bounds is None:
        return None
    left, bottom, right, top = rect_bounds
    width, height = right - left, top - bottom
    mid_x, mid_y = left + 0.5 * width, bottom + 0.5 * height
    grab = max(6.0, 0.03 * max(width, height))

    def near(value, target):
        return abs(value - target) <= grab

    if near(ex, mid_x) and near(ey, mid_y):
        return {"kind": "move"}

    corner_handles = (
        ("bl", left, bottom),
        ("br", right, bottom),
        ("tr", right, top),
        ("tl", left, top),
    )
    for label, handle_x, handle_y in corner_handles:
        if near(ex, handle_x) and near(ey, handle_y):
            return {"kind": "resize_corner", "corner": label}

    # Edge hits extend past the rectangle ends by the grab tolerance.
    along_vertical_edges = bottom - grab <= ey <= top + grab
    along_horizontal_edges = left - grab <= ex <= right + grab
    if along_vertical_edges and near(ex, left):
        return {"kind": "resize_x", "side": "left"}
    if along_vertical_edges and near(ex, right):
        return {"kind": "resize_x", "side": "right"}
    if along_horizontal_edges and near(ey, bottom):
        return {"kind": "resize_y", "side": "bottom"}
    if along_horizontal_edges and near(ey, top):
        return {"kind": "resize_y", "side": "top"}

    if left <= ex <= right and bottom <= ey <= top:
        return {"kind": "inside"}
    return None
def _build_drag_state_from_click(self, ex: float, ey: float):
    """Translate a click at ``(ex, ey)`` into the initial drag-state dict.

    In ``"auto"`` mode the action follows the overlay hit test: center or
    interior starts a move, a corner or edge starts the matching resize with
    the opposite corner/edge as the fixed anchor. In an explicit mode
    (``"move"``, ``"resize_xy"``, ``"resize_x"``, ``"resize_y"``) the click
    must still hit the overlay, and the side being dragged is chosen by which
    half of the rectangle the click falls in. Returns ``None`` when there are
    no overlay bounds, nothing was hit, or the mode is not recognised.
    """
    rect_bounds = self._overlay_rect_bounds()
    if rect_bounds is None:
        return None
    left, bottom, right, top = rect_bounds
    width, height = right - left, top - bottom
    mid_x, mid_y = left + 0.5 * width, bottom + 0.5 * height
    hit = self._hit_test_overlay(ex, ey)
    mode = self._interaction_mode

    if mode == "auto":
        if hit is None:
            return None
        kind = hit["kind"]
        if kind in ("move", "inside"):
            return {"mode": "move", "dx": ex - mid_x, "dy": ey - mid_y}
        if kind == "resize_corner":
            corner = hit["corner"]
            # The anchor is the corner diagonally opposite the one being dragged.
            return {
                "mode": "resize",
                "corner": corner,
                "anchor_x": right if "l" in corner else left,
                "anchor_y": top if "b" in corner else bottom,
            }
        if kind == "resize_x":
            return {"mode": "resize_x", "anchor_x": right if hit["side"] == "left" else left}
        if kind == "resize_y":
            return {"mode": "resize_y", "anchor_y": top if hit["side"] == "bottom" else bottom}
        return None

    if mode not in ("move", "resize_xy", "resize_x", "resize_y"):
        return None
    if hit is None:
        return None

    if mode == "move":
        return {"mode": "move", "dx": ex - mid_x, "dy": ey - mid_y}

    in_right_half = ex >= mid_x
    in_upper_half = ey >= mid_y
    if mode == "resize_xy":
        # Pick the active corner by click quadrant around current center.
        return {
            "mode": "resize",
            "corner": ("t" if in_upper_half else "b") + ("r" if in_right_half else "l"),
            "anchor_x": left if in_right_half else right,
            "anchor_y": bottom if in_upper_half else top,
        }
    if mode == "resize_x":
        return {"mode": "resize_x", "anchor_x": left if in_right_half else right}
    return {"mode": "resize_y", "anchor_y": bottom if in_upper_half else top}
def _update_cursor_for_mode(self) -> None:
    """Set the canvas cursor that corresponds to the current interaction mode.

    Does nothing while a drag is in progress. Modes without a dedicated
    cursor get the arrow cursor.
    """
    if self._drag_state is not None:
        return
    cursor_by_mode = {
        "move": Qt.SizeAllCursor,
        "resize_x": Qt.SizeHorCursor,
        "resize_y": Qt.SizeVerCursor,
        "resize_xy": Qt.SizeFDiagCursor,
    }
    self._canvas.setCursor(cursor_by_mode.get(self._interaction_mode, Qt.ArrowCursor))
def _update_hover_cursor(self, event) -> None:
    """Pick the hover cursor for a mouse event.

    Outside ``"auto"`` mode this defers to ``_update_cursor_for_mode``. In
    ``"auto"`` mode the cursor reflects what the overlay hit test reports
    under the pointer; the arrow cursor is used when the event is missing,
    lies outside the current axes, has no data coordinates, or hits nothing.
    """
    if self._interaction_mode != "auto":
        self._update_cursor_for_mode()
        return

    pointer_on_axes = (
        event is not None
        and event.inaxes is self._current_axes
        and event.xdata is not None
        and event.ydata is not None
    )
    hit = self._hit_test_overlay(float(event.xdata), float(event.ydata)) if pointer_on_axes else None

    if hit is None:
        shape = Qt.ArrowCursor
    else:
        kind = hit["kind"]
        if kind in ("move", "inside"):
            shape = Qt.SizeAllCursor
        elif kind == "resize_x":
            shape = Qt.SizeHorCursor
        elif kind == "resize_y":
            shape = Qt.SizeVerCursor
        elif kind == "resize_corner":
            # bl/tr corners use the forward-diagonal cursor, br/tl the backward one.
            shape = Qt.SizeFDiagCursor if hit["corner"] in ("bl", "tr") else Qt.SizeBDiagCursor
        else:
            shape = Qt.ArrowCursor
    self._canvas.setCursor(shape)
def _on_mouse_press(self, event) -> None:
# Mouse-press handler: decides whether a press starts a drag on the overlay
# and stores the resulting drag state in ``self._drag_state``.
# Ignored entirely while mouse actions are disabled.
if not self._mouse_actions_enabled:
return
# Only button 1 (the left button in matplotlib's numbering) pressed over an axes counts.
if event.button != 1 or event.inaxes is None:
return
# A drag needs a state with geometry, a current map and an overlay rectangle.
if self._state is None or self._state.geometry is None or self._current_map is None or self._overlay_rect is None:
return
# The press must be on the axes this view is currently drawing into.
if event.inaxes is not self._current_axes:
return
ex, ey = event.xdata, event.ydata
if ex is None or ey is None:
return
# May be None when the click does not start a drag.
self._drag_state = self._build_drag_state_from_click(float(ex), float(ey))
if self._drag_state is not None:
# Discard any geometry left over from a previous preview.
self._drag_preview_geometry = None
# NOTE(review): indentation is lost in this view, so it cannot be told whether
# the call below is nested under the ``if`` above or runs for every accepted
# press -- confirm against the original file.
self._update_cursor_for_mode()
def _on_mouse_move(self, event) -> None:
    """Handle pointer motion: update the hover cursor or advance the drag preview.

    Without an active drag only the hover cursor is refreshed. During a drag
    the pointer position is turned into an edited geometry according to the
    drag mode (``"move"``, ``"resize"``, ``"resize_x"``, ``"resize_y"``), the
    geometry is stored as the pending preview geometry and the preview
    artists are redrawn. Events outside the current axes, a missing
    state/map, an unknown drag mode, or any exception while building the
    edited geometry leave the preview untouched.
    """
    if not self._mouse_actions_enabled:
        return
    drag = self._drag_state
    if drag is None:
        self._update_hover_cursor(event)
        return
    if event.inaxes is not self._current_axes or event.xdata is None or event.ydata is None:
        return
    if self._state is None or self._state.geometry is None or self._current_map is None:
        return

    base_geom = self._state.geometry
    px = float(event.xdata)
    py = float(event.ydata)
    try:
        mode = drag["mode"]
        if mode == "move":
            # Keep the grab offset recorded at press time between pointer and center.
            edit = {"center_px": (px - drag["dx"], py - drag["dy"])}
        elif mode == "resize":
            anchor_x = float(drag["anchor_x"])
            anchor_y = float(drag["anchor_y"])
            edit = {
                "center_px": (0.5 * (anchor_x + px), 0.5 * (anchor_y + py)),
                "size_px": (abs(px - anchor_x), abs(py - anchor_y)),
            }
        elif mode == "resize_x":
            # Vertical placement and height stay those of the current overlay rectangle.
            overlay = self._overlay_rect
            bottom, height = overlay.get_y(), overlay.get_height()
            anchor_x = float(drag["anchor_x"])
            edit = {
                "center_px": (0.5 * (anchor_x + px), bottom + 0.5 * height),
                "size_px": (abs(px - anchor_x), height),
            }
        elif mode == "resize_y":
            # Horizontal placement and width stay those of the current overlay rectangle.
            overlay = self._overlay_rect
            left, width = overlay.get_x(), overlay.get_width()
            anchor_y = float(drag["anchor_y"])
            edit = {
                "center_px": (left + 0.5 * width, 0.5 * (anchor_y + py)),
                "size_px": (width, abs(py - anchor_y)),
            }
        else:
            return
        edited_geom = self._geometry_from_pixel_edit(base_geom, **edit)
    except Exception:
        return

    preview = self._geometry_preview_overlay(edited_geom)
    if preview is None:
        return
    preview_rect, preview_center = preview
    self._drag_preview_geometry = edited_geom
    self._update_drag_preview(preview_rect, preview_center)
def _on_mouse_release(self, event) -> None:
# Mouse-release handler: ends the drag and either commits the geometry that
# was being previewed or restores the static overlay.
# Ignored entirely while mouse actions are disabled.
if not self._mouse_actions_enabled:
return
# Capture the pending geometry before the drag bookkeeping is reset.
pending_geom = self._drag_preview_geometry
self._drag_state = None
self._drag_preview_geometry = None
if pending_geom is not None:
# A preview geometry exists: drop the preview without restoring the static
# overlay, then apply the geometry through set_geometry_selection.
self._end_drag_preview(restore_static=False)
self.set_geometry_selection(pending_geom)
else:
# Nothing was previewed: end the preview and restore the static overlay.
self._end_drag_preview(restore_static=True)
# NOTE(review): indentation is lost in this view, so it cannot be told whether
# the hover-cursor refresh below belongs to the ``else`` branch or runs after
# every release -- confirm against the original file.
self._update_hover_cursor(event)
def _geometry_from_pixel_edit(self, geom: BoxGeometrySelection, center_px=None, size_px=None) -> BoxGeometrySelection:
    """Return a copy of ``geom`` adjusted by a pixel-space edit on the current map.

    ``center_px`` (x, y), when given, is mapped through the map WCS and
    written to ``coord_x``/``coord_y`` in the geometry's own coordinate mode:
    arcsec for HPC, degrees of longitude/latitude for HGC and for the
    remaining (Stonyhurst) case. ``size_px`` (width, height), when given, is
    converted to kilometres around the geometry center using the small-angle
    factor 206265 arcsec/rad and the map's Sun distance, then to
    ``grid_x``/``grid_y`` cell counts of size ``dx_km`` (at least 1 each).
    Either conversion is best-effort: if it raises, the values assigned so
    far are kept and the rest are left as copied from ``geom``.
    """
    smap = self._current_map
    edited = BoxGeometrySelection(
        coord_mode=geom.coord_mode,
        coord_x=geom.coord_x,
        coord_y=geom.coord_y,
        grid_x=geom.grid_x,
        grid_y=geom.grid_y,
        grid_z=geom.grid_z,
        dx_km=geom.dx_km,
    )

    if center_px is not None:
        center_world = smap.wcs.pixel_to_world(float(center_px[0]), float(center_px[1]))
        try:

            def _definition_observer():
                key = self._state.geometry_definition_observer_key if self._state is not None else "earth"
                return self._resolved_observer_for_map(smap, key) or "earth"

            if geom.coord_mode == CoordMode.HPC:
                target = Helioprojective(
                    obstime=getattr(smap, "date", None),
                    observer=_definition_observer(),
                )
                located = center_world.transform_to(target)
                edited.coord_x = float(located.Tx.to_value(u.arcsec))
                edited.coord_y = float(located.Ty.to_value(u.arcsec))
            else:
                if geom.coord_mode == CoordMode.HGC:
                    target = HeliographicCarrington(
                        obstime=getattr(smap, "date", None),
                        observer=_definition_observer(),
                    )
                else:
                    target = HeliographicStonyhurst(obstime=getattr(smap, "date", None))
                located = center_world.transform_to(target)
                edited.coord_x = float(located.lon.to_value(u.deg))
                edited.coord_y = float(located.lat.to_value(u.deg))
        except Exception:
            pass

    if size_px is not None:
        try:
            anchor_hpc = self._geometry_center_hpc_for_map(edited, smap)
            col, row = smap.wcs.world_to_pixel(anchor_hpc)
            # Requested sizes are floored at one pixel.
            width_px = max(1.0, float(size_px[0]))
            height_px = max(1.0, float(size_px[1]))
            col, row = float(col), float(row)
            # Sample the world coordinates at both ends of a horizontal and a vertical
            # segment through the center to measure the angular extent.
            horizontal = smap.wcs.pixel_to_world([col - 0.5 * width_px, col + 0.5 * width_px], [row, row])
            vertical = smap.wcs.pixel_to_world([col, col], [row - 0.5 * height_px, row + 0.5 * height_px])
            span_x_arcsec = abs(horizontal[1].Tx.to_value(u.arcsec) - horizontal[0].Tx.to_value(u.arcsec))
            span_y_arcsec = abs(vertical[1].Ty.to_value(u.arcsec) - vertical[0].Ty.to_value(u.arcsec))
            dsun_km = self._dsun_km_from_map(smap)
            span_x_km = span_x_arcsec / 206265.0 * dsun_km
            span_y_km = span_y_arcsec / 206265.0 * dsun_km
            edited.grid_x = max(1, int(round(span_x_km / max(edited.dx_km, 1e-6))))
            edited.grid_y = max(1, int(round(span_y_km / max(edited.dx_km, 1e-6))))
        except Exception:
            pass

    return edited
def _dsun_km_from_map(self, smap) -> float:
    """Return the observer-to-Sun distance for ``smap`` in kilometres.

    Sources are tried in order and the first one that yields a value wins:

    1. the ``radius`` of the observer returned by ``_resolved_observer_for_map``;
    2. ``smap.dsun``;
    3. the ``dsun_obs`` entry of ``smap.meta``, taken to be in metres;
    4. the constant ``1.496e8``.

    A source that raises or has no value is skipped. Earlier the map-based
    sources were consulted only when observer resolution *raised*: a resolver
    that returned ``None`` went straight to the constant, and an error while
    reading ``smap.meta`` escaped from the exception handler.
    """
    # Each lookup is deliberately best-effort; a failure just moves on to the next source.
    try:
        observer = self._resolved_observer_for_map(smap)
        if observer is not None:
            return float(observer.radius.to_value(u.km))
    except Exception:
        pass

    try:
        return float(smap.dsun.to_value(u.km))
    except Exception:
        pass

    try:
        dsun_obs = smap.meta.get("dsun_obs")
        if dsun_obs is not None:
            return float((float(dsun_obs) * u.m).to_value(u.km))
    except Exception:
        pass

    return 1.496e8