Source code for pyampp.gxbox.gx_fov2box

import ctypes
import subprocess
import shlex
import shutil
import sys
import os
import signal
import re
import ast
import io
import contextlib
import datetime as dt
import threading
import time as time_mod
from dataclasses import dataclass, replace
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, Callable, List

import astropy.units as u
import numpy as np
import typer
from astropy.coordinates import SkyCoord
from astropy.time import Time
from astropy.io import fits
from pyAMaFiL.mag_field_lin_fff import MagFieldLinFFF
from pyAMaFiL import mag_field_wrapper as pyamafil_mag_field_wrapper
from pyAMaFiL.mag_field_proc import (
    MagFieldProcessor,
    mfp_util_invert_index_array,
    mfp_util_transpose_index,
)
from sunpy.coordinates import (Heliocentric, HeliographicCarrington, HeliographicStonyhurst,
                               Helioprojective, get_earth)
from sunpy.map import Map

from pyampp.data.downloader import SDOImageDownloader
from pyampp.gx_chromo.combo_model import combo_model
from pyampp.gx_chromo.decompose import decompose
from pyampp.gxbox.box import Box
from pyampp.gxbox.boxutils import (
    hmi_b2ptr,
    hmi_disambig,
    read_b3d_h5,
    write_b3d_h5,
    compute_vertical_current,
    load_sunpy_map_compat,
    map_from_data_header_compat,
    remap_vertical_current_inputs,
)
from pyampp.io import load_model
from pyampp.io.refmaps import (
    build_fits_refmaps_for_model,
    build_refmap_payload_for_model,
    discover_fits_refmap_map_ids,
    model_obstime_from_base_index,
)
from pyampp.io.model import _normalize_loaded_model_dict
from pyampp.gxbox.gx_box2id import gx_box2id
from pyampp.gxbox.observer_restore import (
    build_pb0r_metadata_from_ephemeris,
    resolve_named_observer,
    normalize_observer_key,
)
from pyampp.util.config import (
    DOWNLOAD_DIR,
    GXMODEL_DIR,
    AIA_EUV_PASSBANDS,
    AIA_UV_PASSBANDS,
    IDL_HMI_RSUN_M,
)


[docs] app = typer.Typer(help="Run gx_fov2box pipeline without GUI and save model stages.")
_VALID_REPROJECT_ALGORITHMS = ("adaptive", "exact", "interpolation") def _patch_pyamafil_mag_field_wrapper_for_legacy_libraries() -> None: wrapper_classes = [] for candidate in (MagFieldProcessor.__mro__[1], pyamafil_mag_field_wrapper.MagFieldWrapper): if candidate not in wrapper_classes: wrapper_classes.append(candidate) for wrapper_cls in wrapper_classes: if getattr(wrapper_cls, "_pyampp_legacy_nlfff_patch", False): continue original_init = wrapper_cls.__init__ original_set_setting = wrapper_cls.set_setting def _fallback_init(self, lib_path=""): if lib_path == "": lib_path = list(Path(pyamafil_mag_field_wrapper.__file__).resolve().parent.glob("WWNLFFFReconstruction*"))[0].as_posix() self._MagFieldWrapper__mptr1 = np.ctypeslib.ndpointer(dtype=np.float64, ndim=1, flags="C") self._MagFieldWrapper__mptr2 = np.ctypeslib.ndpointer(dtype=np.float64, ndim=2, flags="C") self._MagFieldWrapper__mptr3 = np.ctypeslib.ndpointer(dtype=np.float64, ndim=3, flags="C") self._MagFieldWrapper__mpint1 = np.ctypeslib.ndpointer(dtype=np.int32, ndim=1, flags="C") self._MagFieldWrapper__mpint2 = np.ctypeslib.ndpointer(dtype=np.int32, ndim=2, flags="C") self._MagFieldWrapper__mpint3 = np.ctypeslib.ndpointer(dtype=np.int32, ndim=3, flags="C") self._MagFieldWrapper__mp64 = np.ctypeslib.ndpointer(dtype=np.uint64, ndim=1, flags="C") self._MagFieldWrapper__mpstr = ctypes.POINTER(ctypes.c_char) self._MagFieldWrapper__mvoid = ctypes.c_void_p self._MagFieldWrapper__mint = ctypes.c_int32 self._MagFieldWrapper__mreal = ctypes.c_double self._MagFieldWrapper__mdw = ctypes.c_uint32 self._MagFieldWrapper__m64 = ctypes.c_uint64 lib_mfw = ctypes.CDLL(lib_path) create_func = lib_mfw.utilInitialize create_func.argtypes = [] create_func.restype = self._MagFieldWrapper__mdw set_int_func = lib_mfw.utilSetInt set_int_func.argtypes = [self._MagFieldWrapper__mpstr, self._MagFieldWrapper__mint] set_int_func.restype = self._MagFieldWrapper__mint set_double_func = lib_mfw.utilSetDouble set_double_func.argtypes = [self._MagFieldWrapper__mpstr, self._MagFieldWrapper__mreal] set_double_func.restype = self._MagFieldWrapper__mint get_double_func = lib_mfw.utilGetDouble get_double_func.argtypes = [self._MagFieldWrapper__mpstr, self._MagFieldWrapper__mptr1] get_double_func.restype = self._MagFieldWrapper__mint set_setting_func = getattr(lib_mfw, "utilSetSetting", None) if set_setting_func is not None: set_setting_func.argtypes = [self._MagFieldWrapper__mpstr, self._MagFieldWrapper__mreal] set_setting_func.restype = self._MagFieldWrapper__mint get_version_func = lib_mfw.utilGetVersion get_version_func.argtypes = [self._MagFieldWrapper__mpstr, self._MagFieldWrapper__mint] get_version_func.restype = self._MagFieldWrapper__mint nlfff_func = lib_mfw.mfoNLFFFCore nlfff_func.argtypes = [self._MagFieldWrapper__mpint1, self._MagFieldWrapper__mptr3, self._MagFieldWrapper__mptr3, self._MagFieldWrapper__mptr3] nlfff_func.restype = self._MagFieldWrapper__mint lines_func = lib_mfw.mfoGetLines lines_func.restype = self._MagFieldWrapper__mdw self._MagFieldWrapper__lib_path = lib_path self._MagFieldWrapper__func_set = { "create_func": create_func, "set_int_func": set_int_func, "set_double_func": set_double_func, "get_double_func": get_double_func, "set_setting_func": set_setting_func, "get_version_func": get_version_func, "NLFFF_func": nlfff_func, "lines_func": lines_func, } self._MagFieldWrapper__pointer = create_func() def _compat_init(self, lib_path=""): try: original_init(self, lib_path) except AttributeError as exc: if "utilSetSetting" not in str(exc): raise _fallback_init(self, lib_path) def _compat_set_setting(self, prop, value): func_set = self._MagFieldWrapper__func_set set_setting_func = func_set.get("set_setting_func") if set_setting_func is not None: return original_set_setting(self, prop, value) if isinstance(value, (bool, np.bool_, int, np.integer)) and not isinstance(value, (float, np.floating)): return func_set["set_int_func"](prop.encode("utf-8"), np.int32(value)) return func_set["set_double_func"](prop.encode("utf-8"), np.float64(value)) wrapper_cls.__init__ = _compat_init wrapper_cls.set_setting = _compat_set_setting wrapper_cls._pyampp_legacy_nlfff_patch = True _patch_pyamafil_mag_field_wrapper_for_legacy_libraries() def _spherical_screen_context_for_observer(observer): if hasattr(Helioprojective, "assume_spherical_screen"): return Helioprojective.assume_spherical_screen(observer) try: from sunpy.coordinates.screens import SphericalScreen return SphericalScreen(observer) except Exception: return contextlib.nullcontext() def _is_offdisk_submap_nan_error(exc: Exception) -> bool: text = str(exc).lower() return ( "submap" in text and "nan" in text and ( "off-disk" in text or "off disk" in text or "sphericalscreen" in text or "spherical_screen" in text ) ) def _submap_with_fov_safe(smap: Map, fov_bottom_left: SkyCoord, fov_top_right: SkyCoord) -> Map: frame = smap.coordinate_frame try: bl = fov_bottom_left.transform_to(frame) tr = fov_top_right.transform_to(frame) return smap.submap(bl, top_right=tr) except ValueError as exc: if not _is_offdisk_submap_nan_error(exc): raise observer = getattr(frame, "observer", None) with _spherical_screen_context_for_observer(observer): bl = fov_bottom_left.transform_to(frame) tr = fov_top_right.transform_to(frame) return smap.submap(bl, top_right=tr) def _fill_nonfinite_2d(arr: Any, *, label: str) -> np.ndarray: """Fill non-finite pixels in 2D maps with local 4-neighbor interpolation.""" a = np.asarray(arr, dtype=float) if a.ndim != 2: return a invalid = ~np.isfinite(a) if not np.any(invalid): return a n_invalid = int(np.count_nonzero(invalid)) if not np.any(np.isfinite(a)): print(f"Warning: {label} has no finite pixels; using zeros.") return np.zeros_like(a, dtype=float) filled = a.copy() finite_mean = float(np.nanmean(filled)) filled[invalid] = finite_mean unresolved = invalid.copy() max_iter = int(filled.shape[0] + filled.shape[1]) for _ in range(max_iter): if not np.any(unresolved): break up = np.full_like(filled, np.nan, dtype=float) down = np.full_like(filled, np.nan, dtype=float) left = np.full_like(filled, np.nan, dtype=float) right = np.full_like(filled, np.nan, dtype=float) up[1:, :] = filled[:-1, :] down[:-1, :] = filled[1:, :] left[:, 1:] = filled[:, :-1] right[:, :-1] = filled[:, 1:] neighbor_mean = np.nanmean(np.stack([up, down, left, right], axis=0), axis=0) updatable = unresolved & np.isfinite(neighbor_mean) if not np.any(updatable): break filled[updatable] = neighbor_mean[updatable] unresolved[updatable] = False if np.any(unresolved): filled[unresolved] = finite_mean print(f"Info: inpainted {n_invalid} non-finite pixels in {label}.") return filled @dataclass
[docs] class Fov2BoxConfig:
[docs] time: Optional[str]
[docs] coords: Optional[Tuple[float, float]]
[docs] hpc: bool
[docs] hgc: bool
[docs] hgs: bool
[docs] cea: bool
[docs] top: bool
[docs] box_dims: Optional[Tuple[int, int, int]]
[docs] dx_km: float
[docs] pad_frac: float
[docs] data_dir: str
[docs] gxmodel_dir: str
[docs] nlfff_lib: Optional[str]
[docs] download_backend: str
[docs] drms_sequential: bool
[docs] force_download: bool
[docs] entry_box: Optional[str]
[docs] save_empty_box: bool
[docs] save_potential: bool
[docs] save_bounds: bool
[docs] save_nas: bool
[docs] save_gen: bool
[docs] save_chr: bool
[docs] stop_after: Optional[str]
[docs] empty_box_only: bool
[docs] potential_only: bool
[docs] nlfff_only: bool
[docs] generic_only: bool
[docs] use_potential: bool
[docs] skip_lines: bool
[docs] center_vox: bool
[docs] reduce_passed: Optional[int]
[docs] euv: bool
[docs] uv: bool
[docs] sfq: bool
[docs] observer_name: str
[docs] fov_xc: Optional[float]
[docs] fov_yc: Optional[float]
[docs] fov_xsize: Optional[float]
[docs] fov_ysize: Optional[float]
[docs] square_fov: bool
[docs] jump2potential: bool
[docs] jump2bounds: bool
[docs] jump2nlfff: bool
[docs] jump2lines: bool
[docs] jump2chromo: bool
[docs] rebuild: bool
[docs] rebuild_from_none: bool
[docs] info: bool
[docs] reproject_algorithm: str
[docs] reproject_scan: Optional[str]
[docs] refmaps_path: Optional[Tuple[str, ...]] = None
@dataclass
[docs] class TransitionPlan:
[docs] target_stage: str
[docs] jump_chain: Optional[Tuple[str, ...]]
[docs] active_jump: Optional[str]
[docs] goto_lines: bool
[docs] goto_chromo: bool
@dataclass
[docs] class ResumePreparedState:
[docs] base_group: Dict[str, Any]
[docs] refmaps: Dict[str, Any]
[docs] base_bz_arr: np.ndarray
[docs] base_ic_arr: np.ndarray
[docs] bottom_bz_data: np.ndarray
[docs] projection_tag: str
[docs] base: str
[docs] dr3: np.ndarray
@dataclass
[docs] class ObservationPreparedState:
[docs] obs_time: Time
[docs] maps: Dict[str, Any]
[docs] base_group: Dict[str, Any]
[docs] refmaps: Dict[str, Any]
[docs] base_bz_arr: np.ndarray
[docs] base_ic_arr: np.ndarray
[docs] bottom_bz_data: np.ndarray
[docs] vert_current_error: Optional[str]
[docs] projection_tag: str
[docs] base: str
[docs] dr3: np.ndarray
[docs] observer_metadata: Dict[str, Any]
@dataclass
[docs] class PreparedRunState:
[docs] obs_time: Time
[docs] maps: Optional[Dict[str, Any]]
[docs] base_group: Dict[str, Any]
[docs] refmaps: Dict[str, Any]
[docs] base_bz_arr: np.ndarray
[docs] base_ic_arr: np.ndarray
[docs] bottom_bz_data: np.ndarray
[docs] vert_current_error: Optional[str]
[docs] projection_tag: str
[docs] base: str
[docs] dr3: np.ndarray
[docs] observer_metadata: Optional[Dict[str, Any]]
[docs] lineage_root: str
[docs] lineage_marker: str
[docs] entry_stage_for_marker: str
[docs] entry_corona: Optional[Dict[str, Any]] = None
[docs] entry_model: Optional[str] = None
[docs] entry_lines: Optional[Dict[str, Any]] = None
@dataclass
[docs] class PreparedTransitionInputs:
[docs] active_jump: Optional[str]
[docs] goto_lines: bool
[docs] goto_chromo: bool
[docs] entry_lines: Optional[dict]
[docs] pot_box: Optional[dict]
[docs] bnd_box: Optional[dict]
[docs] nlfff_box: Optional[dict]
@dataclass
[docs] class NasStageResult:
[docs] nlfff_box: dict
[docs] finalized: bool
@dataclass
[docs] class GenChrStageResult:
[docs] finalized: bool
@dataclass
[docs] class GenStageResult:
[docs] stage_prefix: str
[docs] lines: Optional[dict]
[docs] finalized: bool
@dataclass
[docs] class StageSaveContext:
[docs] cfg: Fov2BoxConfig
[docs] prepared_run: PreparedRunState
[docs] execute_cmd: str
[docs] out_dir: Path
[docs] default_grid: dict
[docs] empty_grid: np.ndarray
[docs] produced: List[Path]
def _infer_time_from_path(path: Path) -> Optional[str]: stem = path.name if "hmi.M_720s." in stem: try: tag = stem.split("hmi.M_720s.", 1)[1].split(".", 1)[0] if len(tag) == 15: return f"{tag[0:4]}-{tag[4:6]}-{tag[6:8]}T{tag[9:11]}:{tag[11:13]}:{tag[13:15]}" except Exception: return None return None def _extract_time_tokens(text: str) -> list[str]: if not text: return [] s = _decode_id_text(text) patterns = [ r"\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z)?", r"\d{4}\.\d{2}\.\d{2}_\d{2}:\d{2}:\d{2}(?:_TAI)?", r"\d{1,2}-[A-Za-z]{3}-\d{2,4}\s+\d{2}:\d{2}:\d{2}", ] out: list[str] = [] for pat in patterns: for m in re.finditer(pat, s): tok = m.group(0).strip() if tok and tok not in out: out.append(tok) return out def _parse_time_candidate(token: str) -> Optional[Time]: t = _decode_id_text(token).strip() if not t: return None # First try astropy's native parser. try: return Time(t) except Exception: pass # JSOC T_REC-like values, often e.g. 2025.11.26_15:36:00_TAI m = re.match(r"^(\d{4})\.(\d{2})\.(\d{2})_(\d{2}):(\d{2}):(\d{2})(?:_(TAI))?$", t, flags=re.IGNORECASE) if m: yyyy, mm, dd, hh, mi, ss, scale = m.groups() dtx = dt.datetime(int(yyyy), int(mm), int(dd), int(hh), int(mi), int(ss)) try: return Time(dtx, scale="tai" if scale else "utc").utc except Exception: return None # IDL-like values, e.g. 26-Nov-25 15:47:52 for fmt in ("%d-%b-%y %H:%M:%S", "%d-%b-%Y %H:%M:%S"): try: dtx = dt.datetime.strptime(t, fmt) return Time(dtx, scale="utc") except Exception: pass return None def _extract_execute_time(execute_text: str) -> Optional[str]: if not execute_text: return None for tok in _extract_time_tokens(execute_text): parsed = _parse_time_candidate(tok) if parsed is not None: return parsed.utc.isot return None def _infer_time_from_entry_loaded(entry_loaded: Dict[str, Any], entry_path: Path) -> Optional[str]: """ Infer model observation time from entry data. Priority: 1) explicit metadata/chromo attrs time fields 2) base/index metadata text (actual model index header source) 3) execute command text (requested time) 4) filename/path fallback """ meta = entry_loaded.get("metadata", {}) if isinstance(entry_loaded, dict) else {} if isinstance(meta, dict): for key in ("obs_time", "date_obs", "date-obs", "t_rec", "time"): if key in meta: parsed = _parse_time_candidate(str(meta.get(key))) if parsed is not None: return parsed.utc.isot # Group attrs can carry true observation time in converted models. for group_name in ("chromo", "corona", "base"): grp = entry_loaded.get(group_name) if isinstance(grp, dict): attrs = grp.get("attrs", {}) if isinstance(attrs, dict): for key in ("obs_time", "date_obs", "date-obs", "t_rec", "time"): if key in attrs: parsed = _parse_time_candidate(str(attrs.get(key))) if parsed is not None: return parsed.utc.isot # Prefer index/header text over execute (execute can be requested time). index_candidates = [] base = entry_loaded.get("base") if isinstance(base, dict) and "index" in base: index_candidates.append(base.get("index")) if isinstance(meta, dict) and "index" in meta: index_candidates.append(meta.get("index")) for raw in index_candidates: for tok in _extract_time_tokens(_decode_id_text(raw)): parsed = _parse_time_candidate(tok) if parsed is not None: return parsed.utc.isot execute_text = _decode_id_text(meta.get("execute", "")) if isinstance(meta, dict) else "" inferred_exec = _extract_execute_time(execute_text) if inferred_exec: return inferred_exec return _infer_time_from_path(entry_path) def _format_coord_tag(lon_deg: float, lat_deg: float, suffix: str = "CR") -> str: lon = (lon_deg + 180.0) % 360.0 - 180.0 lon_dir = "W" if lon >= 0 else "E" lat_dir = "N" if lat_deg >= 0 else "S" lon_val = f"{abs(int(round(lon))):02d}" lat_val = f"{abs(int(round(lat_deg))):02d}" return f"{lon_dir}{lon_val}{lat_dir}{lat_val}{suffix}" def _stage_output_dir(gxmodel_dir: str, obs_time: Time) -> Path: date_str = obs_time.to_datetime().strftime("%Y-%m-%d") out_dir = Path(gxmodel_dir) / date_str out_dir.mkdir(parents=True, exist_ok=True) return out_dir def _stage_file_base(obs_time: Time, coord_tag: str, projection_tag: str = "CEA") -> str: time_tag = obs_time.to_datetime().strftime("%Y%m%d_%H%M%S") return f"hmi.M_720s.{time_tag}.{coord_tag}.{projection_tag}" def _stage_filename(out_dir: Path, base: str, stage_tag: str) -> Path: return out_dir / f"{base}.{stage_tag}.h5" def _effective_box_dims_from_base( box_dims: Tuple[int, int, int], base_group: Dict[str, Any], ) -> Tuple[int, int, int]: bx = base_group.get("bx") if isinstance(base_group, dict) else None if bx is None: return box_dims arr = np.asarray(bx) if arr.ndim != 2: return box_dims ny, nx = arr.shape return int(nx), int(ny), int(box_dims[2]) def _last_stage_tag(stop_after: Optional[str]) -> str: if stop_after: stop = stop_after.lower() if stop in ("dl", "download", "downloads"): return "DL" if stop in ("none", "empty", "empty_box"): return "NONE" if stop in ("bnd", "bounds"): return "BND" if stop == "pot": return "POT" if stop == "nas": return "NAS" if stop == "gen": return "GEN" if stop == "chr": return "CHR" return "CHR" def _canon_stage_name(tag: Optional[str]) -> str: if not tag: return "CHR" t = tag.lower() if t in ("dl", "download", "downloads"): return "DL" if t in ("none", "empty", "empty_box"): return "NONE" if t in ("pot", "potential"): return "POT" if t in ("bnd", "bounds"): return "BND" if t in ("nas", "nlfff"): return "NAS" if t in ("gen", "nas.gen", "lines"): return "GEN" if t in ("chr", "nas.chr", "chromo"): return "CHR" return t.upper() def _entry_stage_from_loaded(loaded: Dict[str, Any], entry_path: Path) -> str: # Prefer explicit metadata id/model_type when present. model_id = _decode_id_text(loaded.get("metadata", {}).get("id", "")).upper() if ".POT.GEN.CHR" in model_id: return "CHR" if ".NAS.GEN.CHR" in model_id: return "CHR" if ".POT.CHR" in model_id: return "CHR" if ".POT.GEN" in model_id: return "GEN" if ".NAS.CHR" in model_id: return "CHR" if ".NAS.GEN" in model_id: return "GEN" if ".NAS." in model_id: return "NAS" if ".POT" in model_id: return "POT" if ".BND" in model_id: return "BND" if ".NONE" in model_id: return "NONE" if "chromo" in loaded: return "CHR" if "lines" in loaded: return "GEN" if "corona" in loaded: model_type = str(loaded["corona"].get("attrs", {}).get("model_type", "")).lower() if model_type in ("none",): return "NONE" if model_type in ("pot",): return "POT" if model_type in ("bnd", "bounds"): return "BND" if model_type in ("nlfff",): return "NAS" return "NAS" # Fallback from filename. up = entry_path.name.upper() if ".POT.GEN.CHR" in up: return "CHR" if ".NAS.GEN.CHR" in up: return "CHR" if ".POT.CHR" in up: return "CHR" if ".POT.GEN" in up: return "GEN" if ".NAS.CHR" in up: return "CHR" if ".NAS.GEN" in up: return "GEN" if ".NAS." in up: return "NAS" if ".POT" in up: return "POT" if ".BND" in up: return "BND" if ".NONE" in up: return "NONE" return "NONE" def _detect_target_stage(cfg: Fov2BoxConfig, entry_stage: Optional[str]) -> str: if cfg.rebuild or cfg.rebuild_from_none: return "NONE" if cfg.jump2potential: return "POT" if cfg.jump2bounds: return "BND" if cfg.jump2nlfff: return "NAS" if cfg.jump2lines: return "GEN" if cfg.jump2chromo: return "CHR" if entry_stage: return entry_stage return "NONE" _ALLOWED_JUMP_CHAINS: Dict[Tuple[str, str], Tuple[str, ...]] = { ("NONE", "NONE"): ("NONE",), ("NONE", "POT"): ("NONE", "POT"), ("NONE", "BND"): ("NONE", "POT", "BND"), ("POT", "NONE"): ("POT", "NONE"), ("POT", "POT"): ("POT",), ("POT", "BND"): ("POT", "BND"), ("POT", "NAS"): ("POT", "BND", "NAS"), ("POT", "GEN"): ("POT", "GEN"), ("POT", "CHR"): ("POT", "CHR"), ("BND", "NONE"): ("BND", "NONE"), ("BND", "POT"): ("BND", "POT"), ("BND", "BND"): ("BND",), ("BND", "NAS"): ("BND", "NAS"), ("NAS", "NONE"): ("NAS", "NONE"), ("NAS", "POT"): ("NAS", "POT"), ("NAS", "BND"): ("NAS", "BND"), ("NAS", "NAS"): ("NAS",), ("NAS", "GEN"): ("NAS", "GEN"), ("NAS", "CHR"): ("NAS", "CHR"), ("GEN", "NONE"): ("GEN", "NONE"), ("GEN", "POT"): ("GEN", "POT"), ("GEN", "BND"): ("GEN", "BND"), ("GEN", "NAS"): ("GEN", "NAS"), ("GEN", "GEN"): ("GEN",), ("GEN", "CHR"): ("GEN", "CHR"), ("CHR", "NONE"): ("CHR", "NONE"), ("CHR", "POT"): ("CHR", "POT"), ("CHR", "BND"): ("CHR", "BND"), ("CHR", "NAS"): ("CHR", "NAS"), ("CHR", "GEN"): ("CHR", "GEN"), ("CHR", "CHR"): ("CHR",), } def _jump_chain(entry_stage: str, target_stage: str) -> Optional[Tuple[str, ...]]: return _ALLOWED_JUMP_CHAINS.get((entry_stage, target_stage)) def _jump_allowed(entry_stage: str, target_stage: str) -> bool: return _jump_chain(entry_stage, target_stage) is not None def _active_jump_for_target(target_stage: str, use_entry_box: bool) -> Optional[str]: if not use_entry_box: return None return { "POT": "potential", "BND": "bounds", "NAS": "nlfff", "GEN": "lines", "CHR": "chromo", }.get(target_stage) def _plan_transition(cfg: Fov2BoxConfig, entry_stage: Optional[str]) -> TransitionPlan: target_stage = _detect_target_stage(cfg, entry_stage) jump_chain = None use_entry_resume = bool(cfg.entry_box and not cfg.rebuild and not cfg.rebuild_from_none) if use_entry_resume: assert entry_stage is not None jump_chain = _jump_chain(entry_stage, target_stage) if jump_chain is None: raise ValueError( f"Incompatible jump request: entry stage {entry_stage} cannot jump to {target_stage}. " "Unsupported transitions are rejected explicitly." ) active_jump = _active_jump_for_target(target_stage, use_entry_resume) goto_lines = active_jump in ("lines", "chromo") goto_chromo = active_jump == "chromo" or cfg.skip_lines return TransitionPlan( target_stage=target_stage, jump_chain=jump_chain, active_jump=active_jump, goto_lines=goto_lines, goto_chromo=goto_chromo, ) def _prepare_resume_state(entry_loaded: Dict[str, Any], cfg: Fov2BoxConfig, obs_time: Time) -> ResumePreparedState: base_group = dict(entry_loaded.get("base", {})) if not all(k in base_group for k in ("bx", "by", "bz")): raise ValueError("Entry-box resume requires base/bx, base/by, and base/bz.") if "ic" not in base_group: base_group["ic"] = np.asarray(base_group["bz"], dtype=float).copy() if "chromo_mask" not in base_group: base_group["chromo_mask"] = decompose( np.asarray(base_group["bz"], dtype=float).T, np.asarray(base_group["ic"], dtype=float).T, ).T refmaps = dict(entry_loaded.get("refmaps", {})) base_bz_arr = np.asarray(base_group["bz"], dtype=float) base_ic_arr = np.asarray(base_group["ic"], dtype=float) projection_tag = _decode_id_text(entry_loaded.get("metadata", {}).get("projection", "CEA")).upper() axis_order = entry_loaded.get("metadata", {}).get("axis_order_3d") entry_corona_for_dr = _h5_corona_to_internal_xyz(entry_loaded.get("corona"), axis_order) if isinstance(entry_corona_for_dr, dict) and "dr" in entry_corona_for_dr: d = np.asarray(entry_corona_for_dr["dr"], dtype=float).reshape(-1) dr3 = np.array([d[0], d[min(1, d.size - 1)], d[min(2, d.size - 1)]], dtype=float) else: dr = float((cfg.dx_km * u.km / (IDL_HMI_RSUN_M * u.m).to(u.km)).value) dr3 = np.array([dr, dr, dr], dtype=float) src_id = _decode_id_text(entry_loaded.get("metadata", {}).get("id", "")).strip() base = "" if src_id: base, _ = _split_stage_id(src_id) if not base: base = Path(cfg.entry_box).stem if cfg.entry_box else _stage_file_base(obs_time, "UNKNOWN", projection_tag=projection_tag) return ResumePreparedState( base_group=base_group, refmaps=refmaps, base_bz_arr=base_bz_arr, base_ic_arr=base_ic_arr, bottom_bz_data=base_bz_arr, projection_tag=projection_tag, base=base, dr3=dr3, ) def _prepare_observation_state( cfg: Fov2BoxConfig, requested_obs_time: Time, box_dims_resolved: Tuple[int, int, int], run_logged_step: Callable[[str, Callable[[], Any]], Any], t_start: float, ) -> Optional[ObservationPreparedState]: import time as time_mod obs_time = requested_obs_time data_dir_path = Path(cfg.data_dir).expanduser().resolve() disambig_method = 0 if cfg.sfq else 2 print("Checking/downloading HMI/AIA data...") dl_t0 = time_mod.perf_counter() maps, download_info = _load_hmi_maps_from_downloader( obs_time, data_dir_path, cfg.euv, cfg.uv, download_backend=cfg.download_backend, drms_sequential=cfg.drms_sequential, force_download=cfg.force_download, disambig_method=disambig_method, strict_required=(_last_stage_tag(cfg.stop_after) != "DL"), ) dl_elapsed = time_mod.perf_counter() - dl_t0 print(f"Done in {dl_elapsed:.2f}s") resolved_obs_time = download_info.get("resolved_obs_time") if resolved_obs_time: resolved_obs_time = Time(resolved_obs_time) if abs((resolved_obs_time - requested_obs_time).to_value("sec")) > 1.0: print( "Resolved source bundle time: " f"{resolved_obs_time.isot} " f"(requested {requested_obs_time.isot})" ) obs_time = resolved_obs_time if _last_stage_tag(cfg.stop_after) == "DL": print("Stopped after download stage by request.") total = time_mod.perf_counter() - t_start print(f"Total elapsed: {total:.2f}s") return None def _prepare_geometry(): rsun_local = (IDL_HMI_RSUN_M * u.m).to(u.km) observer_local = _resolve_cli_observer(maps.get("field"), cfg.observer_name, obs_time) if cfg.hpc: box_origin_local = SkyCoord(cfg.coords[0] * u.arcsec, cfg.coords[1] * u.arcsec, obstime=obs_time, observer=observer_local, rsun=rsun_local, frame=Helioprojective) elif cfg.hgc: box_origin_local = SkyCoord(lon=cfg.coords[0] * u.deg, lat=cfg.coords[1] * u.deg, radius=rsun_local, obstime=obs_time, observer=observer_local, frame=HeliographicCarrington) else: box_origin_local = SkyCoord(lon=cfg.coords[0] * u.deg, lat=cfg.coords[1] * u.deg, radius=rsun_local, obstime=obs_time, observer=observer_local, frame=HeliographicStonyhurst) box_dims_q_local = u.Quantity(list(box_dims_resolved)) * u.pix box_res_local = (cfg.dx_km * u.km).to(u.Mm) frame_obs_local = Helioprojective(observer=observer_local, obstime=obs_time, rsun=rsun_local) frame_hcc_local = Heliocentric(observer=box_origin_local, obstime=obs_time) box_center_local = box_origin_local.transform_to(frame_hcc_local) center_z_local = box_center_local.z + (box_dims_q_local[2] / u.pix * box_res_local) / 2 box_center_local = SkyCoord(x=box_center_local.x, y=box_center_local.y, z=center_z_local, frame=frame_hcc_local) box_local = Box(frame_obs_local, box_origin_local, box_center_local, box_dims_q_local, box_res_local) if cfg.top: bottom_wcs_header_local = box_local.bottom_top_header(dsun_obs=maps["field"].dsun) projection_tag_local = "TOP" else: bottom_wcs_header_local = box_local.bottom_cea_header projection_tag_local = "CEA" fov_coords_local = box_local.bounds_coords_bl_tr(pad_frac=cfg.pad_frac) return ( rsun_local, observer_local, box_origin_local, bottom_wcs_header_local, projection_tag_local, fov_coords_local, ) ( rsun, observer, box_origin, bottom_wcs_header, projection_tag, fov_coords, ) = run_logged_step("Preparing observer and box geometry", _prepare_geometry) map_bp, map_bt, map_br = run_logged_step( "Converting HMI vector field components", lambda: hmi_b2ptr(maps["field"], maps["inclination"], maps["azimuth"]), ) def submap_with_fov(_map: Map) -> Map: return _submap_with_fov_safe(_map, fov_coords[0], fov_coords[1]) def _extract_cutouts(): return ( submap_with_fov(map_bp), submap_with_fov(map_bt), submap_with_fov(map_br), submap_with_fov(maps["continuum"]), submap_with_fov(maps["magnetogram"]), ) map_bp, map_bt, map_br, map_cont, map_los = run_logged_step( "Extracting FOV cutouts from source maps", _extract_cutouts, ) map_bx = map_bp map_by = map_from_data_header_compat(-map_bt.data, map_bt.meta) map_bz = map_br def _reproject_cutouts(): _algo = cfg.reproject_algorithm bottom_bx_local = map_bx.reproject_to(bottom_wcs_header, algorithm=_algo) bottom_by_local = map_by.reproject_to(bottom_wcs_header, algorithm=_algo) bottom_bz_local = map_bz.reproject_to(bottom_wcs_header, algorithm=_algo) base_bz_local = map_los.reproject_to(bottom_wcs_header, algorithm=_algo) base_ic_local = map_cont.reproject_to(bottom_wcs_header, algorithm=_algo) return ( bottom_bx_local, bottom_by_local, bottom_bz_local, base_bz_local, base_ic_local, np.asarray(base_bz_local.data, dtype=float), np.asarray(base_ic_local.data, dtype=float), np.asarray(bottom_bz_local.data, dtype=float), ) ( bottom_bx, bottom_by, bottom_bz, base_bz, base_ic, base_bz_arr, base_ic_arr, bottom_bz_data, ) = run_logged_step("Reprojecting cutouts onto the model base grid", _reproject_cutouts) def _build_base_products(): index_local = _build_index_header( bottom_wcs_header, bottom_bz, observer_override=observer, obs_time_override=obs_time, rsun_override=rsun, ) bx_local = _fill_nonfinite_2d(bottom_bx.data, label="bottom_bx (OBS reprojection)") by_local = _fill_nonfinite_2d(bottom_by.data, label="bottom_by (OBS reprojection)") bz_local = _fill_nonfinite_2d(bottom_bz.data, label="bottom_bz (OBS reprojection)") ic_local = _fill_nonfinite_2d(base_ic.data, label="base_ic (OBS reprojection)") chromo_mask_local = decompose(base_bz_arr.T, base_ic_arr.T).T return { "bx": bx_local, "by": by_local, "bz": bz_local, "ic": ic_local, "chromo_mask": chromo_mask_local, "index": index_local, } base_group = run_logged_step("Building base-layer products", _build_base_products) refmaps: Dict[str, Any] = {} refmap_model_time = model_obstime_from_base_index({"base": base_group}) or obs_time.isot def add_refmap(ref_id: str, smap: Map) -> None: refmaps[ref_id] = build_refmap_payload_for_model( smap, model_obstime=refmap_model_time, target_fov=(fov_coords[0], fov_coords[1]), reproject_algorithm=cfg.reproject_algorithm, ) def _collect_refmaps(): hmi_t0 = time_mod.perf_counter() add_refmap("Bz_reference", maps["magnetogram"]) add_refmap("Ic_reference", maps["continuum"]) print(f"HMI references: 2/2 in {time_mod.perf_counter() - hmi_t0:.2f}s") local_vert_current_error = None def _compute_vert_current(): vc_bx_local, vc_by_local, vc_bz_local = remap_vertical_current_inputs( map_bx, map_by, map_bz ) vc_header_local = vc_bx_local.wcs.to_header().tostring(sep="\n", endcard=True) rsun_arcsec_local = vc_bx_local.rsun_obs.to_value(u.arcsec) crpix1_local, crpix2_local = vc_bx_local.wcs.wcs.crpix cdelt1_local = vc_bx_local.scale.axis1.to_value(u.arcsec / u.pix) cdelt2_local = vc_bx_local.scale.axis2.to_value(u.arcsec / u.pix) jz_local = compute_vertical_current( vc_bz_local.data, vc_bx_local.data, vc_by_local.data, vc_header_local, rsun_arcsec_local, crpix1=crpix1_local, crpix2=crpix2_local, cdelt1_arcsec=cdelt1_local, cdelt2_arcsec=cdelt2_local, ) jz_map = map_from_data_header_compat(jz_local, vc_bx_local.wcs.to_header()) refmaps["Vert_current"] = build_refmap_payload_for_model( jz_map, model_obstime=refmap_model_time, target_fov=(fov_coords[0], fov_coords[1]), reproject_algorithm=cfg.reproject_algorithm, ) try: run_logged_step("Vertical current", _compute_vert_current) except Exception as exc: local_vert_current_error = str(exc) print(f"Vertical current: skipped ({local_vert_current_error})") aia_passbands = [pb for pb in (AIA_EUV_PASSBANDS + AIA_UV_PASSBANDS) if f"AIA_{pb}" in maps] total_aia = len(aia_passbands) aia_t0 = time_mod.perf_counter() for idx, pb in enumerate(aia_passbands, start=1): key = f"AIA_{pb}" add_refmap(key, maps[key]) elapsed = time_mod.perf_counter() - aia_t0 print(f"AIA references: {idx}/{total_aia} ({pb}) in {elapsed:.2f}s") cache_dir = data_dir_path / obs_time.datetime.strftime("%Y-%m-%d") cache_t0 = time_mod.perf_counter() cache_map_ids = { path: map_id for path, map_id in discover_fits_refmap_map_ids([cache_dir], generic=False).items() if map_id not in refmaps } if cache_map_ids: missing_cache_refmaps = build_fits_refmaps_for_model( list(cache_map_ids.keys()), model_obstime=refmap_model_time, target_fov=(fov_coords[0], fov_coords[1]), reproject_algorithm=cfg.reproject_algorithm, map_ids=cache_map_ids, generic=False, ) for key, payload in missing_cache_refmaps.items(): refmaps[key] = payload print( "Cached context references: " f"{len(missing_cache_refmaps)} from {cache_dir} " f"in {time_mod.perf_counter() - cache_t0:.2f}s" ) if cfg.refmaps_path: external_t0 = time_mod.perf_counter() external_refmaps = build_fits_refmaps_for_model( cfg.refmaps_path, model_obstime=refmap_model_time, target_fov=(fov_coords[0], fov_coords[1]), reproject_algorithm=cfg.reproject_algorithm, generic=True, ) for key, payload in external_refmaps.items(): refmaps[key] = payload print( "External references: " f"{len(external_refmaps)} from {len(cfg.refmaps_path)} path(s) " f"in {time_mod.perf_counter() - external_t0:.2f}s" ) return local_vert_current_error vert_current_error = run_logged_step("Collecting reference maps and derived products", _collect_refmaps) obs_dr = (cfg.dx_km * u.km) / rsun dr3 = np.array([obs_dr.value, obs_dr.value, obs_dr.value]) coord_tag = _format_coord_tag( box_origin.transform_to(HeliographicCarrington(obstime=obs_time)).lon.to_value(u.deg), box_origin.transform_to(HeliographicCarrington(obstime=obs_time)).lat.to_value(u.deg), ) base = _stage_file_base(obs_time, coord_tag, projection_tag=projection_tag) observer_metadata = _observer_metadata_from_source_map(maps["field"], cfg) return ObservationPreparedState( obs_time=obs_time, maps=maps, base_group=base_group, refmaps=refmaps, base_bz_arr=base_bz_arr, base_ic_arr=base_ic_arr, bottom_bz_data=bottom_bz_data, vert_current_error=vert_current_error, projection_tag=projection_tag, base=base, dr3=dr3, observer_metadata=observer_metadata, ) def _prepare_run_state( cfg: Fov2BoxConfig, resume_mode: bool, entry_loaded: Optional[Dict[str, Any]], entry_stage: Optional[str], target_stage: str, requested_obs_time: Time, box_dims_resolved: Tuple[int, int, int], observer_metadata: Optional[Dict[str, Any]], run_logged_step: Callable[[str, Callable[[], Any]], Any], t_start: float, ) -> Optional[PreparedRunState]: if resume_mode: assert entry_loaded is not None prepared_resume = _prepare_resume_state(entry_loaded, cfg, requested_obs_time) expected_shape_3d = _effective_box_dims_from_base(box_dims_resolved, prepared_resume.base_group) entry_meta = dict(entry_loaded.get("metadata", {})) entry_corona = _prepare_stage_corona_payload( entry_loaded.get("corona"), source="disk", axis_order_3d=entry_meta.get("axis_order_3d"), expected_shape_3d=expected_shape_3d, ) entry_model = None if isinstance(entry_corona, dict): entry_model = _decode_id_text(entry_corona.get("attrs", {}).get("model_type", "")).strip().lower() or None entry_lines = entry_loaded.get("lines") if entry_lines is None: # Backward compatibility with legacy GEN files that stored line keys under chromo. entry_lines = entry_loaded.get("chromo") lineage_root = _decode_id_text(entry_meta.get("lineage", "")).strip() lineage_marker = "" entry_stage_for_marker = "" if not lineage_root: entry_id = _decode_id_text(entry_meta.get("id", "")).strip() _, entry_stage_tag = _split_stage_id(entry_id) if entry_id else ("", "") entry_stage_for_marker = entry_stage_tag or _stage_tag_from_stage(entry_stage or target_stage) lineage_marker = f"ENTRY.{entry_stage_for_marker}" lineage_root = lineage_marker return PreparedRunState( obs_time=requested_obs_time, maps=None, base_group=prepared_resume.base_group, refmaps=prepared_resume.refmaps, base_bz_arr=prepared_resume.base_bz_arr, base_ic_arr=prepared_resume.base_ic_arr, bottom_bz_data=prepared_resume.bottom_bz_data, vert_current_error=None, projection_tag=prepared_resume.projection_tag, base=prepared_resume.base, dr3=prepared_resume.dr3, observer_metadata=observer_metadata, lineage_root=lineage_root, lineage_marker=lineage_marker, entry_stage_for_marker=entry_stage_for_marker, entry_corona=entry_corona, entry_model=entry_model, entry_lines=entry_lines, ) prepared_obs = _prepare_observation_state( cfg, requested_obs_time, box_dims_resolved, run_logged_step, t_start, ) if prepared_obs is None: return None return PreparedRunState( obs_time=prepared_obs.obs_time, maps=prepared_obs.maps, base_group=prepared_obs.base_group, refmaps=prepared_obs.refmaps, base_bz_arr=prepared_obs.base_bz_arr, base_ic_arr=prepared_obs.base_ic_arr, bottom_bz_data=prepared_obs.bottom_bz_data, vert_current_error=prepared_obs.vert_current_error, projection_tag=prepared_obs.projection_tag, base=prepared_obs.base, dr3=prepared_obs.dr3, observer_metadata=prepared_obs.observer_metadata, lineage_root="OBS", lineage_marker="", entry_stage_for_marker="", entry_corona=None, entry_model=None, entry_lines=None, ) _CARRY_FORWARD_LINE_KEYS: Tuple[str, ...] = ( "start_idx", "end_idx", "av_field", "phys_length", "voxel_status", ) _REQUIRED_LINE_KEYS: Tuple[str, ...] = ( "codes", "apex_idx", "start_idx", "end_idx", "seed_idx", "av_field", "phys_length", "voxel_status", ) def _prepare_stage_corona_payload( corona: Optional[Dict[str, Any]], *, source: str, axis_order_3d: Optional[str] = None, expected_shape_3d: Optional[Tuple[int, int, int]] = None, allowed_model_types: Optional[Tuple[str, ...]] = None, ) -> Optional[Dict[str, Any]]: if corona is None: return None if not isinstance(corona, dict): raise ValueError(f"Canonical corona payload must be a dict for {source} stage input.") prepared = _h5_corona_to_internal_xyz(corona, axis_order_3d) if source == "disk" else corona component_shapes = [] for key in ("bx", "by", "bz"): if key not in prepared: raise ValueError(f"Canonical corona payload is missing corona/{key} for {source} stage input.") arr = np.asarray(prepared[key]) if arr.ndim != 3: raise ValueError(f"Canonical corona payload must provide 3D corona/{key} for {source} stage input.") component_shapes.append(tuple(int(v) for v in arr.shape)) if len(set(component_shapes)) != 1: raise ValueError( f"Canonical corona payload must use one shared 3D shape for bx/by/bz; got {component_shapes!r}." ) if expected_shape_3d is not None and component_shapes[0] != tuple(int(v) for v in expected_shape_3d): raise ValueError( "Canonical corona payload shape does not match the prepared model volume. " f"expected={tuple(int(v) for v in expected_shape_3d)!r}, got={component_shapes[0]!r}" ) dr = prepared.get("dr") if dr is None: raise ValueError(f"Canonical corona payload is missing corona/dr for {source} stage input.") dr_arr = np.asarray(dr, dtype=float).reshape(-1) if dr_arr.size == 0: raise ValueError(f"Canonical corona payload must provide non-empty corona/dr for {source} stage input.") attrs = prepared.get("attrs") model_type = "" if isinstance(attrs, dict): model_type = _decode_id_text(attrs.get("model_type", "")).strip().lower() if allowed_model_types is not None and model_type not in allowed_model_types: raise ValueError( f"Canonical corona payload model_type must be one of {allowed_model_types!r}; got {model_type!r}." ) return prepared def _prepare_stage_lines_payload( lines: Optional[Dict[str, Any]], *, source: str, require_complete: bool, ) -> Optional[Dict[str, Any]]: if lines is None: if require_complete: raise ValueError(f"Canonical line payload is required for {source} stage input.") return None if not isinstance(lines, dict): raise ValueError(f"Canonical line payload must be a dict for {source} stage input.") required_keys = _REQUIRED_LINE_KEYS if require_complete else _CARRY_FORWARD_LINE_KEYS missing = [key for key in required_keys if key not in lines] if missing: if require_complete: raise ValueError( f"Canonical line payload is missing required keys for {source} stage input: {', '.join(missing)}" ) return None return lines def _compute_none_stage_box( prepared_run: PreparedRunState, box_dims_resolved: Tuple[int, int, int], ) -> dict: # Internal in-memory cube contract is (x, y, z). HDF5 canonical (z, y, x) # conversion is applied only in the save path. nx, ny, nz = (int(box_dims_resolved[0]), int(box_dims_resolved[1]), int(box_dims_resolved[2])) bx_base = _fill_nonfinite_2d(prepared_run.base_group["bx"], label="base/bx (NONE boundary)") by_base = _fill_nonfinite_2d(prepared_run.base_group["by"], label="base/by (NONE boundary)") bz_base = _fill_nonfinite_2d(prepared_run.base_group["bz"], label="base/bz (NONE boundary)") expected_base_shape = (ny, nx) for label, arr in (("bx", bx_base), ("by", by_base), ("bz", bz_base)): if arr.shape != expected_base_shape: raise ValueError( f"NONE stage expects base/{label} shape (ny,nx)={expected_base_shape!r}; got {arr.shape!r}." ) corona_bx = np.zeros((nx, ny, nz), dtype=float) corona_by = np.zeros((nx, ny, nz), dtype=float) corona_bz = np.zeros((nx, ny, nz), dtype=float) corona_bx[:, :, 0] = bx_base.T corona_by[:, :, 0] = by_base.T corona_bz[:, :, 0] = bz_base.T return { "corona": _prepare_stage_corona_payload( { "bx": corona_bx, "by": corona_by, "bz": corona_bz, "dr": prepared_run.dr3, "attrs": {"model_type": "none"}, }, source="memory", expected_shape_3d=box_dims_resolved, allowed_model_types=("none",), ) } def _normalize_runtime_stage_box_for_pipeline( stage_box: dict, *, prepared_run: PreparedRunState, stage_tag: str, source_axis_order_3d: str = "xyz", ) -> dict: working_box = dict(stage_box) base_group = dict(prepared_run.base_group) if "ic" not in base_group: base_group["ic"] = np.asarray(prepared_run.base_ic_arr, dtype=float).copy() working_box.setdefault("base", base_group) working_box.setdefault("refmaps", prepared_run.refmaps) if prepared_run.observer_metadata is not None and "observer" not in working_box: working_box["observer"] = dict(prepared_run.observer_metadata) metadata = dict(working_box.get("metadata", {})) if isinstance(working_box.get("metadata"), dict) else {} metadata.setdefault("id", f"{prepared_run.base}.{stage_tag}") metadata.setdefault("projection", _decode_id_text(prepared_run.projection_tag).upper()) metadata.setdefault("axis_order_2d", "yx") metadata.setdefault("axis_order_3d", "zyx") metadata.setdefault("vector_layout", "split_components") working_box["metadata"] = metadata normalized_h5_box = _normalize_stage_for_h5( working_box, source_axis_order_3d=source_axis_order_3d, ) normalized_model = _normalize_loaded_model_dict( normalized_h5_box, source_path=Path(f"{metadata['id']}.h5"), source_kind="h5", strict=False, stored_contract=None, ) normalized_metadata = normalized_model.get("metadata") if isinstance(normalized_metadata, dict) and "geometry_contract" in normalized_metadata: metadata["geometry_contract"] = normalized_metadata["geometry_contract"] working_box["metadata"] = metadata return working_box def _compute_pot_stage_box( prepared_run: PreparedRunState, box_dims_resolved: Tuple[int, int, int], ) -> dict: bnddata = np.asarray(prepared_run.bottom_bz_data, dtype=float).copy() bnddata[np.isnan(bnddata)] = 0.0 maglib_lff = MagFieldLinFFF() maglib_lff.set_field(bnddata) pot_res = maglib_lff.LFFF_cube(nz=box_dims_resolved[2], alpha=0.0) return { "corona": _prepare_stage_corona_payload( { "bx": pot_res["by"].swapaxes(0, 1), "by": pot_res["bx"].swapaxes(0, 1), "bz": pot_res["bz"].swapaxes(0, 1), "dr": prepared_run.dr3, "attrs": {"model_type": "pot"}, }, source="memory", expected_shape_3d=box_dims_resolved, allowed_model_types=("pot",), ) } def _compute_bnd_stage_box( prepared_run: PreparedRunState, pot_box: dict, box_dims_resolved: Tuple[int, int, int], ) -> dict: corona = pot_box.get("corona", pot_box) bnd = { "bx": np.array(corona["bx"], copy=True), "by": np.array(corona["by"], copy=True), "bz": np.array(corona["bz"], copy=True), "dr": prepared_run.dr3, "attrs": {"model_type": "bnd"}, } bnd["bx"][:, :, 0] = np.asarray(prepared_run.base_group["bx"], dtype=float).T bnd["by"][:, :, 0] = np.asarray(prepared_run.base_group["by"], dtype=float).T bnd["bz"][:, :, 0] = np.asarray(prepared_run.base_group["bz"], dtype=float).T return { "corona": _prepare_stage_corona_payload( bnd, source="memory", expected_shape_3d=box_dims_resolved, allowed_model_types=("bnd",), ) } def _prepare_transition_stage_inputs( prepared_run: PreparedRunState, transition_plan: TransitionPlan, *, entry_stage: Optional[str], box_dims_resolved: Tuple[int, int, int], ) -> PreparedTransitionInputs: active_jump = transition_plan.active_jump entry_corona = prepared_run.entry_corona if active_jump else None entry_lines = prepared_run.entry_lines if active_jump else None goto_lines = transition_plan.goto_lines goto_chromo = transition_plan.goto_chromo if goto_lines: if not entry_corona: raise ValueError("--jump2lines/--jump2chromo requires --entry-box with corona model_type=pot|nlfff") return PreparedTransitionInputs( active_jump=active_jump, goto_lines=goto_lines, goto_chromo=goto_chromo, entry_lines=entry_lines, pot_box=None, bnd_box=None, nlfff_box=entry_corona, ) pot_box = None bnd_box = None nlfff_box = None if active_jump in ("potential", "bounds", "nlfff"): pot_box, bnd_box, nlfff_box = _prepare_resume_jump_boxes( active_jump, entry_stage, entry_corona, ) if pot_box is not None: pot_box = _prepare_stage_corona_payload( pot_box, source="memory", expected_shape_3d=box_dims_resolved, allowed_model_types=("pot",), ) if bnd_box is not None: bnd_box = _prepare_stage_corona_payload( bnd_box, source="memory", expected_shape_3d=box_dims_resolved, allowed_model_types=("bnd",), ) if nlfff_box is not None: nlfff_box = _prepare_stage_corona_payload( nlfff_box, source="memory", expected_shape_3d=box_dims_resolved, allowed_model_types=("nlfff",), ) if active_jump == "nlfff" and entry_stage == "POT" and pot_box is not None: bnd_box = _compute_bnd_stage_box(prepared_run, pot_box, box_dims_resolved)["corona"] return PreparedTransitionInputs( active_jump=active_jump, goto_lines=goto_lines, goto_chromo=goto_chromo, entry_lines=entry_lines, pot_box=pot_box, bnd_box=bnd_box, nlfff_box=nlfff_box, ) def _resolve_stage_prefix( nlfff_box: dict, *, resume_mode: bool, entry_stage: Optional[str], target_stage: str, ) -> str: is_pot_skip_path = ( resume_mode and entry_stage == "POT" and target_stage in ("GEN", "CHR") ) if str(nlfff_box.get("attrs", {}).get("model_type", "")).lower() == "pot" or is_pot_skip_path: return "POT" return "NAS" def _run_gen_stage( cfg: Fov2BoxConfig, prepared_run: PreparedRunState, nlfff_box: dict, *, resume_mode: bool, entry_stage: Optional[str], target_stage: str, goto_chromo: bool, entry_lines: Optional[dict], save_stage: Callable[..., None], finalize: Callable[[], None], stage_times: Dict[str, float], stage_progress_cls: type, ) -> GenStageResult: stage_prefix = _resolve_stage_prefix( nlfff_box, resume_mode=resume_mode, entry_stage=entry_stage, target_stage=target_stage, ) gen_stage_tag = f"{stage_prefix}.GEN" if goto_chromo: lines = _prepare_stage_lines_payload( entry_lines, source="disk", require_complete=False, ) if cfg.skip_lines: print("Skipping GEN line computation by request...") return GenStageResult(stage_prefix=stage_prefix, lines=lines, finalized=False) with stage_progress_cls(f"Computing {gen_stage_tag} model") as progress: gen_load_t0 = time_mod.perf_counter() maglib = MagFieldProcessor(cfg.nlfff_lib or "") _load_maglib_idl_cube(maglib, nlfff_box, prepared_run.dr3) gen_load_elapsed = time_mod.perf_counter() - gen_load_t0 resolved_reduce_passed = cfg.reduce_passed if cfg.reduce_passed is not None else (0 if cfg.center_vox else 1) resolved_chromo_level = (1000.0 / float(cfg.dx_km)) if cfg.dx_km else 1.0 trace_kwargs = { "seeds": None, "chromo_level": resolved_chromo_level, "reduce_passed": resolved_reduce_passed, } gen_trace_t0 = time_mod.perf_counter() lines = _lines_fast(maglib, **trace_kwargs) gen_trace_elapsed = time_mod.perf_counter() - gen_trace_t0 print( f"\n{gen_stage_tag} tracer load: {gen_load_elapsed:.2f}s | " f"line trace: {gen_trace_elapsed:.2f}s | " f"reduce_passed={resolved_reduce_passed!r} | " f"chromo_level={resolved_chromo_level:.12g}" ) lines = _prepare_stage_lines_payload( lines, source="memory", require_complete=True, ) compute_lines_time = progress.finish() gen_save_t0 = time_mod.perf_counter() lines_group = _make_lines_group(lines, prepared_run.dr3) save_stage(gen_stage_tag, {"corona": nlfff_box, "lines": lines_group}) print(f"{gen_stage_tag} stage save: {time_mod.perf_counter() - gen_save_t0:.2f}s") stage_times[gen_stage_tag] = compute_lines_time if cfg.generic_only or _last_stage_tag(cfg.stop_after) == "GEN": finalize() return GenStageResult(stage_prefix=stage_prefix, lines=lines, finalized=True) return GenStageResult(stage_prefix=stage_prefix, lines=lines, finalized=False) def _run_chr_stage( prepared_run: PreparedRunState, nlfff_box: dict, *, stage_prefix: str, lines: Optional[dict], save_stage: Callable[..., None], stage_times: Dict[str, float], stage_progress_cls: type, ) -> None: chr_stage_tag = f"{stage_prefix}.CHR" if lines is not None: chr_stage_tag = f"{stage_prefix}.GEN.CHR" header = _make_header(prepared_run.maps["field"]) if prepared_run.maps is not None else {} chromo_mask = None if "chromo_mask" in prepared_run.base_group: chromo_mask = np.asarray(prepared_run.base_group["chromo_mask"]).T chromo_box = combo_model( nlfff_box, prepared_run.dr3, prepared_run.base_bz_arr.T, prepared_run.base_ic_arr.T, chromo_mask=chromo_mask, ) for k in ["codes", "apex_idx", "start_idx", "end_idx", "seed_idx", "av_field", "phys_length", "voxel_status"]: if lines is not None and k in lines: chromo_box[k] = lines[k] if "phys_length" in chromo_box: chromo_box["phys_length"] *= prepared_run.dr3[0] chromo_box["attrs"] = header with stage_progress_cls(f"Computing {chr_stage_tag} model") as progress: chr_stage = {"corona": nlfff_box, "chromo": _make_chromo_group(chromo_box)} if lines is not None: chr_stage["lines"] = _make_lines_group(lines, prepared_run.dr3) save_stage(chr_stage_tag, chr_stage, chromo_source_axis_order_2d="xy") stage_times[chr_stage_tag] = progress.finish() def _save_stage_passthrough( stage_tag: str, stage_box: dict, *, context: StageSaveContext, source_axis_order_3d: str = "xyz", chromo_source_axis_order_2d: str = "yx", ) -> dict: if not _should_save_stage(stage_tag, context.cfg): return stage_box prepared_run = context.prepared_run working_box = dict(stage_box) if "base" not in working_box: working_box["base"] = prepared_run.base_group if "refmaps" not in working_box: working_box["refmaps"] = prepared_run.refmaps if "corona" in working_box: corona = working_box["corona"] if "dr" not in corona: corona["dr"] = prepared_run.dr3 corona_for_id = corona if _decode_id_text(source_axis_order_3d).strip().lower() == "zyx": corona_for_id = _h5_corona_to_internal_xyz(dict(corona), "zyx") voxel_id, corona_base = gx_box2id( {"corona": corona_for_id, "lines": working_box.get("lines"), "chromo": working_box.get("chromo")}, return_corona_base=True, ) if corona_base is not None: corona["corona_base"] = int(corona_base) working_box["corona"] = corona grid = dict(context.default_grid) if voxel_id is not None: grid["voxel_id"] = voxel_id else: grid["voxel_id"] = gx_box2id({ "corona": {"bx": context.empty_grid, "by": context.empty_grid, "bz": context.empty_grid, "dr": prepared_run.dr3} }) if "chromo" in working_box and "dz" in working_box["chromo"]: grid["dz"] = working_box["chromo"]["dz"] working_box["grid"] = grid elif "grid" not in working_box: working_box["grid"] = context.default_grid stage_id = f"{prepared_run.base}.{stage_tag}" lineage_suffix = _canonical_lineage_suffix(stage_tag) metadata = { "execute": context.execute_cmd, "id": stage_id, "lineage": ( f"{prepared_run.lineage_marker}->{_lineage_delta_from_entry(prepared_run.entry_stage_for_marker or stage_tag, stage_tag)}.h5" if prepared_run.lineage_marker else _merge_lineage(prepared_run.lineage_root, lineage_suffix) ), "disambiguation": "SFQ" if context.cfg.sfq else "HMI", "projection": _decode_id_text(prepared_run.projection_tag).upper(), "axis_order_2d": "yx", "axis_order_3d": "zyx", "vector_layout": "split_components", } if prepared_run.vert_current_error: metadata["vert_current_error"] = prepared_run.vert_current_error working_box["metadata"] = metadata merged_observer = _merge_observer_metadata( prepared_run.observer_metadata, working_box.get("observer") if isinstance(working_box.get("observer"), dict) else None, ) if merged_observer is not None: working_box["observer"] = merged_observer working_box = _normalize_stage_for_h5( working_box, source_axis_order_3d=source_axis_order_3d, chromo_source_axis_order_2d=chromo_source_axis_order_2d, ) out_path = _stage_filename(context.out_dir, prepared_run.base, stage_tag) write_b3d_h5(str(out_path), working_box) print(f"Saved {stage_tag}: {out_path}") context.produced.append(out_path) return stage_box def _run_nas_stage( cfg: Fov2BoxConfig, prepared_run: PreparedRunState, pot_box: Optional[dict], bnd_box: Optional[dict], nlfff_box: Optional[dict], expected_shape_3d: Tuple[int, int, int], save_stage: Callable[..., None], finalize: Callable[[], None], stage_times: Dict[str, float], stage_progress_cls: type, ) -> NasStageResult: if nlfff_box is None: if cfg.use_potential: if pot_box is None: raise ValueError("pot_box must be available when use_potential is enabled") nlfff_box = { "bx": np.array(pot_box["bx"], copy=True), "by": np.array(pot_box["by"], copy=True), "bz": np.array(pot_box["bz"], copy=True), "dr": prepared_run.dr3, "attrs": {"model_type": "pot"}, } nlfff_box = _prepare_stage_corona_payload( nlfff_box, source="memory", expected_shape_3d=expected_shape_3d, allowed_model_types=("pot",), ) else: if bnd_box is None: raise ValueError("bnd_box must be available before computing the NAS stage") with stage_progress_cls("Computing NAS model") as progress: nlfff_box = _solve_nlfff_from_bnd( bnd_box, prepared_run.dr3, nlfff_lib=cfg.nlfff_lib, ) nlfff_box = _prepare_stage_corona_payload( nlfff_box, source="memory", expected_shape_3d=expected_shape_3d, allowed_model_types=("nlfff",), ) save_stage("NAS", {"corona": nlfff_box}) stage_times["NAS"] = progress.finish() if cfg.nlfff_only or _last_stage_tag(cfg.stop_after) == "NAS": finalize() return NasStageResult(nlfff_box=nlfff_box, finalized=True) elif str(nlfff_box.get("attrs", {}).get("model_type", "")).lower() == "nlfff": nlfff_box = _prepare_stage_corona_payload( nlfff_box, source="memory", expected_shape_3d=expected_shape_3d, allowed_model_types=("nlfff",), ) # Jumped to NAS from an existing NAS/GEN/CHR entry box. with stage_progress_cls("Preparing NAS model from entry data") as progress: save_stage("NAS", {"corona": nlfff_box}) stage_times["NAS"] = progress.finish() if cfg.nlfff_only or _last_stage_tag(cfg.stop_after) == "NAS": finalize() return NasStageResult(nlfff_box=nlfff_box, finalized=True) if nlfff_box is None: raise ValueError("NAS stage did not produce an nlfff_box") return NasStageResult(nlfff_box=nlfff_box, finalized=False) def _run_gen_chr_stages( cfg: Fov2BoxConfig, prepared_run: PreparedRunState, nlfff_box: dict, *, resume_mode: bool, entry_stage: Optional[str], target_stage: str, goto_chromo: bool, entry_lines: Optional[dict], save_stage: Callable[..., None], finalize: Callable[[], None], stage_times: Dict[str, float], stage_progress_cls: type, ) -> GenChrStageResult: gen_result = _run_gen_stage( cfg, prepared_run, nlfff_box, resume_mode=resume_mode, entry_stage=entry_stage, target_stage=target_stage, goto_chromo=goto_chromo, entry_lines=entry_lines, save_stage=save_stage, finalize=finalize, stage_times=stage_times, stage_progress_cls=stage_progress_cls, ) if gen_result.finalized: return GenChrStageResult(finalized=True) _run_chr_stage( prepared_run, nlfff_box, stage_prefix=gen_result.stage_prefix, lines=gen_result.lines, save_stage=save_stage, stage_times=stage_times, stage_progress_cls=stage_progress_cls, ) return GenChrStageResult(finalized=False) def _warn_impossible_save_requests(cfg: Fov2BoxConfig, start_stage: str) -> None: order = {"NONE": 0, "POT": 1, "BND": 2, "NAS": 3, "GEN": 4, "CHR": 5} start_ord = order.get(start_stage, 0) req = [ ("NONE", cfg.save_empty_box, "--save-empty-box"), ("BND", cfg.save_bounds, "--save-bounds"), ("POT", cfg.save_potential, "--save-potential"), ("NAS", cfg.save_nas, "--save-nas"), ("GEN", cfg.save_gen, "--save-gen"), ("CHR", cfg.save_chr, "--save-chr"), ] impossible = [flag for st, enabled, flag in req if enabled and order.get(st, 0) < start_ord] if impossible: print( "Warning: impossible save request(s) before selected start stage " f"{start_stage}: {', '.join(impossible)} (ignored)." ) def _clone_corona_with_model_type(corona: Dict[str, Any], model_type: str) -> Dict[str, Any]: cloned = dict(corona) attrs = cloned.get("attrs") attrs_dict = dict(attrs) if isinstance(attrs, dict) else {} attrs_dict["model_type"] = model_type cloned["attrs"] = attrs_dict return cloned def _prepare_resume_jump_boxes( active_jump: Optional[str], entry_stage: Optional[str], entry_corona: Optional[Dict[str, Any]], ) -> Tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]], Optional[Dict[str, Any]]]: pot_box = None bnd_box = None nlfff_box = None if active_jump not in ("potential", "bounds", "nlfff"): return pot_box, bnd_box, nlfff_box if not entry_corona: raise ValueError("--entry-box does not contain corona vectors required by selected jump/recompute action.") if active_jump == "potential": # NONE->POT must compute a fresh potential field from the base boundary. if entry_stage != "NONE": pot_box = _clone_corona_with_model_type(entry_corona, "pot") return pot_box, bnd_box, nlfff_box if active_jump == "bounds": # NONE->BND and POT->BND need real forward-stage computation instead of # relabeling the entry corona as a later stage. if entry_stage == "POT": pot_box = _clone_corona_with_model_type(entry_corona, "pot") elif entry_stage not in (None, "NONE"): bnd_box = _clone_corona_with_model_type(entry_corona, "bnd") return pot_box, bnd_box, nlfff_box if entry_stage == "POT": pot_box = _clone_corona_with_model_type(entry_corona, "pot") elif entry_stage == "BND": bnd_box = _clone_corona_with_model_type(entry_corona, "bnd") elif entry_stage in ("NAS", "GEN", "CHR"): nlfff_box = _clone_corona_with_model_type(entry_corona, "nlfff") else: raise ValueError(f"Unsupported entry stage for --jump2nlfff path: {entry_stage}") return pot_box, bnd_box, nlfff_box def _stage_tag_from_stage(stage: str) -> str: return { "NONE": "NONE", "POT": "POT", "BND": "BND", "NAS": "NAS", "GEN": "NAS.GEN", "CHR": "NAS.CHR", }.get(stage, "NAS.CHR") def _extract_execute_paths(execute_text: str) -> Tuple[Optional[str], Optional[str]]: """ Extract data/model directory defaults from metadata/execute. Supports both Python CLI form and IDL gx_fov2box form. """ if not execute_text: return None, None data_dir: Optional[str] = None gxmodel_dir: Optional[str] = None text = str(execute_text) # Python CLI style: gx-fov2box ... --data-dir X --gxmodel-dir Y try: parts = shlex.split(text) except Exception: parts = [] for i, token in enumerate(parts): if token == "--data-dir" and i + 1 < len(parts): data_dir = parts[i + 1] elif token.startswith("--data-dir="): data_dir = token.split("=", 1)[1] elif token == "--gxmodel-dir" and i + 1 < len(parts): gxmodel_dir = parts[i + 1] elif token.startswith("--gxmodel-dir="): gxmodel_dir = token.split("=", 1)[1] # IDL style fallback: TMP_DIR='...' and OUT_DIR='...' if data_dir is None: m = re.search(r"\bTMP_DIR\s*=\s*['\"]([^'\"]+)['\"]", text, flags=re.IGNORECASE) if not m: m = re.search(r"\bTMP_DIR\s*=\s*([^,\s]+)", text, flags=re.IGNORECASE) if m: data_dir = m.group(1).strip().strip("'\"") if gxmodel_dir is None: m = re.search(r"\bOUT_DIR\s*=\s*['\"]([^'\"]+)['\"]", text, flags=re.IGNORECASE) if not m: m = re.search(r"\bOUT_DIR\s*=\s*([^,\s]+)", text, flags=re.IGNORECASE) if m: gxmodel_dir = m.group(1).strip().strip("'\"") return data_dir, gxmodel_dir def _extract_execute_geometry(execute_text: str) -> Tuple[Optional[Tuple[float, float]], Optional[str], Optional[str]]: """ Extract center coordinates, frame and projection hints from metadata/execute text. Returns: (coords, frame, projection) - coords: (x, y) as floats - frame: one of {"hpc","hgc","hgs"} or None - projection: one of {"cea","top"} or None """ if not execute_text: return None, None, None text = str(execute_text) coords: Optional[Tuple[float, float]] = None frame: Optional[str] = None projection: Optional[str] = None # Python CLI style: --coords X Y [--hpc|--hgc|--hgs] [--cea|--top] try: parts = shlex.split(text) except Exception: parts = [] for i, token in enumerate(parts): if token == "--coords" and i + 2 < len(parts): try: coords = (float(parts[i + 1]), float(parts[i + 2])) except Exception: pass elif token.startswith("--coords="): try: vals = token.split("=", 1)[1].split(",") if len(vals) == 2: coords = (float(vals[0]), float(vals[1])) except Exception: pass elif token == "--hpc": frame = "hpc" elif token == "--hgc": frame = "hgc" elif token == "--hgs": frame = "hgs" elif token == "--cea": projection = "cea" elif token == "--top": projection = "top" # IDL canonical pattern: # CENTER_ARCSEC=[ -280, -250] if coords is None: m = re.search( r"\bCENTER_ARCSEC\s*=\s*\[\s*([+-]?\d+(?:\.\d+)?)\s*,\s*([+-]?\d+(?:\.\d+)?)\s*\]", text, flags=re.IGNORECASE, ) if m: coords = (float(m.group(1)), float(m.group(2))) frame = frame or "hpc" # Optional degree-based patterns. if coords is None: m = re.search( r"\bCENTER_HGC\s*=\s*\[\s*([+-]?\d+(?:\.\d+)?)\s*,\s*([+-]?\d+(?:\.\d+)?)\s*\]", text, flags=re.IGNORECASE, ) if m: coords = (float(m.group(1)), float(m.group(2))) frame = "hgc" if coords is None: m = re.search( r"\bCENTER_HGS\s*=\s*\[\s*([+-]?\d+(?:\.\d+)?)\s*,\s*([+-]?\d+(?:\.\d+)?)\s*\]", text, flags=re.IGNORECASE, ) if m: coords = (float(m.group(1)), float(m.group(2))) frame = "hgs" # Projection hint in IDL execute. if projection is None: up = text.upper() if " /TOP" in up or re.search(r",\s*TOP\s*=\s*1\b", up): projection = "top" elif " /CEA" in up or re.search(r",\s*CEA\s*=\s*1\b", up): projection = "cea" return coords, frame, projection def _flag_explicit_on_cli(flag: str) -> bool: args = sys.argv[1:] return any(arg == flag or arg.startswith(f"{flag}=") for arg in args) def _normalize_reproject_algorithm(value: str) -> str: return str(value or "").strip().lower() def _parse_reproject_scan(value: Optional[str]) -> Optional[Tuple[str, ...]]: if value is None: return None raw = str(value).strip().lower() if not raw: return None if raw == "all": return _VALID_REPROJECT_ALGORITHMS values = [] for item in raw.split(","): alg = _normalize_reproject_algorithm(item) if not alg: continue if alg not in _VALID_REPROJECT_ALGORITHMS: raise ValueError( f"Unsupported reprojection algorithm '{alg}' in --reproject-scan. " f"Choose from: {', '.join(_VALID_REPROJECT_ALGORITHMS)}." ) if alg not in values: values.append(alg) if not values: raise ValueError("--reproject-scan did not include any valid reprojection algorithms.") return tuple(values) def _warn_path_default(role: str, path_text: str) -> None: p = Path(path_text).expanduser() if p.exists(): return parent = p.parent if parent.exists(): print(f"Warning: {role} from metadata/execute does not exist yet: {p} (will be used as requested).") else: print(f"Warning: {role} parent directory does not exist on this system: {parent}") def _lines_quiet(maglib: Any, **kwargs: Any) -> Dict[str, Any]: """Suppress noisy stdout/stderr emitted by the underlying line tracer.""" sink = io.StringIO() with contextlib.redirect_stdout(sink), contextlib.redirect_stderr(sink): return maglib.lines(**kwargs) def _lines_fast(maglib: Any, **kwargs: Any) -> Dict[str, Any]: """ Call the pyAMaFiL line tracer without the unconditional ``print(res)`` performed by ``MagFieldProcessor.lines()``. """ reduce_passed = kwargs.get("reduce_passed") chromo_level = kwargs.get("chromo_level", 1) seeds = kwargs.get("seeds") max_length = kwargs.get("max_length", 0) step = kwargs.get("step", 1.0) tolerance = kwargs.get("tolerance", 1e-3) tolerance_bound = kwargs.get("tolerance_bound", 1e-3) n_processes = kwargs.get("n_processes", 0) debug_input = kwargs.get("debug_input", False) bx = getattr(maglib, "_MagFieldProcessor__bx", None) by = getattr(maglib, "_MagFieldProcessor__by", None) bz = getattr(maglib, "_MagFieldProcessor__bz", None) if bx is None or by is None or bz is None: return maglib.lines(**kwargs) res = maglib.lines_wrapper( bx, by, bz, reduce_passed, chromo_level, seeds, max_length, step, tolerance, tolerance_bound, n_processes, debug_input, ) shape_zyx = np.flip(bx.shape) transpose = seeds is None res["voxel_status"] = mfp_util_transpose_index(res["voxel_status"], shape_zyx, transpose) res["phys_length"] = mfp_util_transpose_index(res["phys_length"], shape_zyx, transpose) res["av_field"] = mfp_util_transpose_index(res["av_field"], shape_zyx, transpose) res["codes"] = mfp_util_transpose_index(res["codes"], shape_zyx, transpose) res["apex_idx"] = mfp_util_transpose_index(mfp_util_invert_index_array(res["apex_idx"], shape_zyx), shape_zyx, transpose) res["start_idx"] = mfp_util_transpose_index(mfp_util_invert_index_array(res["start_idx"], shape_zyx), shape_zyx, transpose) res["end_idx"] = mfp_util_transpose_index(mfp_util_invert_index_array(res["end_idx"], shape_zyx), shape_zyx, transpose) res["seed_idx"] = mfp_util_transpose_index(mfp_util_invert_index_array(res["seed_idx"], shape_zyx), shape_zyx, transpose) if res.get("coords") is not None: res["coords"] = res["coords"][:, [1, 0, 2, 3]] return res def _load_maglib_idl_cube(maglib: Any, box: Dict[str, Any], dr: Any = 0.001 * u.solRad) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Load cube components into ``MagFieldProcessor`` in the same boundary layout captured from the IDL ``mfoLines`` call: - internal box arrays are normalized to ``(x, y, z)`` - DLL input arrays are ``(z, x, y)`` - ``bx``/``by`` are swapped relative to the raw box field names """ bx = np.asarray(box["by"]).transpose(2, 0, 1) by = np.asarray(box["bx"]).transpose(2, 0, 1) bz = np.asarray(box["bz"]).transpose(2, 0, 1) load_vars = getattr(maglib, "_MagFieldProcessor__load_vars") load_vars(bx, by, bz, dr) return bx, by, bz def _solve_nlfff_from_bnd( bnd_box: Dict[str, Any], dr: Any, *, nlfff_lib: Optional[str] = None, ) -> Dict[str, Any]: """Run the NLFFF solver from an internal ``(x, y, z)`` BND cube. The solver-facing layout must match the IDL ``mfoLines`` path: load the cube through the same component swap and ``(z, x, y)`` transpose used by ``_load_maglib_idl_cube()``, then preserve the returned solver component names and cube layout. Real BND->NAS parity shows the DLL already returns ``bx``/``by`` under the canonical internal names expected by the stage contract; swapping them again corrupts the horizontal field while leaving ``bz`` and array shapes intact. """ maglib = MagFieldProcessor(nlfff_lib or "") _load_maglib_idl_cube(maglib, bnd_box, dr) res_nlf = maglib.NLFFF() return { "bx": np.asarray(res_nlf["bx"]), "by": np.asarray(res_nlf["by"]), "bz": np.asarray(res_nlf["bz"]), "dr": np.asarray(bnd_box["dr"], dtype=float).copy(), "attrs": {"model_type": "nlfff"}, } def _decode_id_text(v: Any) -> str: if hasattr(v, "item"): try: return _decode_id_text(v.item()) except Exception: pass if isinstance(v, (bytes, bytearray)): return bytes(v).decode("utf-8", errors="ignore") s = str(v or "").strip() if s.startswith(("b'", 'b"', "B'", 'B"')): try: # Normalize uppercase byte literal prefix (e.g. B'CEA') to valid Python syntax. lit_src = f"b{s[1:]}" if s[:1] == "B" else s lit = ast.literal_eval(lit_src) if isinstance(lit, (bytes, bytearray)): return bytes(lit).decode("utf-8", errors="ignore") except Exception: pass return s def _split_stage_id(model_id: str) -> Tuple[str, str]: sid = _decode_id_text(model_id) up = sid.upper() for suffix in ( ".POT.GEN.CHR", ".NAS.GEN.CHR", ".POT.CHR", ".NAS.CHR", ".POT.GEN", ".NAS.GEN", ".NAS", ".BND", ".POT", ".NONE", ): if up.endswith(suffix): return sid[: -len(suffix)], suffix[1:] return sid, "" def _canonical_lineage_suffix(stage_tag: str) -> str: mapping = { "NONE": "NONE", "POT": "NONE.POT", "BND": "NONE.POT.BND", "NAS": "NONE.POT.BND.NAS", "POT.GEN": "NONE.POT.GEN", "POT.CHR": "NONE.POT.CHR", "POT.GEN.CHR": "NONE.POT.GEN.CHR", "NAS.GEN": "NONE.POT.BND.NAS.GEN", "NAS.CHR": "NONE.POT.BND.NAS.CHR", "NAS.GEN.CHR": "NONE.POT.BND.NAS.GEN.CHR", } return mapping.get(stage_tag, stage_tag) def _merge_lineage(root: str, suffix: str) -> str: r = [t for t in str(root or "").split(".") if t] s = [t for t in str(suffix or "").split(".") if t] if not r: return ".".join(s) if not s: return ".".join(r) max_k = min(len(r), len(s)) overlap = 0 for i in range(max_k, 0, -1): if r[-i:] == s[:i]: overlap = i break return ".".join(r + s[overlap:]) def _lineage_delta_from_entry(entry_stage_tag: str, saved_stage_tag: str) -> str: """Return lineage segment from entry stage to saved stage (inclusive of implied steps).""" e = [t for t in _canonical_lineage_suffix(entry_stage_tag).split(".") if t] s = [t for t in _canonical_lineage_suffix(saved_stage_tag).split(".") if t] # Remove shared prefix (e.g., NONE.POT), keep what was computed after entry. i = 0 while i < min(len(e), len(s)) and e[i] == s[i]: i += 1 tail = s[i:] if not tail: # Clone/self-save: keep explicit stage identity on RHS. return saved_stage_tag return ".".join(tail) def _should_save_stage(stage_tag: str, cfg: Fov2BoxConfig) -> bool: stop_tag = _last_stage_tag(cfg.stop_after) if stop_tag == "GEN" and stage_tag in ("NAS.GEN", "POT.GEN"): return True if stop_tag == "CHR" and stage_tag.endswith(".CHR"): return True if stage_tag == stop_tag: return True if stage_tag == "POT": return cfg.save_potential if stage_tag == "NAS": return cfg.save_nas if stage_tag in ("NAS.GEN", "POT.GEN"): return cfg.save_gen if stage_tag.endswith(".CHR"): return cfg.save_chr if stage_tag == "NONE": return cfg.save_empty_box if stage_tag == "BND": return cfg.save_bounds return False def _build_execute_cmd(cfg: Fov2BoxConfig) -> str: cmd = ["gx-fov2box"] if cfg.time: cmd += ["--time", cfg.time] if cfg.coords: cmd += ["--coords", str(cfg.coords[0]), str(cfg.coords[1])] if cfg.hpc: cmd.append("--hpc") elif cfg.hgc: cmd.append("--hgc") else: cmd.append("--hgs") if cfg.top: cmd.append("--top") elif cfg.cea: cmd.append("--cea") if cfg.box_dims: cmd += ["--box-dims", *(str(v) for v in cfg.box_dims)] cmd += ["--dx-km", f"{cfg.dx_km:.6f}"] cmd += ["--pad-frac", f"{cfg.pad_frac:.4f}"] cmd += ["--data-dir", cfg.data_dir] cmd += ["--gxmodel-dir", cfg.gxmodel_dir] if str(cfg.download_backend or "drms").lower() == "fido": cmd.append("--use-fido") if cfg.force_download: cmd.append("--force-download") if cfg.euv: cmd.append("--euv") if cfg.uv: cmd.append("--uv") if cfg.save_empty_box: cmd.append("--save-empty-box") if cfg.save_potential: cmd.append("--save-potential") if cfg.save_bounds: cmd.append("--save-bounds") if cfg.save_nas: cmd.append("--save-nas") if cfg.save_gen: cmd.append("--save-gen") if cfg.save_chr: cmd.append("--save-chr") if cfg.empty_box_only: cmd.append("--empty-box-only") if cfg.potential_only: cmd.append("--potential-only") if cfg.nlfff_only: cmd.append("--nlfff-only") if cfg.generic_only: cmd.append("--generic-only") if cfg.use_potential: cmd.append("--use-potential") if cfg.skip_lines: cmd.append("--skip-lines") if cfg.center_vox: cmd.append("--center-vox") if cfg.reduce_passed is not None: cmd += ["--reduce-passed", str(cfg.reduce_passed)] if cfg.entry_box: cmd += ["--entry-box", cfg.entry_box] if cfg.sfq: cmd.append("--sfq") if cfg.observer_name: cmd += ["--observer-name", cfg.observer_name] if cfg.fov_xc is not None: cmd += ["--fov-xc", f"{cfg.fov_xc:.2f}"] if cfg.fov_yc is not None: cmd += ["--fov-yc", f"{cfg.fov_yc:.2f}"] if cfg.fov_xsize is not None: cmd += ["--fov-xsize", f"{cfg.fov_xsize:.2f}"] if cfg.fov_ysize is not None: cmd += ["--fov-ysize", f"{cfg.fov_ysize:.2f}"] if cfg.square_fov: cmd.append("--square-fov") if cfg.stop_after: cmd += ["--stop-after", cfg.stop_after] if cfg.nlfff_lib: cmd += ["--nlfff-lib", cfg.nlfff_lib] if cfg.jump2potential: cmd.append("--jump2potential") if cfg.jump2bounds: cmd.append("--jump2bounds") if cfg.jump2nlfff: cmd.append("--jump2nlfff") if cfg.jump2lines: cmd.append("--jump2lines") if cfg.jump2chromo: cmd.append("--jump2chromo") if cfg.rebuild: cmd.append("--rebuild") if cfg.rebuild_from_none: cmd.append("--rebuild-from-none") if cfg.reproject_algorithm != "adaptive": cmd += ["--reproject-algorithm", cfg.reproject_algorithm] for path in cfg.refmaps_path or (): cmd += ["--refmaps-path", path] return shlex.join(cmd) def _explicit_observer_fov(cfg: Fov2BoxConfig) -> Optional[dict[str, Any]]: parts = [cfg.fov_xc, cfg.fov_yc, cfg.fov_xsize, cfg.fov_ysize] if all(v is None for v in parts): return None if any(v is None for v in parts): raise ValueError("Observer FOV requires all of --fov-xc/--fov-yc/--fov-xsize/--fov-ysize.") return { "frame": "helioprojective", "xc_arcsec": float(cfg.fov_xc), "yc_arcsec": float(cfg.fov_yc), "xsize_arcsec": float(cfg.fov_xsize), "ysize_arcsec": float(cfg.fov_ysize), "square": bool(cfg.square_fov), } def _observer_metadata_from_source_map(source_map: Optional[Map], cfg: Fov2BoxConfig) -> dict[str, Any]: observer_meta: dict[str, Any] = {"name": str(cfg.observer_name or "earth")} observer_meta["label"] = { "earth": "Earth", "sdo": "SDO", "solar orbiter": "Solar Orbiter", "stereo-a": "STEREO-A", "stereo-b": "STEREO-B", "custom": "Custom", }.get(normalize_observer_key(cfg.observer_name), "Earth") explicit_fov = _explicit_observer_fov(cfg) if explicit_fov is not None: observer_meta["fov"] = explicit_fov ephemeris: dict[str, Any] = {} if source_map is not None: source_date = getattr(source_map, "date", None) try: if source_date is not None: ephemeris["obs_date"] = Time(source_date).isot except Exception: pass obs = _resolve_cli_observer(source_map, cfg.observer_name, Time(source_date) if source_date is not None else None) if obs is not None: try: obs_hgs = obs.transform_to(HeliographicStonyhurst(obstime=source_date)) ephemeris["hgln_obs_deg"] = float(obs_hgs.lon.to_value(u.deg)) ephemeris["hglt_obs_deg"] = float(obs_hgs.lat.to_value(u.deg)) except Exception: pass meta = getattr(source_map, "meta", None) if meta is not None and normalize_observer_key(cfg.observer_name) == "sdo": try: if "HGLN_OBS" in meta and "hgln_obs_deg" not in ephemeris: ephemeris["hgln_obs_deg"] = float(meta["HGLN_OBS"]) if "HGLT_OBS" in meta and "hglt_obs_deg" not in ephemeris: ephemeris["hglt_obs_deg"] = float(meta["HGLT_OBS"]) if "DSUN_OBS" in meta and "dsun_cm" not in ephemeris: ephemeris["dsun_cm"] = float(u.Quantity(meta["DSUN_OBS"], u.m).to_value(u.cm)) if "RSUN_REF" in meta and "rsun_cm" not in ephemeris: ephemeris["rsun_cm"] = float(u.Quantity(meta["RSUN_REF"], u.m).to_value(u.cm)) except Exception: pass if hasattr(source_map, "rsun_meters") and source_map.rsun_meters is not None: ephemeris["rsun_cm"] = float(u.Quantity(source_map.rsun_meters).to_value(u.cm)) if obs is not None: try: ephemeris["dsun_cm"] = float(u.Quantity(obs.radius).to_value(u.cm)) except Exception: pass elif hasattr(source_map, "dsun") and source_map.dsun is not None: ephemeris["dsun_cm"] = float(u.Quantity(source_map.dsun).to_value(u.cm)) if ephemeris: observer_meta["ephemeris"] = ephemeris pb0r = build_pb0r_metadata_from_ephemeris( ephemeris, observer_key=observer_meta.get("name"), obs_time=ephemeris.get("obs_date"), ) if pb0r: observer_meta["pb0r"] = pb0r return observer_meta def _observer_metadata_from_entry(entry_loaded: Optional[Dict[str, Any]], cfg: Fov2BoxConfig) -> Optional[dict[str, Any]]: if not isinstance(entry_loaded, dict): return None base_observer = entry_loaded.get("observer", {}) observer_meta: dict[str, Any] = dict(base_observer) if isinstance(base_observer, dict) else {} observer_meta["name"] = str(cfg.observer_name or observer_meta.get("name") or "earth") observer_meta["label"] = str(observer_meta.get("label") or { "earth": "Earth", "sdo": "SDO", "solar orbiter": "Solar Orbiter", "stereo-a": "STEREO-A", "stereo-b": "STEREO-B", "custom": "Custom", }.get(normalize_observer_key(observer_meta.get("name")), "Earth")) explicit_fov = _explicit_observer_fov(cfg) if explicit_fov is not None: observer_meta["fov"] = explicit_fov ephemeris = dict(observer_meta.get("ephemeris", {})) if isinstance(observer_meta.get("ephemeris"), dict) else {} if not ephemeris: index_text = None base_group = entry_loaded.get("base") if isinstance(base_group, dict): index_text = base_group.get("index") if index_text: try: header = fits.Header.fromstring(_decode_id_text(index_text), sep="\n") if "HGLN_OBS" in header: ephemeris["hgln_obs_deg"] = float(header["HGLN_OBS"]) if "HGLT_OBS" in header: ephemeris["hglt_obs_deg"] = float(header["HGLT_OBS"]) if "RSUN_REF" in header: ephemeris["rsun_cm"] = float(u.Quantity(header["RSUN_REF"], u.m).to_value(u.cm)) if "DSUN_OBS" in header: ephemeris["dsun_cm"] = float(u.Quantity(header["DSUN_OBS"], u.m).to_value(u.cm)) except Exception: pass if ephemeris: ephemeris = { key: ephemeris[key] for key in ("obs_date", "hgln_obs_deg", "hglt_obs_deg", "dsun_cm", "rsun_cm") if key in ephemeris } if ephemeris: observer_meta["ephemeris"] = ephemeris pb0r = build_pb0r_metadata_from_ephemeris( ephemeris, observer_key=observer_meta.get("name"), obs_time=ephemeris.get("obs_date"), ) if pb0r: observer_meta["pb0r"] = pb0r return observer_meta or None def _merge_observer_metadata( base_observer: Optional[dict[str, Any]], stage_observer: Optional[dict[str, Any]], ) -> Optional[dict[str, Any]]: merged: dict[str, Any] = {} if isinstance(base_observer, dict): merged.update(base_observer) if isinstance(stage_observer, dict): for key, value in stage_observer.items(): if key == "ephemeris" and isinstance(value, dict): ephemeris = dict(merged.get("ephemeris", {})) if isinstance(merged.get("ephemeris"), dict) else {} ephemeris.update(value) merged["ephemeris"] = ephemeris elif key in ("fov", "fov_box", "pb0r") and isinstance(value, dict): merged[key] = dict(value) else: merged[key] = value return merged or None def _resolve_cli_observer(source_map: Optional[Map], observer_name: str, obs_time: Optional[Time]): if obs_time is None: return getattr(source_map, "observer_coordinate", None) key = normalize_observer_key(observer_name) if key == "earth": return get_earth(obs_time) if key == "sdo": if source_map is not None and getattr(source_map, "observer_coordinate", None) is not None: try: return source_map.observer_coordinate.transform_to(HeliographicStonyhurst(obstime=obs_time)) except Exception: return source_map.observer_coordinate return get_earth(obs_time) coord = resolve_named_observer(key, obs_time) return coord if coord is not None else get_earth(obs_time) def _build_index_header( bottom_wcs_header, source_map, *, observer_override: Any = None, obs_time_override: Any = None, rsun_override: Any = None, ) -> str: """ Build an IDL-GX compatible INDEX-like FITS header string. This mirrors the intent of IDL `wcs2fitshead(..., /structure)` for the base map WCS: preserve WCS cards and add key TIME/POSITION cards expected by legacy GX Simulator routines. """ header = fits.Header(bottom_wcs_header).copy() date_value = source_map.date if source_map is not None and source_map.date is not None else obs_time_override # Normalize to IDL-GX conventions used in box.index. ctype1 = str(header.get("CTYPE1", "")) ctype2 = str(header.get("CTYPE2", "")) if ctype1.startswith("HGLN-"): try: if date_value is not None and "CRVAL1" in header and "CRVAL2" in header: rsun_value = rsun_override if rsun_value is None and source_map is not None and getattr(source_map, "rsun_meters", None) is not None: rsun_value = source_map.rsun_meters if rsun_value is None: rsun_value = IDL_HMI_RSUN_M * u.m obs_for_carr = observer_override if obs_for_carr is None and source_map is not None: obs_for_carr = getattr(source_map, "observer_coordinate", None) if obs_for_carr is None: obs_for_carr = "earth" origin_hgs = SkyCoord( lon=float(header["CRVAL1"]) * u.deg, lat=float(header["CRVAL2"]) * u.deg, radius=u.Quantity(rsun_value).to(u.m), frame=HeliographicStonyhurst(obstime=date_value), ) origin_hgc = origin_hgs.transform_to( HeliographicCarrington(observer=obs_for_carr, obstime=date_value) ) header["CRVAL1"] = float(origin_hgc.lon.to_value(u.deg)) header["CRVAL2"] = float(origin_hgc.lat.to_value(u.deg)) except Exception: pass header["CTYPE1"] = "CRLN-" + ctype1.split("-", 1)[1] if ctype2.startswith("HGLT-"): header["CTYPE2"] = "CRLT-" + ctype2.split("-", 1)[1] header["SIMPLE"] = int(bool(header.get("SIMPLE", 1))) header["BITPIX"] = int(header.get("BITPIX", 8)) header["NAXIS"] = int(header.get("NAXIS", 2)) if source_map is not None or obs_time_override is not None: date_obs = date_value.isot if date_value is not None else None if date_obs: header["DATE-OBS"] = date_obs header["DATE_OBS"] = date_obs header["DATE"] = date_obs elif "DATE-OBS" in header and "DATE_OBS" not in header: header["DATE_OBS"] = header["DATE-OBS"] if observer_override is not None and getattr(observer_override, "radius", None) is not None: header["DSUN_OBS"] = float(u.Quantity(observer_override.radius).to_value(u.m)) elif source_map is not None and hasattr(source_map, "dsun") and source_map.dsun is not None: header["DSUN_OBS"] = float(source_map.dsun.to_value(u.m)) obs = observer_override if observer_override is not None else ( source_map.observer_coordinate if source_map is not None else None ) if obs is not None: obs_hgs = obs.transform_to(HeliographicStonyhurst(obstime=date_value)) header["HGLN_OBS"] = float(obs_hgs.lon.to_value(u.deg)) header["HGLT_OBS"] = float(obs_hgs.lat.to_value(u.deg)) header["SOLAR_B0"] = float(obs_hgs.lat.to_value(u.deg)) try: obs_hgc = obs.transform_to(HeliographicCarrington(observer="earth", obstime=date_value)) header["CRLN_OBS"] = float(obs_hgc.lon.to_value(u.deg)) header["CRLT_OBS"] = float(obs_hgc.lat.to_value(u.deg)) except Exception: # If Carrington transformation fails (e.g. unsupported observer configuration), # proceed without CRLN_OBS/CRLT_OBS rather than aborting header creation. # Carrington observer transforms can fail for some observer metadata; # keep header generation robust and proceed with available keys. pass if rsun_override is not None: header["RSUN_REF"] = float(u.Quantity(rsun_override).to_value(u.m)) elif source_map is not None and getattr(source_map, "rsun_meters", None) is not None: header["RSUN_REF"] = float(source_map.rsun_meters.to_value(u.m)) if source_map is not None: tel = source_map.meta.get("telescop") if tel: header["TELESCOP"] = str(tel) instr = source_map.meta.get("instrume") if instr: header["INSTRUME"] = str(instr) if "WCSNAME" not in header: header["WCSNAME"] = "Carrington-Heliographic" return header.tostring(sep="\n", endcard=True) def _print_info(cfg: Fov2BoxConfig) -> None: resolved_reduce_passed = cfg.reduce_passed if cfg.reduce_passed is not None else (0 if cfg.center_vox else 1) resolved_chromo_level = (1000.0 / float(cfg.dx_km)) if cfg.dx_km else 1.0 import pyampp import sunpy runtime_rows = [ ("python_executable", sys.executable, "Interpreter used for this run"), ("python_version", sys.version.split()[0], "Python runtime version"), ("gx_fov2box_module", __file__, "Loaded gx_fov2box module path"), ("pyampp_module", pyampp.__file__, "Loaded pyampp package path"), ("sunpy_version", sunpy.__version__, "Loaded sunpy version"), ("gx-fov2box_cli", shutil.which("gx-fov2box"), "Resolved CLI entry point on PATH"), ] rows = [ ("time", cfg.time, "Observation time in ISO format"), ("coords", cfg.coords, "Center coordinates: arcsec (HPC) or degrees (HGC/HGS)"), ("hpc/hgc/hgs", "HPC" if cfg.hpc else "HGC" if cfg.hgc else "HGS", "Coordinate frame"), ("cea/top", "TOP" if cfg.top else "CEA", "Basemap projection"), ("box_dims", cfg.box_dims, "Box dimensions (nx, ny, nz) in pixels"), ("dx_km", cfg.dx_km, "Voxel size in km"), ("pad_frac", cfg.pad_frac, "Padding fraction used for context map FOV"), ("data_dir", cfg.data_dir, "SDO download/cache directory"), ("gxmodel_dir", cfg.gxmodel_dir, "Output gx_models directory"), ("nlfff_lib", cfg.nlfff_lib, "Override WWNLFFFReconstruction library path for NAS and GEN"), ("download_backend", cfg.download_backend, "SDO downloader backend"), ("drms_sequential", cfg.drms_sequential, "Force single-worker DRMS downloads (HMI and AIA)"), ("force_download", cfg.force_download, "Bypass local cache hits and redownload requested SDO products"), ("entry_box", cfg.entry_box, "Path to precomputed HDF5 box"), ("save_empty_box", cfg.save_empty_box, "Save NONE stage"), ("save_potential", cfg.save_potential, "Save POT stage"), ("save_bounds", cfg.save_bounds, "Save BND stage"), ("save_nas", cfg.save_nas, "Save NAS stage"), ("save_gen", cfg.save_gen, "Save NAS.GEN stage"), ("save_chr", cfg.save_chr, "Save NAS.CHR stage"), ("stop_after", cfg.stop_after, "Stop after stage (dl/none/bnd/pot/nas/gen/chr)"), ("empty_box_only", cfg.empty_box_only, "Stop after NONE stage"), ("potential_only", cfg.potential_only, "Stop after POT stage"), ("nlfff_only", cfg.nlfff_only, "Stop after NAS stage"), ("generic_only", cfg.generic_only, "Stop after NAS.GEN stage"), ("use_potential", cfg.use_potential, "Skip NLFFF; reuse potential field"), ("skip_lines", cfg.skip_lines, "Skip GEN line computation and proceed directly to CHR"), ("center_vox", cfg.center_vox, "Compute lines only through voxel centers (sets reduce_passed=0 unless overridden)"), ("reduce_passed", cfg.reduce_passed, "Expert override: 0|1|2|3 (takes precedence over --center-vox)"), ("reduce_passed_resolved", resolved_reduce_passed, "Effective line-reduction mode used by tracer"), ("chromo_level_resolved", resolved_chromo_level, "Effective chromo level passed to line tracer (1 Mm / dx_km)"), ("euv", cfg.euv, "Download AIA EUV context maps"), ("uv", cfg.uv, "Download AIA UV context maps"), ("sfq", cfg.sfq, "Use SFQ disambiguation (method=0)"), ("jump2potential", cfg.jump2potential, "Start from entry box and jump to POT"), ("jump2bounds", cfg.jump2bounds, "Start from entry box and jump to BND"), ("jump2nlfff", cfg.jump2nlfff, "Start from entry box and jump to NAS"), ("jump2lines", cfg.jump2lines, "Start from entry box and jump to GEN"), ("jump2chromo", cfg.jump2chromo, "Start from entry box and jump to CHR"), ("rebuild", cfg.rebuild, "Ignore entry stage payload and rebuild from NONE"), ("rebuild_from_none", cfg.rebuild_from_none, "Strip entry to NONE-equivalent and continue forward"), ] print("gx-fov2box --info") for name, value, desc in runtime_rows + rows: print(f"- {name}: {value} :: {desc}") missing = [] if not cfg.time: missing.append("--time") if not cfg.coords: missing.append("--coords") if not cfg.box_dims: missing.append("--box-dims") if missing: print("\nWarnings:") for item in missing: print(f"- Missing required {item}") if cfg.entry_box: print("- entry_box provided; time/box-dims may be inferred if present") def _load_hmi_maps_from_downloader( time: Time, data_dir: Path, euv: bool, uv: bool, download_backend: str = "drms", drms_sequential: bool = False, force_download: bool = False, disambig_method: int = 2, strict_required: bool = True, ) -> tuple[Dict[str, Map], dict]: import time as time_mod def _missing_items(report: dict[str, Any], *categories: str) -> list[str]: missing: list[str] = [] for category in categories: items = report.get(category, {}) if isinstance(report, dict) else {} if isinstance(items, dict): missing.extend([k for k, exists in items.items() if not exists]) return missing requested_time = Time(time) def _new_downloader(*, obs_time: Time, want_euv: bool, want_uv: bool, want_hmi: bool): kwargs = { "data_dir": str(data_dir), "euv": want_euv, "uv": want_uv, "hmi": want_hmi, "backend": download_backend, "force_download": force_download, } if download_backend == "drms": kwargs["drms_sequential"] = drms_sequential try: return SDOImageDownloader(obs_time, **kwargs) except TypeError as exc: if "drms_sequential" in str(exc): kwargs.pop("drms_sequential", None) return SDOImageDownloader(obs_time, **kwargs) raise downloader = _new_downloader(obs_time=time, want_euv=False, want_uv=False, want_hmi=True) missing_before = _missing_items(downloader.existence_report, "hmi_b", "hmi_m", "hmi_ic") t0 = time_mod.perf_counter() files = dict(downloader.download_images()) hmi_elapsed = time_mod.perf_counter() - t0 downloaded = force_download or len(missing_before) > 0 required = ["field", "inclination", "azimuth", "disambig", "continuum", "magnetogram"] optional_requested: list[str] = [] if euv: optional_requested.extend(list(AIA_EUV_PASSBANDS)) if uv: optional_requested.extend(list(AIA_UV_PASSBANDS)) missing_required = [k for k in required if not files.get(k)] missing_optional = [k for k in optional_requested if not files.get(k)] if missing_required and strict_required: print("Download summary:") print(f"- required missing: {missing_required}") print(f"- optional missing: {missing_optional}") raise RuntimeError(f"Missing required HMI files: {missing_required}") if missing_required and not strict_required: print("Download summary:") print(f"- required missing: {missing_required}") print(f"- optional missing: {missing_optional}") info = { "requested_obs_time": requested_time.isot, "resolved_obs_time": None, "downloaded": downloaded, "elapsed": hmi_elapsed, "hmi_elapsed": hmi_elapsed, "context_elapsed": 0.0, "missing_before": missing_before, "missing_after": missing_required, "missing_required": missing_required, "missing_optional": missing_optional, "files": files, } return {}, info map_field = load_sunpy_map_compat(files["field"]) map_inclination = load_sunpy_map_compat(files["inclination"]) map_azimuth = load_sunpy_map_compat(files["azimuth"]) map_disambig = load_sunpy_map_compat(files["disambig"]) map_conti = load_sunpy_map_compat(files["continuum"]) map_losma = load_sunpy_map_compat(files["magnetogram"]) map_azimuth = hmi_disambig(map_azimuth, map_disambig, method=disambig_method) maps = { "field": map_field, "inclination": map_inclination, "azimuth": map_azimuth, "disambig": map_disambig, "continuum": map_conti, "magnetogram": map_losma, } resolved_obs_time = Time(map_field.date) if getattr(map_field, "date", None) is not None else requested_time context_elapsed = 0.0 if euv or uv: context_downloader = _new_downloader( obs_time=resolved_obs_time, want_euv=euv, want_uv=uv, want_hmi=False, ) context_missing_before = _missing_items(context_downloader.existence_report, "euv", "uv") missing_before.extend(context_missing_before) t0 = time_mod.perf_counter() context_files = context_downloader.download_images() context_elapsed = time_mod.perf_counter() - t0 downloaded = downloaded or force_download or len(context_missing_before) > 0 for key, path in context_files.items(): if key in required: continue files[key] = path missing_optional = [k for k in optional_requested if not files.get(k)] print("Download summary:") print(f"- required missing: {missing_required}") print(f"- optional missing: {missing_optional}") for key, path in files.items(): if key in ("field", "inclination", "azimuth", "disambig", "continuum", "magnetogram"): continue try: maps[f"AIA_{key}"] = load_sunpy_map_compat(path) except Exception: # Skip optional context channels that cannot be loaded. continue info = { "requested_obs_time": requested_time.isot, "resolved_obs_time": resolved_obs_time.isot, "downloaded": downloaded, "elapsed": hmi_elapsed + context_elapsed, "hmi_elapsed": hmi_elapsed, "context_elapsed": context_elapsed, "missing_before": missing_before, "missing_after": missing_required, "missing_required": missing_required, "missing_optional": missing_optional, "files": files, } return maps, info def _make_header(map_field: Map) -> Dict[str, Any]: header_field = map_field.wcs.to_header() field_frame = map_field.center.heliographic_carrington.frame lon, lat = field_frame.lon.value, field_frame.lat.value obs_time = Time(map_field.date) dsun_obs = header_field["DSUN_OBS"] return {"lon": lon, "lat": lat, "dsun_obs": dsun_obs, "obs_time": str(obs_time.iso)} def _make_lines_group(lines: dict, dr3: np.ndarray) -> dict: group = { "dr": np.asarray(dr3, dtype=float), } for key in ( "start_idx", "end_idx", "seed_idx", "apex_idx", "codes", "av_field", "phys_length", "voxel_status", ): if key in lines: group[key] = lines[key] return group def _make_chromo_group(chromo_box: dict) -> dict: group = {} for key in ( "chromo_idx", "chromo_n", "chromo_t", "n_p", "n_hi", "n_htot", "tr", "tr_h", "chromo_layers", "dz", "chromo_mask", ): if key in chromo_box: group[key] = chromo_box[key] # Export CHR vectors explicitly in H5 (no BCUBE replacement in schema). chromo_bcube = chromo_box.get("chromo_bcube") if isinstance(chromo_bcube, np.ndarray) and chromo_bcube.ndim == 4 and chromo_bcube.shape[-1] == 3: group["bx"] = chromo_bcube[:, :, :, 0] group["by"] = chromo_bcube[:, :, :, 1] group["bz"] = chromo_bcube[:, :, :, 2] return group def _to_h5_2d(arr: np.ndarray, ny: int, nx: int, source_axis_order_2d: str = "yx") -> np.ndarray: a = np.asarray(arr) if a.ndim != 2: return a axis_order = _decode_id_text(source_axis_order_2d).strip().lower() if axis_order == "yx": if a.shape == (ny, nx): return a if a.shape == (nx, ny): return a.T raise ValueError( f"Cannot normalize 2D map shape {a.shape} from source order {source_axis_order_2d!r} " f"to (ny,nx)=({ny},{nx})" ) if axis_order == "xy": if a.shape == (nx, ny) or (nx == ny and a.shape == (ny, nx)): return a.T raise ValueError( f"Cannot normalize 2D map shape {a.shape} from source order {source_axis_order_2d!r} " f"to (ny,nx)=({ny},{nx})" ) raise ValueError(f"Unsupported 2D source axis order: {source_axis_order_2d!r}") def _to_h5_3d(arr: np.ndarray, source_axis_order_3d: str) -> np.ndarray: """ Convert a 3D volume to canonical HDF5 order ``(z, y, x)``. ``gx_fov2box`` currently uses explicit internal ``(x, y, z)`` volumes for solver-facing paths, while HDF5 stores 3D volumes canonically as ``(z, y, x)``. Do not infer the transform from the array shape because cubic boxes make that ambiguous. """ a = np.asarray(arr) if a.ndim != 3: return a axis_order = _decode_id_text(source_axis_order_3d).strip().lower() if axis_order == "zyx": return a if axis_order == "xyz": return a.transpose((2, 1, 0)) raise ValueError(f"Unsupported 3D source axis order: {source_axis_order_3d!r}") def _normalize_stage_for_h5( stage_box: dict, source_axis_order_3d: str = "xyz", base_source_axis_order_2d: str = "yx", chromo_source_axis_order_2d: str = "yx", ) -> dict: out = dict(stage_box) if "base" not in out: return out base = dict(out["base"]) if "bx" in base: ny, nx = np.asarray(base["bx"]).shape elif "bz" in base: ny, nx = np.asarray(base["bz"]).shape else: out["base"] = base return out for k in ("bx", "by", "bz", "ic", "chromo_mask"): if k in base: base[k] = _to_h5_2d(base[k], ny, nx, source_axis_order_2d=base_source_axis_order_2d) out["base"] = base if "corona" in out and isinstance(out["corona"], dict): corona = dict(out["corona"]) for k in ("bx", "by", "bz"): if k in corona: corona[k] = _to_h5_3d(corona[k], source_axis_order_3d) out["corona"] = corona if "chromo" in out and isinstance(out["chromo"], dict): chromo = dict(out["chromo"]) for k in ("bx", "by", "bz", "dz"): if k in chromo: chromo[k] = _to_h5_3d(chromo[k], source_axis_order_3d) for k in ("tr", "tr_h", "chromo_mask"): if k in chromo: chromo[k] = _to_h5_2d(chromo[k], ny, nx, source_axis_order_2d=chromo_source_axis_order_2d) out["chromo"] = chromo if "grid" in out and isinstance(out["grid"], dict): grid = dict(out["grid"]) for k in ("voxel_id", "dz"): if k in grid and np.asarray(grid[k]).ndim == 3: grid[k] = _to_h5_3d(grid[k], source_axis_order_3d) out["grid"] = grid return out def _h5_corona_to_internal_xyz(corona: dict, axis_order_3d: Optional[str]) -> dict: if not isinstance(corona, dict): return corona axis_order = _decode_id_text(axis_order_3d).lower() if axis_order != "zyx": return corona out = dict(corona) for k in ("bx", "by", "bz"): if k in out and np.asarray(out[k]).ndim == 3: out[k] = np.asarray(out[k]).transpose((2, 1, 0)) return out def _load_entry_box_any(entry_path: Path) -> Dict[str, Any]: return load_model(entry_path) def _resolve_box_params(cfg: Fov2BoxConfig) -> Tuple[Time, Tuple[int, int, int], float]: obs_time = Time(cfg.time) if cfg.time else None box_dims = cfg.box_dims dx_km = cfg.dx_km if cfg.entry_box: entry_path = Path(cfg.entry_box) if entry_path.exists(): box_b3d = _load_entry_box_any(entry_path) corona = box_b3d.get("corona") if corona is not None: if box_dims is None and "bx" in corona: shape = np.asarray(corona["bx"]).shape axis_order = _decode_id_text(box_b3d.get("metadata", {}).get("axis_order_3d")).lower() if axis_order == "zyx" and len(shape) == 3: box_dims = (int(shape[2]), int(shape[1]), int(shape[0])) else: # internal cubes are x,y,z unless metadata says zyx if len(shape) == 3: box_dims = (int(shape[0]), int(shape[1]), int(shape[2])) else: box_dims = tuple(int(v) for v in shape) if "dr" in corona and dx_km == 0: dr3 = corona["dr"] dx_km = float(dr3[0] * (IDL_HMI_RSUN_M * u.m).to(u.km).value) if obs_time is None: inferred = _infer_time_from_entry_loaded(box_b3d, entry_path) if inferred: obs_time = Time(inferred) if obs_time is None: raise ValueError("--time is required unless it can be inferred from entry_box") if box_dims is None: raise ValueError("--box-dims is required unless it can be inferred from entry_box") return obs_time, tuple(int(v) for v in box_dims), float(dx_km) @app.command()
[docs] def main( time: Optional[str] = typer.Option(None, "--time", help="Observation time ISO (e.g. 2024-05-12T00:00:00)"), coords: Optional[Tuple[float, float]] = typer.Option(None, "--coords", help="Center coords (x y)"), hpc: bool = typer.Option(False, "--hpc/--no-hpc", help="Use helioprojective coordinates"), hgc: bool = typer.Option(False, "--hgc/--no-hgc", help="Use heliographic carrington coordinates"), hgs: bool = typer.Option(False, "--hgs/--no-hgs", help="Use heliographic stonyhurst coordinates"), cea: bool = typer.Option(False, "--cea", help="Use CEA basemap projection"), top: bool = typer.Option(False, "--top", help="Use TOP-view basemap projection"), box_dims: Optional[Tuple[int, int, int]] = typer.Option(None, "--box-dims", help="Box dims in pixels"), dx_km: float = typer.Option(1400.0, "--dx-km", help="Voxel size in km"), pad_frac: float = typer.Option(0.10, "--pad-frac", help="Padding fraction for FOV"), data_dir: str = typer.Option(DOWNLOAD_DIR, "--data-dir", help="SDO data directory"), gxmodel_dir: str = typer.Option(GXMODEL_DIR, "--gxmodel-dir", help="GX model output directory"), nlfff_lib: Optional[str] = typer.Option(None, "--nlfff-lib", help="Override the WWNLFFFReconstruction shared library path for NAS and GEN stages"), download_backend: Optional[str] = typer.Option(None, "--download-backend", help="Compatibility override: explicitly set downloader backend to fido or drms"), use_fido: bool = typer.Option(False, "--use-fido", help="Use the legacy SunPy/Fido downloader instead of the default DRMS backend"), drms_sequential: bool = typer.Option(False, "--drms-sequential", help="Force DRMS downloads to single-worker mode for maximum reliability"), force_download: bool = typer.Option(False, "--force-download", help="Bypass local cache hits and redownload requested SDO products"), entry_box: Optional[str] = typer.Option(None, "--entry-box", help="Existing HDF5/SAV box"), save_empty_box: bool = typer.Option(False, "--save-empty-box", help="Save NONE stage"), save_potential: bool = typer.Option(False, "--save-potential", help="Save POT stage"), save_bounds: bool = typer.Option(False, "--save-bounds", help="Save BND stage"), save_nas: bool = typer.Option(False, "--save-nas", help="Save NAS stage"), save_gen: bool = typer.Option(False, "--save-gen", help="Save NAS.GEN stage"), save_chr: bool = typer.Option(False, "--save-chr", help="Save NAS.CHR stage"), stop_after: Optional[str] = typer.Option(None, "--stop-after", help="Stop after stage (dl/none/pot/bnd/nas/gen/chr)"), empty_box_only: bool = typer.Option(False, "--empty-box-only", help="Stop after NONE"), potential_only: bool = typer.Option(False, "--potential-only", help="Stop after POT"), nlfff_only: bool = typer.Option(False, "--nlfff-only", help="Stop after NAS"), generic_only: bool = typer.Option(False, "--generic-only", help="Stop after NAS.GEN"), use_potential: bool = typer.Option(False, "--use-potential", help="Skip NLFFF"), skip_lines: bool = typer.Option(False, "--skip-lines", help="Skip GEN line computation and go directly to CHR"), center_vox: bool = typer.Option(False, "--center-vox", help="Trace lines through voxel centers"), reduce_passed: Optional[int] = typer.Option( None, "--reduce-passed", min=0, max=3, help="Expert line tracing reduction bitmask: 0|1|2|3 (overrides --center-vox)", ), euv: bool = typer.Option(False, "--euv", help="Download AIA EUV maps"), uv: bool = typer.Option(False, "--uv", help="Download AIA UV maps"), sfq: bool = typer.Option(False, "--sfq", help="Use SFQ disambiguation (method=0)"), observer_name: str = typer.Option("earth", "--observer-name", help="Observer identifier stored in output metadata"), fov_xc: Optional[float] = typer.Option(None, "--fov-xc", help="Observer/image FOV center X in arcsec"), fov_yc: Optional[float] = typer.Option(None, "--fov-yc", help="Observer/image FOV center Y in arcsec"), fov_xsize: Optional[float] = typer.Option(None, "--fov-xsize", help="Observer/image FOV width in arcsec"), fov_ysize: Optional[float] = typer.Option(None, "--fov-ysize", help="Observer/image FOV height in arcsec"), square_fov: bool = typer.Option(False, "--square-fov", help="Observer/image FOV is constrained square"), jump2potential: bool = typer.Option(False, "--jump2potential", help="Jump to POT"), jump2bounds: bool = typer.Option(False, "--jump2bounds", help="Jump to BND"), jump2nlfff: bool = typer.Option(False, "--jump2nlfff", help="Jump to NAS"), jump2lines: bool = typer.Option(False, "--jump2lines", help="Jump to GEN"), jump2chromo: bool = typer.Option(False, "--jump2chromo", help="Jump to CHR"), rebuild: bool = typer.Option(False, "--rebuild", help="Recompute from NONE using entry-box parameters"), rebuild_from_none: bool = typer.Option(False, "--rebuild-from-none", help="Start from entry-box NONE-equivalent and run forward"), info: bool = typer.Option(False, "--info", help="Show resolved defaults and exit"), reproject_algorithm: str = typer.Option( "adaptive", "--reproject-algorithm", help="Reprojection algorithm for HMI cutouts onto model base grid: 'adaptive' (default, matches IDL wcs_remap/ssaa behaviour), 'exact' (flux-conserving, slower), or 'interpolation' (bilinear)", ), reproject_scan: Optional[str] = typer.Option( None, "--reproject-scan", help="Batch-run OBS->NONE once per reprojection algorithm. Use 'all' or a comma-separated list such as 'adaptive,exact,interpolation'. Each run writes to a gxmodel-dir/reproject_<algorithm> subdirectory.", ), refmaps_path: Optional[List[str]] = typer.Option( None, "--refmaps-path", "--refmap-path", help="Directory of external FITS reference maps to add to model refmaps. May be repeated.", ), ) -> None: cfg = Fov2BoxConfig( time=time, coords=coords, hpc=hpc, hgc=hgc, hgs=hgs, cea=cea, top=top, box_dims=box_dims, dx_km=dx_km, pad_frac=pad_frac, data_dir=data_dir, gxmodel_dir=gxmodel_dir, nlfff_lib=nlfff_lib, download_backend=download_backend or "drms", drms_sequential=drms_sequential, force_download=force_download, entry_box=entry_box, save_empty_box=save_empty_box, save_potential=save_potential, save_bounds=save_bounds, save_nas=save_nas, save_gen=save_gen, save_chr=save_chr, stop_after=stop_after, empty_box_only=empty_box_only, potential_only=potential_only, nlfff_only=nlfff_only, generic_only=generic_only, use_potential=use_potential, skip_lines=skip_lines, center_vox=center_vox, reduce_passed=reduce_passed, euv=euv, uv=uv, sfq=sfq, observer_name=observer_name, fov_xc=fov_xc, fov_yc=fov_yc, fov_xsize=fov_xsize, fov_ysize=fov_ysize, square_fov=square_fov, jump2potential=jump2potential, jump2bounds=jump2bounds, jump2nlfff=jump2nlfff, jump2lines=jump2lines, jump2chromo=jump2chromo, rebuild=rebuild, rebuild_from_none=rebuild_from_none, info=info, reproject_algorithm=reproject_algorithm, reproject_scan=reproject_scan, refmaps_path=tuple(refmaps_path or ()), ) cfg.download_backend = str(cfg.download_backend or "drms").strip().lower() if cfg.download_backend not in {"fido", "drms"}: raise ValueError("Unsupported --download-backend. Expected 'fido' or 'drms'.") if use_fido: if download_backend is not None and cfg.download_backend != "fido": raise ValueError("Conflicting downloader controls: --use-fido and --download-backend drms.") cfg.download_backend = "fido" cfg.reproject_algorithm = _normalize_reproject_algorithm(cfg.reproject_algorithm) if cfg.reproject_algorithm not in _VALID_REPROJECT_ALGORITHMS: raise ValueError( f"Unsupported --reproject-algorithm '{cfg.reproject_algorithm}'. " f"Choose from: {', '.join(_VALID_REPROJECT_ALGORITHMS)}." ) reproject_scan_algorithms = _parse_reproject_scan(cfg.reproject_scan) if not any([cfg.hpc, cfg.hgc, cfg.hgs]): cfg.hpc = True if cfg.cea and cfg.top: raise ValueError("Select only one projection: --cea or --top") if not cfg.cea and not cfg.top: cfg.cea = True if cfg.square_fov and cfg.fov_xsize is not None: cfg.fov_ysize = cfg.fov_xsize _explicit_observer_fov(cfg) legacy_stop_flags = [ ("none", cfg.empty_box_only, "--empty-box-only"), ("pot", cfg.potential_only, "--potential-only"), ("nas", cfg.nlfff_only, "--nlfff-only"), ("gen", cfg.generic_only, "--generic-only"), ] active_legacy = [item for item in legacy_stop_flags if item[1]] if len(active_legacy) > 1: names = ", ".join(item[2] for item in active_legacy) raise ValueError(f"Conflicting legacy stop flags: {names}. Use only one or use --stop-after.") if active_legacy: legacy_stop = active_legacy[0][0] if cfg.stop_after is not None and _last_stage_tag(cfg.stop_after) != _last_stage_tag(legacy_stop): raise ValueError( f"Conflicting stop controls: --stop-after={cfg.stop_after} and {active_legacy[0][2]}." ) cfg.stop_after = cfg.stop_after or legacy_stop if cfg.info: _print_info(cfg) return if cfg.rebuild_from_none and not cfg.entry_box: raise ValueError("--rebuild-from-none requires --entry-box") if cfg.rebuild and cfg.rebuild_from_none: raise ValueError("Use only one of --rebuild or --rebuild-from-none") jump_flags_set = any([cfg.jump2potential, cfg.jump2bounds, cfg.jump2nlfff, cfg.jump2lines, cfg.jump2chromo]) if jump_flags_set and not cfg.entry_box: print("Warning: jump2* flags are ignored unless --entry-box is provided.") if cfg.rebuild_from_none and jump_flags_set: raise ValueError("--rebuild-from-none cannot be combined with jump2* flags") if cfg.use_potential and (cfg.jump2nlfff or cfg.jump2bounds): raise ValueError("--use-potential cannot be combined with --jump2nlfff or --jump2bounds") if cfg.skip_lines and _last_stage_tag(cfg.stop_after) == "GEN": raise ValueError("--skip-lines cannot be combined with stop-after GEN (or equivalent).") if cfg.skip_lines and cfg.center_vox: print("Warning: --center-vox is ignored when --skip-lines is set.") cfg.center_vox = False entry_loaded: Optional[Dict[str, Any]] = None entry_stage: Optional[str] = None target_stage = "NONE" data_dir_explicit = _flag_explicit_on_cli("--data-dir") gxmodel_dir_explicit = _flag_explicit_on_cli("--gxmodel-dir") if cfg.entry_box: entry_path = Path(cfg.entry_box) if not entry_path.exists(): raise ValueError(f"--entry-box not found: {entry_path}") entry_loaded = _load_entry_box_any(entry_path) entry_stage = _entry_stage_from_loaded(entry_loaded, entry_path) execute_text = _decode_id_text(entry_loaded.get("metadata", {}).get("execute", "")) exec_data_dir, exec_gxmodel_dir = _extract_execute_paths(execute_text) exec_coords, exec_frame, exec_projection = _extract_execute_geometry(execute_text) frame_explicit = any( _flag_explicit_on_cli(flag) for flag in ("--hpc", "--hgc", "--hgs") ) proj_explicit = any(_flag_explicit_on_cli(flag) for flag in ("--cea", "--top")) if exec_data_dir and not data_dir_explicit: cfg.data_dir = exec_data_dir print(f"Using --data-dir from entry metadata/execute: {cfg.data_dir}") _warn_path_default("data_dir", cfg.data_dir) if exec_gxmodel_dir and not gxmodel_dir_explicit: cfg.gxmodel_dir = exec_gxmodel_dir print(f"Using --gxmodel-dir from entry metadata/execute: {cfg.gxmodel_dir}") _warn_path_default("gxmodel_dir", cfg.gxmodel_dir) if cfg.coords is None and exec_coords is not None: cfg.coords = exec_coords print(f"Using --coords from entry metadata/execute: {cfg.coords}") if not frame_explicit and exec_frame in ("hpc", "hgc", "hgs"): cfg.hpc = exec_frame == "hpc" cfg.hgc = exec_frame == "hgc" cfg.hgs = exec_frame == "hgs" if not proj_explicit and exec_projection in ("cea", "top"): cfg.cea = exec_projection == "cea" cfg.top = exec_projection == "top" transition_plan = _plan_transition(cfg, entry_stage) target_stage = transition_plan.target_stage if reproject_scan_algorithms is not None: if target_stage != "NONE": raise ValueError( "--reproject-scan is limited to OBS->NONE generation. " "Use --stop-after none (or equivalent) for reprojection sweeps." ) scan_root = Path(cfg.gxmodel_dir).expanduser().resolve() script_path = Path(__file__).resolve() print( "Running reprojection sweep for NONE generation: " + ", ".join(reproject_scan_algorithms) ) for algorithm in reproject_scan_algorithms: child_cfg = replace( cfg, gxmodel_dir=str(scan_root / f"reproject_{algorithm}"), save_empty_box=True, save_potential=False, save_bounds=False, save_nas=False, save_gen=False, save_chr=False, stop_after="none", empty_box_only=False, potential_only=False, nlfff_only=False, generic_only=False, use_potential=False, skip_lines=False, reproject_algorithm=algorithm, reproject_scan=None, ) child_args = shlex.split(_build_execute_cmd(child_cfg))[1:] child_cmd = [sys.executable, str(script_path), *child_args] print(f"\n[{algorithm}] writing NONE output under {child_cfg.gxmodel_dir}") subprocess.run(child_cmd, check=True) print("Reprojection sweep complete.") return observer_metadata = _observer_metadata_from_entry(entry_loaded, cfg) if entry_loaded is not None else None import time as time_mod import warnings import logging t_start = time_mod.perf_counter() warnings.filterwarnings("ignore", message=".*assume_spherical_screen.*") logging.getLogger("reproject").setLevel(logging.WARNING) logging.getLogger("sunpy").setLevel(logging.WARNING) requested_obs_time, box_dims_resolved, dx_km = _resolve_box_params(cfg) cfg.dx_km = dx_km obs_time = requested_obs_time resume_mode = bool(cfg.entry_box and not cfg.rebuild and entry_loaded is not None) if not resume_mode: if cfg.coords is None: raise ValueError("--coords is required") if sum([cfg.hpc, cfg.hgc, cfg.hgs]) != 1: raise ValueError("Select exactly one of --hpc, --hgc, --hgs") _warn_impossible_save_requests(cfg, target_stage) stage_rank = {"NONE": 0, "POT": 1, "BND": 2, "NAS": 3, "GEN": 4, "CHR": 5} start_rank = stage_rank.get(target_stage, 0) maps = None base_bz_arr = None base_ic_arr = None bottom_bz_data = None vert_current_error = None projection_tag = "CEA" base = None dr3 = None def _start_progress_emitter(label: str, start: float): isatty = bool(getattr(sys.stdout, "isatty", lambda: False)()) if hasattr(os, "fork"): pid = os.fork() if pid == 0: frames = "|/-\\" idx = 0 last_heartbeat = 0.0 try: while True: elapsed = time_mod.perf_counter() - start frame = frames[idx % len(frames)] if isatty: print(f"\r{label}... {frame}", end="", flush=True) elif elapsed - last_heartbeat >= 5.0: print(f"{label}... {elapsed:.1f}s elapsed", flush=True) last_heartbeat = elapsed idx += 1 time_mod.sleep(0.15) finally: os._exit(0) return pid, isatty, None stop_event = threading.Event() def _heartbeat(): frames = "|/-\\" idx = 0 last_heartbeat = 0.0 while not stop_event.wait(0.15): elapsed = time_mod.perf_counter() - start frame = frames[idx % len(frames)] if isatty: print(f"\r{label}... {frame}", end="", flush=True) elif elapsed - last_heartbeat >= 5.0: print(f"{label}... {elapsed:.1f}s elapsed") last_heartbeat = elapsed idx += 1 thread = threading.Thread(target=_heartbeat, daemon=True) thread.start() return None, isatty, (stop_event, thread) def _stop_progress_emitter(pid, thread_state): if pid is not None: try: os.kill(pid, signal.SIGTERM) except ProcessLookupError: pass try: os.waitpid(pid, 0) except ChildProcessError: pass return if thread_state is not None: stop_event, thread = thread_state stop_event.set() thread.join(timeout=0.3) def _run_logged_step(label: str, func): start = time_mod.perf_counter() print(f"{label}...") pid, isatty, thread_state = _start_progress_emitter(label, start) try: result = func() except Exception: elapsed = time_mod.perf_counter() - start _stop_progress_emitter(pid, thread_state) if isatty: print(f"\r{label}... failed after {elapsed:.2f}s") else: print(f"{label} failed after {elapsed:.2f}s") raise elapsed = time_mod.perf_counter() - start _stop_progress_emitter(pid, thread_state) if isatty: print(f"\r{label}... done in {elapsed:.2f}s") else: print(f"{label} done in {elapsed:.2f}s") return result prepared_run = _prepare_run_state( cfg, resume_mode, entry_loaded, entry_stage, target_stage, requested_obs_time, box_dims_resolved, observer_metadata, _run_logged_step, t_start, ) if prepared_run is None: return box_dims_resolved = _effective_box_dims_from_base(box_dims_resolved, prepared_run.base_group) empty_grid = np.zeros((box_dims_resolved[0], box_dims_resolved[1], box_dims_resolved[2]), dtype=float) default_grid = { "dx": float(prepared_run.dr3[0]), "dy": float(prepared_run.dr3[1]), "dz": np.array([float(prepared_run.dr3[2])], dtype=float), } out_dir = _stage_output_dir(cfg.gxmodel_dir, prepared_run.obs_time) execute_cmd = _build_execute_cmd(cfg) produced = [] stage_times = {} class _StageProgress: def __init__(self, label: str): self.label = label self._start = None self._stop_event = threading.Event() self._thread = None self._isatty = bool(getattr(sys.stdout, "isatty", lambda: False)()) self._finished = False def __enter__(self): self._start = time_mod.perf_counter() print(f"{self.label}...") self._pid, self._isatty, self._thread_state = _start_progress_emitter(self.label, self._start) return self def __exit__(self, exc_type, exc, tb): if exc_type is not None and not self._finished: elapsed = time_mod.perf_counter() - (self._start or time_mod.perf_counter()) _stop_progress_emitter(getattr(self, "_pid", None), getattr(self, "_thread_state", None)) if self._isatty: print(f"\r{self.label}... failed after {elapsed:.2f}s") else: print(f"{self.label} failed after {elapsed:.2f}s") return False def finish(self) -> float: elapsed = time_mod.perf_counter() - (self._start or time_mod.perf_counter()) self._finished = True _stop_progress_emitter(getattr(self, "_pid", None), getattr(self, "_thread_state", None)) if self._isatty: print(f"\r{self.label}... done in {elapsed:.2f}s") else: print(f"{self.label} done in {elapsed:.2f}s") return elapsed def finalize() -> None: if produced: print("\nCompleted gx-fov2box. Output files:") for path in produced: print(f"- {path}") if stage_times: print("Stage timings:") for stage, seconds in stage_times.items(): print(f"- {stage}: {seconds:.2f}s") total = time_mod.perf_counter() - t_start print(f"Total elapsed: {total:.2f}s") save_context = StageSaveContext( cfg=cfg, prepared_run=prepared_run, execute_cmd=execute_cmd, out_dir=out_dir, default_grid=default_grid, empty_grid=empty_grid, produced=produced, ) def save_stage( stage_tag: str, stage_box: dict, *, source_axis_order_3d: str = "xyz", chromo_source_axis_order_2d: str = "yx", ) -> dict: return _save_stage_passthrough( stage_tag, stage_box, context=save_context, source_axis_order_3d=source_axis_order_3d, chromo_source_axis_order_2d=chromo_source_axis_order_2d, ) if start_rank <= 0 and (cfg.save_empty_box or cfg.empty_box_only or cfg.stop_after in ("none", "empty", "empty_box")): with _StageProgress("Computing NONE model") as progress: # NONE stage should preserve boundary conditions at z=0 and keep z>0 empty. # Canonical 3D order in H5 is (nz, ny, nx). stage_box = _compute_none_stage_box(prepared_run, box_dims_resolved) stage_box = _normalize_runtime_stage_box_for_pipeline( stage_box, prepared_run=prepared_run, stage_tag="NONE", source_axis_order_3d="xyz", ) from pyampp.io.model import ensure_geometry_contract_in_metadata # Inject geometry_contract before writing NONE stage ensure_geometry_contract_in_metadata(stage_box) save_stage("NONE", stage_box, source_axis_order_3d="xyz") stage_times["NONE"] = progress.finish() if cfg.empty_box_only or _last_stage_tag(cfg.stop_after) == "NONE": finalize() return transition_inputs = _prepare_transition_stage_inputs( prepared_run, transition_plan, entry_stage=entry_stage, box_dims_resolved=box_dims_resolved, ) active_jump = transition_inputs.active_jump entry_lines = transition_inputs.entry_lines goto_lines = transition_inputs.goto_lines goto_chromo = transition_inputs.goto_chromo pot_box = transition_inputs.pot_box bnd_box = transition_inputs.bnd_box nlfff_box = transition_inputs.nlfff_box if not goto_lines: if pot_box is None and bnd_box is None and nlfff_box is None: with _StageProgress("Computing POT model") as progress: pot_box = _compute_pot_stage_box(prepared_run, box_dims_resolved)["corona"] save_stage("POT", {"corona": pot_box}) stage_times["POT"] = progress.finish() if cfg.potential_only or _last_stage_tag(cfg.stop_after) == "POT": finalize() return elif pot_box is not None: with _StageProgress("Preparing POT model from entry data") as progress: save_stage("POT", {"corona": pot_box}) stage_times["POT"] = progress.finish() if cfg.potential_only or _last_stage_tag(cfg.stop_after) == "POT": finalize() return if bnd_box is None and pot_box is not None and not cfg.use_potential: with _StageProgress("Computing BND model") as progress: bnd_box = _compute_bnd_stage_box(prepared_run, pot_box, box_dims_resolved)["corona"] save_stage("BND", {"corona": bnd_box}) stage_times["BND"] = progress.finish() if _last_stage_tag(cfg.stop_after) == "BND": finalize() return elif bnd_box is not None: with _StageProgress("Preparing BND model from entry data") as progress: save_stage("BND", {"corona": bnd_box}) stage_times["BND"] = progress.finish() if _last_stage_tag(cfg.stop_after) == "BND": finalize() return if nlfff_box is None and cfg.use_potential and pot_box is None: pot_box = _compute_pot_stage_box(prepared_run, box_dims_resolved)["corona"] if nlfff_box is None and not cfg.use_potential and bnd_box is None: if pot_box is None: pot_box = _compute_pot_stage_box(prepared_run, box_dims_resolved)["corona"] bnd_box = _compute_bnd_stage_box(prepared_run, pot_box, box_dims_resolved)["corona"] nas_result = _run_nas_stage( cfg, prepared_run, pot_box, bnd_box, nlfff_box, box_dims_resolved, save_stage, finalize, stage_times, _StageProgress, ) nlfff_box = _prepare_stage_corona_payload( nas_result.nlfff_box, source="memory", expected_shape_3d=box_dims_resolved, allowed_model_types=("pot", "nlfff"), ) if nas_result.finalized: return gen_chr_result = _run_gen_chr_stages( cfg, prepared_run, nlfff_box, resume_mode=resume_mode, entry_stage=entry_stage, target_stage=target_stage, goto_chromo=goto_chromo, entry_lines=entry_lines, save_stage=save_stage, finalize=finalize, stage_times=stage_times, stage_progress_cls=_StageProgress, ) if gen_chr_result.finalized: return finalize()
if __name__ == "__main__": app()