#!/usr/bin/env python
# coding: utf-8
import numpy as np
import astropy.io.fits as fits
import astropy.units as u
from astropy.coordinates import SkyCoord
from sunpy.coordinates import frames
from astropy.time import Time
import sunpy.map
from pathlib import Path
import os, sys
import numpy as np
import datetime
import locale
from sunpy.map import all_coordinates_from_map
import h5py
from pyampp.gxbox.boxutils import load_sunpy_map_compat, map_from_data_header_compat
# from .gx_chromo.combo_model import combo_model
os.environ['OMP_NUM_THREADS'] = '16' # number of parallel threads
locale.setlocale(locale.LC_ALL, "C");
[docs]
def cutout2box(_map, center_x, center_y, dx_km, shape):
hmi_wcs = _map.wcs
center_crd = crd = SkyCoord(center_x, center_y, unit=u.arcsec, frame=_map.coordinate_frame) \
.transform_to("heliographic_carrington")
lon = center_crd.lon
lat = center_crd.lat
rad = center_crd.radius
origin = SkyCoord(lon, lat, rad,
frame="heliographic_carrington",
observer="self",
obstime=_map.date)
scale = np.arcsin(dx_km / origin.radius).to(u.deg) / u.pix
scale = u.Quantity((scale, scale))
box_header = sunpy.map.make_fitswcs_header(shape, origin,
projection_code='CEA', scale=scale)
outmap = _map.reproject_to(box_header, algorithm="exact")
return outmap
[docs]
def hmi_b2ptr(map_field, map_inclination, map_azimuth):
sz = map_field.data.shape
ny, nx = sz
field = map_field.data
gamma = np.deg2rad(map_inclination.data)
psi = np.deg2rad(map_azimuth.data)
b_xi = -field * np.sin(gamma) * np.sin(psi)
b_eta = field * np.sin(gamma) * np.cos(psi)
b_zeta = field * np.cos(gamma)
foo = all_coordinates_from_map(map_field).transform_to("heliographic_stonyhurst")
phi = foo.lon
lambda_ = foo.lat
b = np.deg2rad(map_field.fits_header["crlt_obs"])
p = np.deg2rad(-map_field.fits_header["crota2"])
phi, lambda_ = np.deg2rad(phi), np.deg2rad(lambda_)
sinb, cosb = np.sin(b), np.cos(b)
sinp, cosp = np.sin(p), np.cos(p)
sinphi, cosphi = np.sin(phi), np.cos(phi) # nx*ny
sinlam, coslam = np.sin(lambda_), np.cos(lambda_) # nx*ny
k11 = coslam * (sinb * sinp * cosphi + cosp * sinphi) - sinlam * cosb * sinp
k12 = - coslam * (sinb * cosp * cosphi - sinp * sinphi) + sinlam * cosb * cosp
k13 = coslam * cosb * cosphi + sinlam * sinb
k21 = sinlam * (sinb * sinp * cosphi + cosp * sinphi) + coslam * cosb * sinp
k22 = - sinlam * (sinb * cosp * cosphi - sinp * sinphi) - coslam * cosb * cosp
k23 = sinlam * cosb * cosphi - coslam * sinb
k31 = - sinb * sinp * sinphi + cosp * cosphi
k32 = sinb * cosp * sinphi + sinp * cosphi
k33 = - cosb * sinphi
bptr = np.zeros((3, nx, ny))
bptr[0, :, :] = k31 * b_xi + k32 * b_eta + k33 * b_zeta
bptr[1, :, :] = k21 * b_xi + k22 * b_eta + k23 * b_zeta
bptr[2, :, :] = k11 * b_xi + k12 * b_eta + k13 * b_zeta
# Preserve HMI RSUN in WCS metadata for downstream reprojection parity.
header = map_field.meta
map_bp = map_from_data_header_compat(bptr[0, :, :], header)
map_bt = map_from_data_header_compat(bptr[1, :, :], header)
map_br = map_from_data_header_compat(bptr[2, :, :], header)
return map_bp, map_bt, map_br
[docs]
def ampp_field(dl_path, out_model, x, y, dx, dy, dz, res):
"""Creates a model of coronal magnetic fields, including potential, nlfff and thermal corona model
Args:
dl_path (str): Path to the folder where downloaded files (HMI magnetograms) are stored
out_model (str): Path to output HDF5 model file (.h5)
x (float): X coordinate of active region center (arcsec)
y (float): Y coordinate of active region center (arcsec)
dx (int): number of voxels in X direction
dy (int): number of voxels in Y direction
dz (int): number of voxels in Z (vertical, to corona) direction
res (dimensional astropy Quantity)
Returns:
None
"""
input_path = Path(os.path.expanduser(dl_path)).resolve()
if not input_path.exists():
print("no input data")
field_path = list(input_path.glob("*.field.fits"))[0]
incli_path = list(input_path.glob("*.inclination.fits"))[0]
azimu_path = list(input_path.glob("*.azimuth.fits"))[0]
disam_path = list(input_path.glob("*.disambig.fits"))[0]
conti_path = list(input_path.glob("*.continuum.fits"))[0]
losma_path = list(input_path.glob("*.magnetogram.fits"))[0]
# size_pix = f"[{dx}, {dy}, {dz}]"
# centre = f"[{x}, {y}]"
# wcs_rsun = 6.96e8
res_km = res.to(u.km).value
map_field = load_sunpy_map_compat(field_path)
map_inclination = load_sunpy_map_compat(incli_path)
map_azimuth = load_sunpy_map_compat(azimu_path)
map_disambig = load_sunpy_map_compat(disam_path)
map_conti = load_sunpy_map_compat(conti_path)
map_losma = load_sunpy_map_compat(losma_path)
dis = map_disambig.data
map_azimuth.data[:, :] = map_azimuth.data + dis * 180.
map_bp, map_bt, map_br = hmi_b2ptr(map_field, map_inclination, map_azimuth)
box_bx = cutout2box(map_bp, x, y, res_km * u.km, [dy, dx])
box_by = cutout2box(map_bt, x, y, res_km * u.km, [dy, dx])
box_bz = cutout2box(map_br, x, y, res_km * u.km, [dy, dx])
box_by.data[:, :] *= -1
# earth_observer = SkyCoord(0 * u.deg, 0 * u.deg, 0 * u.km, frame=frames.GeocentricEarthEquatorial, observer="earth",
# obstime=box_bx.date)
print("opening file")
out_file = h5py.File(out_model, "w")
bottom = out_file.create_group("bottom_bounds")
for name, array in zip(("bx", "by", "bz"), (box_bx.data, box_by.data, box_bz.data)):
bottom.create_dataset(name, data=array)
out_file.flush()
maglib_lff = mf_lfff()
maglib_lff.set_field(box_bz.data.T)
res1 = maglib_lff.LFFF_cube(dz)
bx_lff, by_lff, bz_lff = [res1[k].transpose((2, 1, 0)) for k in ("bx", "by", "bz")]
bx_lff[0, :, :] = box_bx.data # replace bottom boundary of lff solution with initial boundary conditions
by_lff[0, :, :] = box_by.data
bz_lff[0, :, :] = box_bz.data
potential = out_file.create_group("potential")
for name, array in zip(("bx", "by", "bz"), (bx_lff, by_lff, bz_lff)):
potential.create_dataset(name, data=array)
out_file.flush()
obs_dr = res.to(u.km) / (696000 * u.km) # dimensionless
potential.attrs["obs_dr"] = obs_dr.value
potential.attrs["res_km"] = res_km
maglib.load_cube_vars(bx_lff, by_lff, bz_lff, (obs_dr * sunpy.sun.constants.radius.to(u.cm)).value)
box = maglib.NLFFF()
energy_new = maglib.energy
print('NLFFF energy: ' + str(energy_new) + ' erg')
nlfff = out_file.create_group("nlfff")
for name, array in zip(("bx", "by", "bz"), (box["bx"], box["by"], box["bz"])):
nlfff.create_dataset(name, data=array)
nlfff.attrs["energy_erg"] = energy_new
out_file.flush()
print("Calculating field lines")
# lines = maglib.lines(seeds=None)
base_bz = cutout2box(map_losma, x, y, res_km * u.km, [dy, dx])
base_ic = cutout2box(map_conti, x, y, res_km * u.km, [dy, dx])
bottom.create_dataset("base_bz", data=base_bz.data)
bottom.create_dataset("base_ic", data=base_ic.data)
header_field = map_field.wcs.to_header()
field_frame = box_bx.center.heliographic_carrington.frame
lon, lat = field_frame.lon.value, field_frame.lat.value
obs_time = Time(map_field.date)
dsun_obs = header_field["DSUN_OBS"]
header = {"lon": lon, "lat": lat, "dsun_obs": dsun_obs, "obs_time": str(obs_time.iso)}
bottom.attrs.update(header)
out_file.flush()
# dr3 = [obs_dr.value, obs_dr.value, obs_dr.value]
# chromo_box = combo_model(box, dr3, base_bz.data.T, base_ic.data.T)
#
# chromo_box["avfield"] = lines["av_field"].transpose((1, 2, 0))
# chromo_box["physlength"] = lines["phys_length"].transpose((1, 2, 0)) * dr3[0]
# chromo_box["status"] = lines["voxel_status"].transpose((1, 2, 0))
# out_file.flush()
#
# chromo_ds = out_file.create_group("chromo")
# for k in chromo_box.keys():
# chromo_ds.create_dataset(k, data=chromo_box[k])
# out_file.flush()
out_file.close()