Source code for pyampp.sfq.potential

from __future__ import annotations

import time
import numpy as np

from .utils import idl_where, norm_vec, u_grid, u_grid_box, u_str_add


[docs] def get_str_mag(bx: np.ndarray, by: np.ndarray, bz: np.ndarray, apos: np.ndarray, rsun: float) -> dict: bx = np.asarray(bx, dtype=float) by = np.asarray(by, dtype=float) bz = np.asarray(bz, dtype=float) n = bx.shape gr = u_grid(pos=np.asarray(apos, dtype=float), n=n) alf = float(rsun) * np.pi / 180.0 / 3600.0 dsun = 1.0 / np.sin(alf) tx = np.tan(gr["x"] * np.pi / 180.0 / 3600.0) ty = np.tan(gr["y"] * np.pi / 180.0 / 3600.0) txy2 = tx * tx + ty * ty denom = txy2 + 1.0 a = dsun * txy2 / denom b = (dsun * dsun * txy2 - 1.0) / denom det = a * a - b indin = idl_where(det >= 0.0) indout = idl_where(det < 0.0) x = np.zeros_like(gr["x"], dtype=float) if indin[0] != -1: in_mask = det >= 0.0 x[in_mask] = a[in_mask] + np.sqrt(det[in_mask]) if indout[0] != -1: out_mask = det < 0.0 t = np.sqrt(txy2[out_mask]) tan_alf = np.tan(alf) x[out_mask] = tan_alf * dsun * t / (tan_alf * t + 1.0) qs = {"l": np.float32(0.0), "b": np.float32(0.0), "p": np.float32(0.0), "r": np.float32(rsun), "type": "arc"} return { "x0": x.astype(np.float32), "x1": gr["x"].astype(np.float32), "x2": gr["y"].astype(np.float32), "t0": bz.astype(np.float32), "t1": bx.astype(np.float32), "t2": by.astype(np.float32), "pos": np.asarray(apos, dtype=np.float32), "qs": qs, "ts": dict(qs), "indin": indin, "indout": indout, "visible": idl_where(np.ones_like(x, dtype=bool)), "unvisible": np.int64(-1), }
[docs] def pex_bl_(si: dict, sh: bool = False, silent: bool = False, bin_value: int | float | None = None, pot_vmag_func=None) -> dict: if pot_vmag_func is None: raise NotImplementedError("pex_bl_ requires pot_vmag_func.") s = dict(si) sh_i = 1 if sh else 0 bin_i = 0 if bin_value is None else float(bin_value) t0 = np.asarray(s["t0"], dtype=float) nx, ny = t0.shape sx = 256.0 if bin_i == 0 else 256.0 / bin_i sy = 256.0 if bin_i == 0 else 256.0 / bin_i npx = max(int(np.ceil(nx / sx)), 10) npy = max(int(np.ceil(ny / sy)), 10) sx = nx / npx sy = ny / npy indout_global = np.asarray(s.get("indout", np.array([-1], dtype=np.int64))) if indout_global.size > 0 and indout_global[0] != -1: t0f = t0.ravel() t0f[indout_global.astype(np.int64)] = 0.0 t0 = t0f.reshape(t0.shape) s["t0"] = t0 pot = dict(s) pot["t0"] = np.zeros_like(t0) pot = u_str_add(pot, "t1", np.zeros_like(t0)) pot = u_str_add(pot, "t2", np.zeros_like(t0)) ss_base = u_str_add(dict(s), "t1") ss_base = u_str_add(ss_base, "t2") wgt = np.zeros_like(t0) nparts = 2 * (npx + 2) * (npy + 2) oldcomplete = 0 for i in range(-1, npx + 1): for j in range(-1, npy + 1): if not silent: complete = int(np.round(100.0 * float(j + 1 + (i + 1) * (npx + 2) + sh_i * nparts / 2.0) / nparts)) if (complete > oldcomplete) and (complete % 10 == 0): print(f"potential field calculating: {complete} % complete") oldcomplete = complete xst = min(max(int(i * sx + sx / 2.0 * sh_i), 0), nx - 1) xen = min(max(int((i + 1) * sx - 1.0 + sx / 2.0 * sh_i), 0), nx - 1) yst = min(max(int(j * sy + sy / 2.0 * sh_i), 0), ny - 1) yen = min(max(int((j + 1) * sy - 1.0 + sy / 2.0 * sh_i), 0), ny - 1) if xen < xst or yen < yst: continue ss = dict(ss_base) ss = u_str_add(ss, ["t0", "x0", "x1", "x2"], s["t0"][xst:xen + 1, yst:yen + 1], s["x0"][xst:xen + 1, yst:yen + 1], s["x1"][xst:xen + 1, yst:yen + 1], s["x2"][xst:xen + 1, yst:yen + 1]) ss["pos"] = np.array([s["x1"][xst, yst], s["x2"][xst, yst], s["x1"][xen, yen], s["x2"][xen, yen]], dtype=float) dx = (ss["pos"][2] - ss["pos"][0]) / 2.0 or 1.0 dy = (ss["pos"][3] - ss["pos"][1]) / 2.0 or 1.0 xc = (ss["pos"][2] + ss["pos"][0]) / 2.0 yc = (ss["pos"][3] + ss["pos"][1]) / 2.0 wgts = np.exp(-6.0 * ((ss["x1"] - xc) / dx) ** 2 - 6.0 * ((ss["x2"] - yc) / dy) ** 2) radius = float(ss["qs"]["r"]) alf = radius * np.pi / 180.0 / 3600.0 dsun = 1.0 / np.sin(alf) tx = np.tan(ss["x1"] * np.pi / 180.0 / 3600.0) ty = np.tan(ss["x2"] * np.pi / 180.0 / 3600.0) tx2ty2 = tx * tx + ty * ty a = dsun * tx2ty2 / (tx2ty2 + 1.0) b = (dsun * dsun * tx2ty2 - 1.0) / (tx2ty2 + 1.0) det = a * a - b indin = idl_where(det >= 0.0) indout = idl_where(det < 0.0) ss = u_str_add(ss, ["indin", "indout"], indin, indout) if indout[0] != -1: t0_local = ss["t0"].ravel() t0_local[indout.astype(np.int64)] = 0.0 ss["t0"] = t0_local.reshape(ss["t0"].shape) if int(np.sum(np.abs(ss["t0"]) > 0)) != 0 and ss["indin"].size > 1: ss1 = pot_vmag_func(ss, simple=True) if indout[0] != -1 and isinstance(ss1, dict): for key in ("t0", "t1", "t2"): arr = np.asarray(ss1[key], dtype=float).ravel() arr[indout.astype(np.int64)] = 0.0 ss1[key] = arr.reshape(np.asarray(ss1[key]).shape) wgtsf = wgts.ravel() wgtsf[indout.astype(np.int64)] = 0.0 wgts = wgtsf.reshape(wgts.shape) if isinstance(ss1, dict): pot["t0"][xst:xen + 1, yst:yen + 1] = ss1["t0"] pot["t1"][xst:xen + 1, yst:yen + 1] = ss1["t1"] pot["t2"][xst:xen + 1, yst:yen + 1] = ss1["t2"] wgt[xst:xen + 1, yst:yen + 1] = wgts s["t0"] = pot["t0"] s["t1"] = pot["t1"] s["t2"] = pot["t2"] s = u_str_add(s, "wgt", wgt) return s
[docs] def pex_bl(s0: dict, silent: bool = False, pot_vmag_func=None) -> dict: if pot_vmag_func is None: raise NotImplementedError("pex_bl requires pot_vmag_func.") t0 = time.time() if not silent: print("starting precise potential field calculating in parts") s1 = pex_bl_(s0, silent=silent, pot_vmag_func=pot_vmag_func) s2 = pex_bl_(s0, sh=True, silent=silent, pot_vmag_func=pot_vmag_func) wgt = s1["wgt"] + s2["wgt"] tt0 = s1["t0"] * s1["wgt"] + s2["t0"] * s2["wgt"] tt1 = s1["t1"] * s1["wgt"] + s2["t1"] * s2["wgt"] tt2 = s1["t2"] * s1["wgt"] + s2["t2"] * s2["wgt"] ind = idl_where(wgt != 0) if ind[0] != -1: idx = ind.astype(np.int64) fw = wgt.ravel() f0, f1, f2 = tt0.ravel(), tt1.ravel(), tt2.ravel() f0[idx] = f0[idx] / fw[idx] f1[idx] = f1[idx] / fw[idx] f2[idx] = f2[idx] / fw[idx] tt0, tt1, tt2 = f0.reshape(tt0.shape), f1.reshape(tt1.shape), f2.reshape(tt2.shape) if not silent: print(f"Potential field calculation complete in {int(round(time.time() - t0))} seconds") return {"t1": tt1, "t2": tt2}
[docs] def pot_vmag( s: dict, outbpos: dict | None = None, fcenter: int = 1, simple: bool = False, alpha: float = 0.0, normal: bool = True, bpos: dict | None = None, sol_crd_func=None, qs_crd_func=None, a_field_func=None, get_fftplane_func=None, return_outbpos: bool = False, ): if sol_crd_func is None or qs_crd_func is None or a_field_func is None or get_fftplane_func is None: raise NotImplementedError("pot_vmag requires sol_crd_func, qs_crd_func, a_field_func, and get_fftplane_func.") l0, b0, p0 = (0.0, 0.0, 0.0) if normal else (float(s["qs"]["l"]), float(s["qs"]["b"]), float(s["qs"]["p"])) rad = float(s["qs"]["r"]) bpos_local = None if bpos is None else dict(bpos) if fcenter != 0: indin = np.asarray(s.get("indin", np.array([-1], dtype=np.int64))) if indin.size == 0 or indin[0] == -1: raise ValueError("pot_vmag requires non-empty s['indin'] for fcenter != 0.") idx = indin.astype(np.int64) ua = { "x0": np.asarray(s["x0"], dtype=float).ravel()[idx], "x1": np.asarray(s["x1"], dtype=float).ravel()[idx], "x2": np.asarray(s["x2"], dtype=float).ravel()[idx], "qs": {"l": l0, "b": b0, "p": p0, "r": rad, "type": "arc"}, } ub = {"qs": dict(ua["qs"])} ub["qs"]["type"] = "dec" u = sol_crd_func(ua, ub, crd=True) xc, yc, zc = float(np.mean(u["x0"])), float(np.mean(u["x1"])), float(np.mean(u["x2"])) if fcenter == 2: if "t2" in s and "t1" in s: t0m = np.asarray(s["t0"], dtype=float).ravel()[idx] t1m = np.asarray(s["t1"], dtype=float).ravel()[idx] t2m = np.asarray(s["t2"], dtype=float).ravel()[idx] b_mod = np.sqrt(t0m ** 2 + t1m ** 2 + t2m ** 2) else: b_mod = np.abs(np.asarray(s["t0"], dtype=float).ravel()[idx]) denom = float(np.sum(b_mod)) yc = float(np.sum(b_mod * np.asarray(u["x1"], dtype=float)) / denom) if denom else yc zc = float(np.sum(b_mod * np.asarray(u["x2"], dtype=float)) / denom) if denom else zc xc = float(np.sqrt(max(0.0, 1.0 - yc ** 2 - zc ** 2))) ad = qs_crd_func({"x0": xc, "x1": yc, "x2": zc}, l0, b0, p0, inv=True, tosph=True) lc = float(ad["x2"] * 180.0 / np.pi) bc = float(90.0 - ad["x1"] * 180.0 / np.pi) ad = qs_crd_func({"x0": u["x0"], "x1": u["x1"], "x2": u["x2"]}, l0, b0, p0, inv=True) sph = qs_crd_func(ad, lc, bc, 0.0, tosph=True) l = np.asarray(sph["x2"], dtype=float) * 180.0 / np.pi b = 90.0 - np.asarray(sph["x1"], dtype=float) * 180.0 / np.pi hs = np.array([np.max(np.abs(l)), np.max(np.abs(b))], dtype=float) rm = np.sqrt(np.asarray(u["x1"], dtype=float) ** 2 + np.asarray(u["x2"], dtype=float) ** 2) srt = np.argsort(rm)[:2] u0 = np.array([u["x0"][srt[0]], u["x1"][srt[0]], u["x2"][srt[0]]], dtype=float) u1 = np.array([u["x0"][srt[1]], u["x1"][srt[1]], u["x2"][srt[1]]], dtype=float) rob0, rob1 = norm_vec(u0), norm_vec(u1) pixs = float(np.arccos(np.clip(float(np.dot(u0, u1) / (rob0 * rob1)), -1.0, 1.0))) n_vec = np.maximum(np.fix(2.0 * hs * np.pi / 180.0 / pixs).astype(int), 5) hs = (n_vec - 1) * pixs * 180.0 / np.pi / 2.0 bpos_local = {"l": lc, "b": bc, "hs": hs, "n": np.array([n_vec[0], n_vec[1], float(np.min(n_vec)) / 2.0], dtype=float)} if bpos_local is None: raise ValueError("bpos must be provided when fcenter == 0.") if outbpos is not None: outbpos.clear() outbpos.update(bpos_local) n = np.asarray(bpos_local["n"], dtype=float)[:2].astype(int) hs = np.asarray(bpos_local["hs"], dtype=float) grb = u_grid_box(np.array([-hs[0], -hs[1]], dtype=float), np.array([2.0 * hs[0], 2.0 * hs[1]], dtype=float), n) ph = grb["x"] * np.pi / 180.0 th = grb["y"] * np.pi / 180.0 r = np.zeros_like(ph) sa = { "a": {"x0": s["x0"], "x1": s["x1"], "x2": s["x2"], "t0": s["t0"], "qs": {"l": l0, "b": b0, "p": p0, "r": rad, "type": "arc"}, "pos": s.get("pos")}, "b": {"qs": {"l": bpos_local["l"], "b": bpos_local["b"], "p": 0.0, "type": "sbox"}}, "proc": "b_spl", "vec": 0, } q = a_field_func(ph, th, r, set=sa) if isinstance(q, dict) and q.get("err"): return 0 set_obj = {"bl": q["t0"], "spos": {"L": l0, "b": b0, "p": p0}, "bpos": {"l": bpos_local["l"], "b": bpos_local["b"], "hs": hs}, "alfa": alpha} get_fftplane_func(set=set_obj, simple=simple) pos = np.array([-hs[0], -hs[1], hs[0], hs[1]], dtype=float) * np.pi / 180.0 uu = get_fftplane_func(pos, n, 0.0, simple=simple) sa2 = { "a": {"x0": uu["x0"], "x1": uu["x1"], "x2": np.asarray(uu["x0"], dtype=float) * 0.0, "t0": uu["t0"], "t1": uu["t1"], "t2": uu["t2"], "qs": sa["b"]["qs"], "ts": sa["b"]["qs"], "pos": pos}, "b": {"qs": sa["a"]["qs"], "ts": sa["a"]["qs"]}, "proc": "b_spl", "vec": 1, } q1 = a_field_func(s["x0"], s["x1"], s["x2"], set=sa2) s1 = dict(s) s1["t0"] = q1["t0"] s1["t1"] = q1["t1"] s1["t2"] = q1["t2"] s1["type"] = f"vecp {s1['type']}" if "type" in s1 else "vecp" return (s1, bpos_local) if return_outbpos else s1