"""Inverts soft X-ray data to estimate the emissivity of the plasma."""
from typing import cast
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import warnings
import numpy as np
from scipy.integrate import romb
from scipy.interpolate import CubicSpline
from scipy.optimize import least_squares
from xarray import concat
from xarray import DataArray
from xarray import Dataset
from xarray import where
from .abstractoperator import EllipsisType
from .abstractoperator import Operator
from .. import session
from ..converters import bin_to_time_labels
from ..converters import CoordinateTransform
from ..converters import FluxMajorRadCoordinates
from ..converters import FluxSurfaceCoordinates
from ..converters import ImpactParameterCoordinates
from ..datatypes import DataType
from ..datatypes import SpecificDataType
from ..utilities import broadcast_spline
DataArrayCoords = Tuple[DataArray, DataArray]
class EmissivityProfile:
"""Callable class calculating an estimated emissivity profile for the
plasma.
Parameters
----------
symmetric_emissivity : DataArray
Estimate of the profile of the symmetric emissivity at each knot in rho.
asymmetry_parameter : DataArray
Parameter describing asymmetry in the emissivity between high and low
flux surfaces at each knot in rho.
coord_transform : FluxSurfaceCoordinates
The flux coordinate system which should be used.
"""
def __init__(
self,
symmetric_emissivity: DataArray,
asymmetry_parameter: DataArray,
coord_transform: FluxSurfaceCoordinates,
):
self._dim = coord_transform.x1_name
if self._dim not in symmetric_emissivity.dims:
raise ValueError(
"`symmetric_emissivity` must have the same x1 coordinate as "
"`coord_transform`."
)
if self._dim not in asymmetry_parameter.dims:
raise ValueError(
"`symmetric_emissivity` must have the same x1 coordinate as "
"`coord_transform`."
)
self.sym_dims = tuple(d for d in symmetric_emissivity.dims if d != self._dim)
self.sym_coords = {
k: np.asarray(v, dtype=float)
for k, v in symmetric_emissivity.coords.items()
if k != self._dim
}
transpose_order = (self._dim,) + self.sym_dims
self.symmetric_emissivity = CubicSpline(
symmetric_emissivity.coords[self._dim],
symmetric_emissivity.transpose(*transpose_order),
0,
(
(1, np.zeros_like(symmetric_emissivity.isel({self._dim: 0}))),
(2, np.zeros_like(symmetric_emissivity.isel({self._dim: -1}))),
),
False,
)
self.asym_dims = tuple(d for d in asymmetry_parameter.dims if d != self._dim)
self.asym_coords = {
k: np.asarray(v, dtype=float)
for k, v in asymmetry_parameter.coords.items()
if k != self._dim
}
transpose_order = (self._dim,) + self.asym_dims
self.asymmetry_parameter = CubicSpline(
asymmetry_parameter.coords[self._dim],
asymmetry_parameter.transpose(*transpose_order),
0,
(
(2, np.zeros_like(asymmetry_parameter.isel({self._dim: 0}))),
(2, np.zeros_like(asymmetry_parameter.isel({self._dim: -1}))),
),
)
self.transform = FluxMajorRadCoordinates(coord_transform)
time_sym = symmetric_emissivity.coords.get("t", None)
time_asym = asymmetry_parameter.coords.get("t", None)
if (
time_sym is not None
and time_asym is not None
and not time_sym.equals(time_asym)
):
raise ValueError(
"Dimension 't' must match for `symmetric_emissivity` and "
"`asymmetry_parameter`"
)
self.time = time_sym if time_sym is not None else time_asym
def __call__(
self,
coord_system: CoordinateTransform,
x1: DataArray,
x2: DataArray,
t: DataArray,
) -> DataArray:
"""Get the emissivity values at the locations given by the coordinates.
Parameters
----------
coord_system
The transform describing the system used by the provided coordinates
x1
The first spatial coordinate
x2
The second spatial coordinate
t
The time coordinate
"""
rho, R = cast(
DataArrayCoords, coord_system.convert_to(self.transform, x1, x2, t)
)
return self.evaluate(rho, R, t).assign_attrs(transform=coord_system)
def evaluate(
self,
rho: DataArray,
R: DataArray,
t: Optional[DataArray] = None,
R_0: Optional[DataArray] = None,
) -> DataArray:
"""Evaluate the function at a location defined using (R, z) coordinates"""
if t is None:
if "t" in rho.coords:
t = rho.coords["t"]
elif self.time is not None:
t = self.time
elif "t" not in rho.coords and (
"t" in self.sym_coords or "t" in self.asym_coords
):
rho = rho.expand_dims(t=t)
symmetric = broadcast_spline(
self.symmetric_emissivity, self.sym_dims, self.sym_coords, rho
)
asymmetric = broadcast_spline(
self.asymmetry_parameter, self.asym_dims, self.asym_coords, rho
)
if R_0 is None:
R_0 = cast(DataArray, self.transform.equilibrium.R_hfs(rho, t)[0])
result = symmetric * np.exp(asymmetric * (R**2 - R_0**2))
# Ensure round-off error doesn't result in any values below 0
return where(result < 0.0, 0.0, result).fillna(0.0)
[docs]class InvertRadiation(Operator):
"""Estimates the emissivity distribution of the plasma using radiation
data.
Parameters
----------
flux_coordinates : FluxSurfaceCoordinates
The flux surface coordinate system on which to calculate the fit.
datatype : SpecificDataType
The type of radiation data to be inverted.
num_cameras : int
The number of cameras to which the data will be fit.
n_knots : int
The number of spline knots to use when fitting the emissivity data.
n_intervals : int
The number of intervals over which to integrate th eemissivity. Should
be :math:`2^m + 1`, where m is an integer.
sess : session.Session
An object representing the session being run. Contains information
such as provenance data.
"""
def __init__(
self,
num_cameras: int = 1,
datatype: SpecificDataType = "sxr",
n_knots: int = 6,
n_intervals: int = 65,
sess: session.Session = session.global_session,
):
self.n_knots = n_knots
self.n_intervals = n_intervals
self.datatype = datatype
self.num_cameras = num_cameras
self.integral: List[DataArray]
self.last_knot_zero = datatype == "sxr"
# TODO: Update RETURN_TYPES
# TODO: Revise to include R, z, t
self.ARGUMENT_TYPES: List[Union[DataType, EllipsisType]] = [
("time", None),
("luminous_flux", datatype),
...,
]
super().__init__(
sess,
num_cameras=num_cameras,
datatype=datatype,
n_knots=n_knots,
n_intervals=n_intervals,
)
[docs] def return_types(self, *args: DataType) -> Tuple[DataType, ...]:
"""Indicates the datatypes of the results when calling the operator
with arguments of the given types. It is assumed that the
argument types are valid.
Parameters
----------
args
The datatypes of the parameters which the operator is to be called with.
Returns
-------
:
The datatype of each result that will be returned if the operator is
called with these arguments.
"""
radiation = cast(DataType, self.ARGUMENT_TYPES[-1])[1]
result = cast(
Tuple[DataType, ...],
(
("emissivity", radiation),
(
radiation,
{
"symmetric_emissivity": "emissivity",
"asymmetry_parameter": "asymmetry",
},
),
),
) + cast(
Tuple[DataType, ...],
(
(
radiation,
{
"camera": "luminous_flux",
"weights": "weighting",
"back_integral": "luminous_flux",
},
),
),
) * (
len(args) - 3
)
return result
[docs] @staticmethod
def knot_positions(n: int, rho_max: float):
"""Calculates location of knots in magnetic flux coordinates.
Parameters
----------
n
The number of knots needed.
rho_max
The normalised magnetic flux of the final knot location.
"""
knots = np.empty(n)
if float(rho_max) > 1.0:
knots[:] = np.linspace(0, 1.0, n) ** 1.2 * float(rho_max)
else:
knots[0 : n - 1] = np.linspace(0, 1.0, n - 1) ** 1.2 * float(rho_max)
knots[-1] = 1.0
return knots
[docs] def __call__( # type: ignore[override]
self,
times: DataArray,
*cameras: DataArray,
) -> Tuple[EmissivityProfile, Dataset, List[Dataset]]:
"""Calculate the emissivity profile for the plasma.
Parameters
----------
times
Time coordinates on which to calculate emissivity result.
cameras
The luminosity data being fit to, with each camera passed
as a separate argument.
Returns
-------
: DataArray
The fit emissivity, on the R-z grid. Will also contain an
attribute "emissivity_model", which is an
:py:class:`indica.operators.invert_radiation.EmissivityProfile`
object that can interpolate the fit emissivity onto
arbitrary coordinates.
: Dataset
A dataset containing
- **symmetric_emissivity**: The symmetric emissivity
values which were found during the fit, given along
:math:`\\rho`.
- **asymmetry_parameter**: The asymmetry of the emissivity
which was found during the fit, given along :math:`\\rho`.
: Dataset
For each camera passed as an argument, a dataset containing
- **camera**: The radiation data for that camera, binned in time.
- **back_integral**: The integral of the fit emissivity along
the lines of sight of the camera.
- **weights**: The weights assigned to each line of sight of the
camera when fitting emissivity.
"""
self.validate_arguments(times, *cameras)
flux_coords = FluxSurfaceCoordinates("poloidal")
flux_coords.set_equilibrium(cameras[0].attrs["transform"].equilibrium)
n = self.n_knots
binned_cameras = [bin_to_time_labels(times.data, c) for c in cameras]
def knotvals_to_xarray(knotvals):
symmetric_emissivity = DataArray(np.empty(n), coords=[(dim_name, knots)])
symmetric_emissivity[0:m] = knotvals[0:m]
if self.last_knot_zero:
symmetric_emissivity[-1] = 0.0
asymmetry_parameter = DataArray(np.empty(n), coords=[(dim_name, knots)])
asymmetry_parameter[0] = 0.5 * knotvals[m]
asymmetry_parameter[1:-1] = knotvals[m:]
asymmetry_parameter[-1] = 0.5 * knotvals[-1]
return symmetric_emissivity, asymmetry_parameter
def residuals(
knotvals: np.ndarray,
unfolded_cameras: List[Dataset],
) -> np.ndarray:
symmetric_emissivity, asymmetry_parameter = knotvals_to_xarray(knotvals)
estimate = EmissivityProfile(
symmetric_emissivity, asymmetry_parameter, flux_coords
)
start = 0
resid = np.empty(sum(c.attrs["nlos"] for c in unfolded_cameras))
self.integral = []
for c in unfolded_cameras:
end = start + c.attrs["nlos"]
rho, R = c.indica.convert_coords(rho_maj_rad)
rho_min, x2 = c.indica.convert_coords(c.attrs["impact_parameters"])
emissivity_vals = estimate.evaluate(rho, R, R_0=c.coords["R_0"])
axis = emissivity_vals.dims.index(c.attrs["transform"].x2_name)
integral = romb(emissivity_vals, c.attrs["dl"], axis)
resid[start:end] = ((c.camera - integral) / c.weights)[
c["has_data"]
].data
start = end
x1_name = c.attrs["transform"].x1_name
self.integral.append(
DataArray(integral, coords=[(x1_name, c.coords[x1_name].data)])
)
assert np.all(np.isfinite(resid))
return resid
x2 = np.linspace(0.0, 1.0, self.n_intervals)
# TODO: Use aggregate
unfolded_cameras = [
Dataset(
{"camera": bin_to_time_labels(times.data, c)},
{c.attrs["transform"].x2_name: x2},
{"transform": c.attrs["transform"]},
)
for c in binned_cameras
]
rho_maj_rad = FluxMajorRadCoordinates(flux_coords)
rho_max = 0.0
print("Calculating coordinate conversions")
for c in unfolded_cameras:
c["has_data"] = np.logical_not(np.isnan(c.camera.isel(t=0)))
trans = c.attrs["transform"]
dl = trans.distance(
trans.x2_name, DataArray(0), c.coords[trans.x2_name][0:2], 0.0
)[1]
c.attrs["dl"] = dl
c.attrs["nlos"] = int(np.sum(c["has_data"]))
rho, _ = c.indica.convert_coords(rho_maj_rad)
ip_coords = ImpactParameterCoordinates(
c.attrs["transform"], flux_coords, times=times
)
c.attrs["impact_parameters"] = ip_coords
rho_max = max(rho_max, ip_coords.rhomax())
impact_param, _ = c.indica.convert_coords(ip_coords)
c["weights"] = c.camera * (0.02 + 0.18 * np.abs(impact_param))
c["weights"].attrs["transform"] = c.camera.attrs["transform"]
c["weights"].attrs["datatype"] = ("weighting", self.datatype)
c.coords["R_0"] = c.attrs["transform"].equilibrium.R_hfs(
rho, c.coords["t"]
)[0]
knots = self.knot_positions(n, rho_max)
dim_name = "rho_" + flux_coords.flux_kind
symmetric_emissivities: List[DataArray] = []
asymmetry_parameters: List[DataArray] = []
integrals: List[List[DataArray]] = []
m = n - 1 if self.last_knot_zero else n
abel_inversion = np.linspace(3e3, 0.0, m)
guess = np.concatenate((abel_inversion, np.zeros(n - 2)))
bounds = (
np.concatenate((np.zeros(m), np.where(knots[1:-1] > 0.5, 0.0, -0.5))),
np.concatenate((1e12 * np.ones(m), np.ones(n - 2))),
)
for t in np.asarray(times):
print(f"\nSolving for t={t}")
print("------------------\n")
fit = least_squares(
residuals,
guess,
bounds=bounds,
args=([c.sel(t=t) for c in unfolded_cameras],),
verbose=2,
)
if fit.status == -1:
raise RuntimeError(
"Improper input to `least_squares` function when trying to "
"fit emissivity to radiation data."
)
elif fit.status == 0:
warnings.warn(
f"Attempt to fit emissivity to radiation data at time t={t} "
"reached maximum number of function evaluations.",
RuntimeWarning,
)
sym, asym = knotvals_to_xarray(fit.x)
symmetric_emissivities.append(sym)
asymmetry_parameters.append(asym)
integrals.append(self.integral)
guess = fit.x
symmetric_emissivity = concat(symmetric_emissivities, dim=times)
symmetric_emissivity.attrs["transform"] = flux_coords
symmetric_emissivity.attrs["datatype"] = ("emissivity", self.datatype)
asymmetry_parameter = concat(asymmetry_parameters, dim=times)
asymmetry_parameter.attrs["transform"] = flux_coords
asymmetry_parameter.attrs["datatype"] = ("asymmetry", self.datatype)
integral: List[DataArray] = []
for data in zip(*integrals):
integral.append(concat(data, dim=times))
emissivity = EmissivityProfile(
symmetric_emissivity, asymmetry_parameter, flux_coords
)
results: Dataset = Dataset(
{
"symmetric_emissivity": symmetric_emissivity,
"asymmetry_parameter": asymmetry_parameter,
}
)
for c, i in zip(unfolded_cameras, integral):
del c["has_data"]
i.attrs["datatype"] = ("luminous_flux", self.datatype)
i.attrs["transform"] = c.camera.attrs["transform"]
c["back_integral"] = i
self.assign_provenance(c)
self.assign_provenance(results)
return emissivity, results, unfolded_cameras