from __future__ import annotations
from astropy.constants import R_sun
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.time import Time
import astropy.units as u
import numpy as np
from sunpy.coordinates import (
HeliographicStonyhurst,
get_body_heliographic_stonyhurst,
get_earth,
get_horizons_coord,
)
from sunpy.coordinates import sun
_HORIZONS_TARGETS = {
"solar orbiter": "Solar Orbiter",
"stereo-a": "STEREO-A",
"stereo-b": "STEREO-B",
}
_OBSERVER_CACHE: dict[tuple[str, str], SkyCoord | None] = {}
_OBSERVER_STATUS_CACHE: dict[tuple[str, str], tuple[SkyCoord | None, str, str | None]] = {}
_NETWORK_ERROR_MARKERS = (
"temporary failure",
"name or service not known",
"name resolution",
"nodename nor servname",
"network is unreachable",
"failed to establish a new connection",
"max retries exceeded",
"connection refused",
"connection reset",
"connection aborted",
"remote end closed connection",
"timed out",
"timeout",
"ssl",
"proxyerror",
"urlopen error",
"service unavailable",
)
[docs]
def decode_meta_text(value) -> str:
if isinstance(value, (bytes, bytearray)):
return value.decode("utf-8", "ignore")
if isinstance(value, np.ndarray) and value.shape == ():
item = value.item()
if isinstance(item, (bytes, bytearray)):
return item.decode("utf-8", "ignore")
return str(item)
return str(value)
[docs]
def normalize_observer_key(observer_key: str | None) -> str:
raw = observer_key
if isinstance(raw, (bytes, bytearray)):
raw = raw.decode("utf-8", "ignore")
if isinstance(raw, np.ndarray) and raw.shape == ():
raw = raw.item()
if isinstance(raw, (bytes, bytearray)):
raw = raw.decode("utf-8", "ignore")
key = str(raw or "earth").strip().lower()
aliases = {
"custom": "custom",
"sdo": "sdo",
"sdo/aia": "sdo",
"sdo/hmi": "sdo",
"earth": "earth",
"terra": "earth",
"solo": "solar orbiter",
"solar-orbiter": "solar orbiter",
"solarorbiter": "solar orbiter",
"solar orbiter": "solar orbiter",
"stereo a": "stereo-a",
"stereo-a": "stereo-a",
"stereoa": "stereo-a",
"stereo ahead": "stereo-a",
"stereo b": "stereo-b",
"stereo-b": "stereo-b",
"stereob": "stereo-b",
"stereo behind": "stereo-b",
}
return aliases.get(key, key)
def _observer_coord_from_ephemeris(ephemeris: dict, obs_time=None) -> SkyCoord | None:
if not isinstance(ephemeris, dict):
return None
try:
hgln = ephemeris.get("hgln_obs_deg")
hglt = ephemeris.get("hglt_obs_deg")
dsun_cm = ephemeris.get("dsun_cm")
if hgln is None or hglt is None or dsun_cm is None:
return None
when = obs_time if obs_time is not None else ephemeris.get("obs_date", ephemeris.get("obs_time"))
if when is not None and not isinstance(when, Time):
when = Time(when)
return SkyCoord(
lon=float(hgln) * u.deg,
lat=float(hglt) * u.deg,
radius=u.Quantity(float(dsun_cm), u.cm),
frame=HeliographicStonyhurst(obstime=when),
)
except Exception:
return None
def _header_from_text(header_text) -> fits.Header | None:
text = decode_meta_text(header_text)
if not text:
return None
if "\\n" in text and "\n" not in text:
text = text.replace("\\n", "\n")
try:
return fits.Header.fromstring(text, sep="\n")
except Exception:
return None
def _observer_coord_from_header(header: fits.Header | None, obs_time=None) -> SkyCoord | None:
if header is None:
return None
try:
hgln = header.get("HGLN_OBS")
hglt = header.get("HGLT_OBS")
dsun = header.get("DSUN_OBS")
if hgln is None or hglt is None or dsun is None:
return None
when = obs_time or header.get("DATE-OBS") or header.get("DATE_OBS")
if when is not None and not isinstance(when, Time):
when = Time(when)
return SkyCoord(
lon=float(hgln) * u.deg,
lat=float(hglt) * u.deg,
radius=u.Quantity(float(dsun), u.m),
frame=HeliographicStonyhurst(obstime=when),
)
except Exception:
return None
def _cache_key(observer_key: str | None, obs_time: Time | None) -> tuple[str, str]:
key = normalize_observer_key(observer_key)
if obs_time is None:
return key, ""
when = obs_time if isinstance(obs_time, Time) else Time(obs_time)
return key, when.isot
def _classify_observer_exception(exc: Exception) -> str:
text = f"{type(exc).__name__}: {exc}".lower()
if any(marker in text for marker in _NETWORK_ERROR_MARKERS):
return "offline"
return "unavailable"
def _resolve_named_observer_with_status(observer_key: str | None, obs_time: Time):
key = normalize_observer_key(observer_key)
cache_key = _cache_key(key, obs_time)
if cache_key in _OBSERVER_STATUS_CACHE:
return _OBSERVER_STATUS_CACHE[cache_key]
coord = None
status = "unavailable"
detail = None
if key in _HORIZONS_TARGETS:
target = _HORIZONS_TARGETS[key]
try:
coord = get_horizons_coord(target, obs_time)
coord = coord.transform_to(HeliographicStonyhurst(obstime=obs_time))
status = "available"
except Exception as exc:
status = _classify_observer_exception(exc)
detail = str(exc)
else:
try:
coord = get_body_heliographic_stonyhurst(key, obs_time)
status = "available"
except Exception as exc:
status = "unavailable"
detail = str(exc)
_OBSERVER_STATUS_CACHE[cache_key] = (coord, status, detail)
_OBSERVER_CACHE[cache_key] = coord
return coord, status, detail
def _stored_observer_key(b3d: dict) -> str | None:
observer_meta = b3d.get("observer", {}) if isinstance(b3d, dict) else {}
if not isinstance(observer_meta, dict):
return None
fov_box = observer_meta.get("fov_box", {})
if isinstance(fov_box, dict) and fov_box.get("observer_key") is not None:
return normalize_observer_key(fov_box.get("observer_key"))
if observer_meta.get("name") is not None:
return normalize_observer_key(observer_meta.get("name"))
return None
[docs]
def resolve_sdo_observer_from_b3d(b3d: dict, obs_time=None) -> SkyCoord | None:
if not isinstance(b3d, dict):
return None
refmaps = b3d.get("refmaps", {})
if not isinstance(refmaps, dict):
refmaps = {}
for key in ("Bz_reference", "Ic_reference"):
payload = refmaps.get(key)
if not isinstance(payload, dict):
continue
coord = _observer_coord_from_header(_header_from_text(payload.get("wcs_header")), obs_time=obs_time)
if coord is not None:
return coord
return None
[docs]
def resolve_named_observer(observer_key: str | None, obs_time: Time):
coord, status, _detail = _resolve_named_observer_with_status(observer_key, obs_time)
if status == "available":
return coord
return None
[docs]
def probe_observer_availability(b3d: dict, observer_key: str | None, obs_time: Time):
key = normalize_observer_key(observer_key)
if key in {"earth", "custom"}:
return "available", None
if key == "sdo":
if resolve_sdo_observer_from_b3d(b3d, obs_time=obs_time) is not None:
return "available", None
return "unavailable", "Saved/source map metadata does not contain an SDO observer."
_coord, status, detail = _resolve_named_observer_with_status(key, obs_time)
return status, detail
[docs]
def resolve_observer(b3d: dict, observer_key: str | None, obs_time: Time):
coord, _warning, _used_key = resolve_observer_with_info(b3d, observer_key, obs_time)
return coord
[docs]
def resolve_observer_parameters_from_ephemeris(
ephemeris: dict | None,
*,
observer_key: str | None = None,
obs_time=None,
):
"""
Resolve observer geometry and derived solar angles from stored ephemeris cards.
Parameters
----------
ephemeris : dict | None
Expected keys are ``hgln_obs_deg``, ``hglt_obs_deg``, ``dsun_cm`` and optionally
``rsun_cm`` and ``obs_date``.
observer_key : str | None
Fallback observer name used only when the ephemeris cards are incomplete.
obs_time : parseable time or `astropy.time.Time`, optional
Observation time. If omitted, ``ephemeris['obs_date']`` is used.
Returns
-------
dict
Keys:
``observer_coordinate``, ``observer_key``, ``obs_time``, ``b0_deg``, ``l0_deg``,
``p_deg``, ``dsun_cm``, ``rsun_cm``, ``rsun_arcsec``, ``source``.
Notes
-----
``b0_deg`` and ``l0_deg`` follow the legacy GX/IDL ``pb0r`` convention:
they are the observer Stonyhurst latitude and longitude, respectively.
``p_deg`` is currently returned as ``None``. SunPy's public ``sun.P()`` helper is Earth-only,
and a general spacecraft-safe image-north position-angle implementation should be added
separately rather than guessed here.
"""
when = obs_time
if when is None and isinstance(ephemeris, dict):
when = ephemeris.get("obs_date", ephemeris.get("obs_time"))
if when is not None and not isinstance(when, Time):
when = Time(when)
coord = _observer_coord_from_ephemeris(ephemeris or {}, obs_time=when)
source = "ephemeris"
used_key = normalize_observer_key(observer_key)
if coord is None:
if when is None:
return None
coord = resolve_named_observer(observer_key, when)
source = "observer_name"
if coord is None:
return None
try:
hgs = coord.transform_to(HeliographicStonyhurst(obstime=when))
except Exception:
hgs = coord
rsun_cm = None
if isinstance(ephemeris, dict) and ephemeris.get("rsun_cm") is not None:
try:
rsun_cm = float(ephemeris.get("rsun_cm"))
except Exception:
rsun_cm = None
rsun_arcsec = None
if rsun_cm is not None:
try:
ratio = float(rsun_cm) / float(coord.radius.to_value(u.cm))
ratio = float(np.clip(ratio, -1.0, 1.0))
rsun_arcsec = float(np.arcsin(ratio) * u.rad.to(u.arcsec))
except Exception:
rsun_arcsec = None
return {
"observer_coordinate": coord,
"observer_key": used_key,
"obs_time": when,
"b0_deg": float(hgs.lat.to_value(u.deg)),
"l0_deg": float(hgs.lon.to_value(u.deg)),
"p_deg": float(sun.P(when).to_value(u.deg)) if used_key == "earth" and when is not None else None,
"dsun_cm": float(coord.radius.to_value(u.cm)),
"rsun_cm": rsun_cm,
"rsun_arcsec": rsun_arcsec,
"source": source,
}
[docs]
def build_ephemeris_from_pb0r(
*,
b0_deg,
l0_deg,
rsun_arcsec,
obs_date=None,
rsun_cm=None,
):
try:
b0_value = float(b0_deg)
l0_value = float(l0_deg)
rsun_arcsec_value = float(rsun_arcsec)
except Exception:
return None
if not np.isfinite(b0_value) or not np.isfinite(l0_value) or not np.isfinite(rsun_arcsec_value):
return None
if rsun_arcsec_value <= 0:
return None
if rsun_cm is None:
rsun_cm_value = float(R_sun.cgs.value)
else:
try:
rsun_cm_value = float(rsun_cm)
except Exception:
return None
try:
rsun_rad = float((rsun_arcsec_value * u.arcsec).to_value(u.rad))
if not np.isfinite(rsun_rad) or rsun_rad <= 0:
return None
dsun_cm_value = float(rsun_cm_value / np.sin(rsun_rad))
except Exception:
return None
ephemeris = {
"hgln_obs_deg": l0_value,
"hglt_obs_deg": b0_value,
"dsun_cm": dsun_cm_value,
"rsun_cm": rsun_cm_value,
}
if obs_date:
ephemeris["obs_date"] = str(obs_date)
return ephemeris
[docs]
def resolve_observer_with_info(b3d: dict, observer_key: str | None, obs_time: Time):
requested_key = normalize_observer_key(observer_key)
if requested_key == "earth":
return get_earth(obs_time), None, "earth"
if requested_key == "sdo":
sdo_coord = resolve_sdo_observer_from_b3d(b3d, obs_time=obs_time)
if sdo_coord is not None:
return sdo_coord, None, "sdo"
warning = (
"Observer 'sdo' could not be restored from saved/source map metadata; "
"falling back to Earth observer for this session."
)
return get_earth(obs_time), warning, "earth"
coord = resolve_observer_from_metadata(b3d, requested_key, obs_time=obs_time)
if coord is not None:
return coord, None, requested_key
coord = resolve_named_observer(requested_key, obs_time)
if coord is not None:
return coord, None, requested_key
earth_coord = get_earth(obs_time)
warning = (
f"Observer '{requested_key}' could not be restored from saved metadata "
f"and remote ephemeris was unavailable; falling back to Earth observer for this session."
)
return earth_coord, warning, "earth"