Source code for indica.operators.extrapolate_impurity_density
from typing import cast
from typing import List
from typing import Optional
from typing import Sequence
from typing import Tuple
from typing import Union
import numpy as np
from scipy.interpolate import UnivariateSpline
from scipy.optimize import least_squares
from scipy.stats import norm
from xarray import concat
from xarray import DataArray
from xarray.core.common import zeros_like
from indica.converters.flux_surfaces import FluxSurfaceCoordinates
from indica.operators.bolometry_derivation import BolometryDerivation
from .abstractoperator import EllipsisType
from .abstractoperator import Operator
from .. import session
from ..datatypes import DataType
from ..utilities import input_check
def asymmetry_from_R_z(
data_R_z: DataArray,
flux_surfaces: FluxSurfaceCoordinates,
rho_arr: DataArray,
threshold_rho: Optional[DataArray] = None,
t_arr: Optional[DataArray] = None,
):
"""Function to calculate an asymmetry parameter from a given density profile in
(R, z, t) coordinates.
Parameters
----------
data_R_z
High-z density profile which is to be used to calculate the asymmetry parameter.
xarray.DataArray with dimensions (R, z, t)
flux_surfaces
FluxSurfaceCoordinates object representing polar coordinate systems
using flux surfaces for the radial coordinate.
rho_arr
1D xarray.DataArray of rho from 0 to 1.
threshold_rho
rho value denoting the cutoff point beyond which soft x-ray diagnostics
are invalid. It's also used in setting the derived asymmetry parameter to be
flat in the invalid region.
xarray.DataArray with dimensions (t)
t_arr
1D xarray.DataArray of t.
Returns
-------
derived_asymmetry_parameter
Derived asymmetry parameter. xarray.DataArray with dimensions (rho, t)
"""
input_check("data_R_z", data_R_z, DataArray, 3, strictly_positive=False)
input_check("flux_surfaces", flux_surfaces, FluxSurfaceCoordinates)
input_check("rho_arr", rho_arr, DataArray, 1, strictly_positive=False)
if threshold_rho is not None:
input_check(
"threshold_rho", threshold_rho, DataArray, 1, strictly_positive=True
)
if t_arr is None:
t_arr = data_R_z.coords["t"]
else:
input_check("t_arr", t_arr, DataArray, 1, strictly_positive=False)
theta_arr_ = np.array([0.0, np.pi], dtype=float)
theta_arr = DataArray(data=theta_arr_, coords={"theta": theta_arr_}, dims=["theta"])
R_deriv, z_deriv = flux_surfaces.convert_to_Rz(rho_arr, theta_arr)
R_deriv = cast(DataArray, R_deriv).interp(t=t_arr, method="linear")
z_deriv = cast(DataArray, z_deriv).interp(t=t_arr, method="linear")
R_deriv = cast(DataArray, R_deriv).transpose("rho_poloidal", "theta", "t")
z_deriv = cast(DataArray, z_deriv).transpose("rho_poloidal", "theta", "t")
data_rho_theta = data_R_z.indica.interp2d(
{"R": R_deriv, "z": z_deriv}, method="linear", assume_sorted=True
)
data_rho_theta = data_rho_theta.transpose("rho_poloidal", "theta", "t")
R_lfs_midplane = cast(DataArray, R_deriv).isel(theta=0) # theta = 0.0
R_hfs_midplane = cast(DataArray, R_deriv).isel(theta=1) # theta = np.pi
derived_asymmetry_parameter = np.log(
data_rho_theta.isel(theta=1) / data_rho_theta.isel(theta=0)
)
derived_asymmetry_parameter /= R_hfs_midplane**2 - R_lfs_midplane**2
# Set constant asymmetry parameter for rho<0.1
derived_asymmetry_parameter = derived_asymmetry_parameter.where(
derived_asymmetry_parameter.coords["rho_poloidal"] > 0.1,
other=derived_asymmetry_parameter.sel({"rho_poloidal": 0.1}, method="nearest"),
)
if threshold_rho is not None:
for ind_t, it in enumerate(threshold_rho.coords["t"]):
derived_asymmetry_parameter.loc[
threshold_rho[ind_t] :, it # type:ignore
] = derived_asymmetry_parameter.loc[threshold_rho[ind_t], it]
return derived_asymmetry_parameter
def asymmetry_from_rho_theta(
data_rho_theta: DataArray,
flux_surfaces: FluxSurfaceCoordinates,
threshold_rho: Optional[DataArray] = None,
t_arr: Optional[DataArray] = None,
):
"""Function to calculate an asymmetry parameter from a given density profile in
(rho_poloidal, theta, t) coordinates.
Parameters
----------
data_rho_theta
High-z density profile which is to be used to calculate the asymmetry parameter.
xarray.DataArray with dimensions (rho_poloidal, theta, t)
flux_surfaces
FluxSurfaceCoordinates object representing polar coordinate systems
using flux surfaces for the radial coordinate.
threshold_rho
rho value denoting the cutoff point beyond which soft x-ray diagnostics
are invalid. It's also used in setting the derived asymmetry parameter to be
flat in the invalid region.
xarray.DataArray with dimensions (t)
t_arr
1D xarray.DataArray of t.
Returns
-------
derived_asymmetry_parameter
Derived asymmetry parameter. xarray.DataArray with dimensions (rho, t)
"""
input_check("data_rho_theta", data_rho_theta, DataArray, 3, strictly_positive=False)
input_check("flux_surfaces", flux_surfaces, FluxSurfaceCoordinates)
if threshold_rho is not None:
input_check(
"threshold_rho", threshold_rho, DataArray, 1, strictly_positive=False
)
if t_arr is None:
t_arr = data_rho_theta.coords["t"]
else:
input_check("t_arr", t_arr, DataArray, 1, strictly_positive=False)
rho_arr = data_rho_theta.coords["rho_poloidal"]
theta_arr_ = np.array([0.0, np.pi], dtype=float)
theta_arr = DataArray(data=theta_arr_, coords={"theta": theta_arr_}, dims=["theta"])
R_deriv, z_deriv = flux_surfaces.convert_to_Rz(rho_arr, theta_arr)
R_deriv = cast(DataArray, R_deriv).interp(t=t_arr, method="linear")
z_deriv = cast(DataArray, z_deriv).interp(t=t_arr, method="linear")
R_deriv = cast(DataArray, R_deriv).transpose("rho_poloidal", "theta", "t")
z_deriv = cast(DataArray, z_deriv).transpose("rho_poloidal", "theta", "t")
R_lfs_midplane = cast(DataArray, R_deriv).isel(theta=0) # theta = 0.0
R_hfs_midplane = cast(DataArray, R_deriv).isel(theta=1) # theta = np.pi
derived_asymmetry_parameter = np.log(
data_rho_theta.interp(theta=np.pi, method="linear")
/ data_rho_theta.interp(theta=0.0, method="linear")
)
# assert for mypy, numpy array predicted
assert isinstance(derived_asymmetry_parameter, DataArray)
derived_asymmetry_parameter /= R_hfs_midplane**2 - R_lfs_midplane**2
# Set constant asymmetry parameter for rho<0.1
derived_asymmetry_parameter = derived_asymmetry_parameter.where(
derived_asymmetry_parameter.coords["rho_poloidal"] > 0.1,
other=derived_asymmetry_parameter.sel({"rho_poloidal": 0.1}, method="nearest"),
)
derived_asymmetry_parameter = derived_asymmetry_parameter.transpose(
"rho_poloidal", "t"
)
if threshold_rho is not None:
threshold_asymmetry = derived_asymmetry_parameter.sel(
rho_poloidal=threshold_rho, method="nearest"
)
derived_asymmetry_parameter = derived_asymmetry_parameter.where(
derived_asymmetry_parameter.rho_poloidal < threshold_rho,
other=threshold_asymmetry,
)
return derived_asymmetry_parameter
def asymmetry_modifier_from_parameter(
asymmetry_parameter: DataArray, R_deriv: DataArray
):
"""
Convert an asymmetry parameter to an asymmetry modifier.
Parameters
----------
asymmetry_parameter
Asymmetry parameter on (rho, t)
R_deriv
Variable describing value of R for every coordinate on a (rho, theta) grid.
xarray.DataArray with dimensions (rho, theta, t)
Returns
-------
asymmetry_modifier
Asymmetry modifier used to transform a low-field side only rho-profile
of a poloidally asymmetric quantity to a full poloidal cross-sectional
profile ie. (rho, t) -> (rho, theta, t). Also can be defined as:
exp(asymmetry_parameter * (R ** 2 - R_lfs ** 2)), where R is the major
radius as a function of (rho, theta, t) and R_lfs is the low-field-side
major radius as a function of (rho, t). xarray DataArray with dimensions
(rho, theta, t)
"""
R_lfs_midplane = cast(DataArray, R_deriv).sel(theta=0, method="nearest")
asymmetry_modifier = np.exp(
asymmetry_parameter * (R_deriv**2 - R_lfs_midplane**2)
)
# assert for mypy, numpy array predicted
assert isinstance(asymmetry_modifier, DataArray)
asymmetry_modifier = asymmetry_modifier.transpose("rho_poloidal", "theta", "t")
return asymmetry_modifier
def recover_threshold_rho(truncation_threshold: float, electron_temperature: DataArray):
"""Recover the rho value for a given electron temperature threshold, as in
at what rho location does the electron temperature drop below the specified
temperature.
Parameters
----------
truncation_threshold
User-specified (float) temperature truncation threshold.
electron_temperature
xarray.DataArray of the electron temperature profile,
with dimensions (rho, t).
Returns
-------
threshold_rho
rho value beyond which the electron temperature falls below
the truncation_threshold.
"""
input_check(
"truncation_threshold",
truncation_threshold,
float,
strictly_positive=True,
)
input_check(
"electron_temperature",
electron_temperature,
DataArray,
2,
strictly_positive=True,
)
try:
assert set(["rho_poloidal"]).issubset(
set(list(electron_temperature.coords.keys()))
)
except AssertionError:
raise AssertionError("Electron temperature must be a profile of rho.")
threshold_temp = electron_temperature.where(
(electron_temperature - truncation_threshold >= 0), drop=True
).min("rho_poloidal")
threshold_rho_ind = electron_temperature.where(
electron_temperature >= threshold_temp, np.inf
).argmin("rho_poloidal")
threshold_rho = electron_temperature.coords["rho_poloidal"].isel(
rho_poloidal=threshold_rho_ind
)
return threshold_rho
[docs]class ExtrapolateImpurityDensity(Operator):
"""Extrapolate the impurity density beyond the limits of SXR (Soft X-ray)
Attributes
----------
ARGUMENT_TYPES: List[DataType]
Ordered list of the types of data expected for each argument of the
operator.
RESULT_TYPES: List[DataType]
Ordered list of the types of data returned by the operator.
Returns
-------
extrapolated_smooth_density_Rz
Extrapolated and smoothed impurity density,
xarray.DataArray with dimensions (R, z, t).
extrapolated_smooth_density_rho_theta
Extrapolated and smoothed impurity density,
xarray.DataArray with dimensions (rho, theta, t).
t
If ``t`` was not specified as an argument for the __call__ function,
return the time the results are given for.
Otherwise return the argument.
"""
ARGUMENT_TYPES: List[Union[DataType, EllipsisType]] = []
def __init__(
self,
sess: session.Session = session.global_session,
):
"""Initialises ExtrapolateImpurityDensity class."""
super().__init__(sess=sess)
self.halfwidthhalfmax_coeff = np.sqrt(2 * np.log(2))
self.fullwidthhalfmax_coeff = 2 * self.halfwidthhalfmax_coeff
[docs] def return_types(self, *args: DataType) -> Tuple[DataType, ...]:
return (
("number_density", "impurity_element"),
("number_density", "impurity_element"),
("time", "impurity_element"),
)
[docs] def transform_to_rho_theta(
self,
data_R_z: DataArray,
flux_surfaces: FluxSurfaceCoordinates,
rho_arr: DataArray,
t_arr: Optional[DataArray] = None,
):
"""Function to transform data from an (R, z) grid to a (rho_poloidal, theta) grid
Parameters
----------
data_R_z
xarray.DataArray to be transformed. Dimensions (R, z, t)
flux_surfaces
FluxSurfaceCoordinates object representing polar coordinate systems
using flux surfaces for the radial coordinate.
rho_arr
1D xarray.DataArray of rho_poloidal from 0 to 1.
t_arr
1D xarray.DataArray of t.
Returns
-------
data_rho_theta
Transformed xarray.DataArray. Dimensions (rho_poloidal, theta, t)
R_deriv
Variable describing value of R in every coordinate on a (rho, theta) grid.
xarray.DataArray with dimensions (rho, theta, t)
z_deriv
Variable describing value of z in every coordinate on a (rho, theta) grid.
xarray.DataArray with dimensions (rho, theta, t)
"""
if t_arr is None:
t_arr = data_R_z.coords["t"]
theta_arr = np.linspace(-np.pi, np.pi, 21)
# mypy doesn't like re-assignments which changes types.
theta_arr = DataArray( # type: ignore
data=theta_arr, coords={"theta": theta_arr}, dims=["theta"]
)
R_deriv, z_deriv = flux_surfaces.convert_to_Rz(rho_arr, theta_arr, t_arr)
R_deriv = cast(DataArray, R_deriv).transpose("rho_poloidal", "theta", "t")
z_deriv = cast(DataArray, z_deriv).transpose("rho_poloidal", "theta", "t")
data_rho_theta = data_R_z.indica.interp2d(
{"R": R_deriv, "z": z_deriv}, method="linear", assume_sorted=True
)
data_rho_theta = data_rho_theta.transpose("rho_poloidal", "theta", "t")
return data_rho_theta, R_deriv, z_deriv
[docs] def basic_extrapolation(
self,
data_rho_theta: DataArray,
electron_density: DataArray,
threshold_rho: float,
):
"""Basic extrapolation which eliminates the data_rho_theta for
rho > threshold_rho and joins on electron density from that point
outwards (in rho). Also multiplies electron density to prevent a
discontinuity at rho_poloidal=threshold_rho.
Parameters
----------
data_rho_theta
xarray.DataArray to extrapolate. Dimensions (rho, theta, t)
electron_density
xarray.DataArray of Electron density (axisymmetric). Dimensions (rho, t)
threshold_rho
Threshold value (float) of rho beyond which SXR diagnostics cannot
be used to accurately infer impurity density.
Returns
-------
extrapolated_impurity_density
xarray.DataArray of extrapolated impurity density.
Dimensions (rho, theta, t)
"""
theta_arr = np.array([0.0, np.pi], dtype=float)
theta_arr = DataArray( # type: ignore
data=theta_arr, coords={"theta": theta_arr}, dims=["theta"]
)
boundary_electron_density = electron_density.sel(
{"rho_poloidal": threshold_rho}
).squeeze()
boundary_data = data_rho_theta.sel({"rho_poloidal": threshold_rho}).squeeze()
discontinuity_scale = boundary_data / boundary_electron_density
# Continue impurity_density_sxr following the shape of the electron density
# profile.
bounded_data = data_rho_theta.where(
data_rho_theta.rho_poloidal <= threshold_rho, 0.0
)
bounded_electron_density = electron_density.where(
electron_density.rho_poloidal > threshold_rho, 0.0
)
extrapolated_impurity_density = (
bounded_data + bounded_electron_density * discontinuity_scale
)
return extrapolated_impurity_density
[docs] def extrapolation_smoothing(
self,
extrapolated_data: DataArray,
rho_arr: DataArray,
):
"""Function to smooth extrapolatd data. Extrapolated data may not have
any 0th order discontinuity but 1st order discontinuities may exist.
Smoothing is necessary to eliminate these higher order discontinuities.
Parameters
----------
extrapolated_data
xarray.DataArray extrapolated data to be smoothed.
Dimensions (rho, theta, t)
rho_arr
xarray.DataArray used to construct smoothing splines. Dimensions (rho)
(Must be higher or the same resolution as the rho dimension
of extrapolated_data)
Returns
-------
extrapolated_smooth_lfs_arr
Extrapolated smoothed data on low-field side (fixed theta = 0)
extrapolated_smooth_hfs_arr
Extrapolated smoothed data on high-field side (fixed theta = pi)
"""
t = extrapolated_data.coords["t"]
extrapolated_smooth_lfs = []
extrapolated_smooth_hfs = []
for ind_t, it in enumerate(extrapolated_data.coords["t"]):
variance_extrapolated_data_lfs = extrapolated_data.isel(
{"t": ind_t, "theta": 0}
).var("rho_poloidal")
variance_extrapolated_data_hfs = extrapolated_data.isel(
{"t": ind_t, "theta": 1}
).var("rho_poloidal")
extrapolated_spline_lfs = UnivariateSpline(
rho_arr,
extrapolated_data.isel(t=ind_t).sel(theta=0),
k=5,
s=0.001 * variance_extrapolated_data_lfs,
)
extrapolated_spline_hfs = UnivariateSpline(
rho_arr,
extrapolated_data.isel(t=ind_t).sel(theta=np.pi),
k=5,
s=0.001 * variance_extrapolated_data_hfs,
)
extrapolated_smooth_lfs.append(extrapolated_spline_lfs(rho_arr, 0))
extrapolated_smooth_hfs.append(extrapolated_spline_hfs(rho_arr, 0))
extrapolated_smooth_lfs_arr = DataArray(
data=extrapolated_smooth_lfs,
coords={"t": t, "rho_poloidal": rho_arr},
dims=["t", "rho_poloidal"],
)
extrapolated_smooth_hfs_arr = DataArray(
data=extrapolated_smooth_hfs,
coords={"t": t, "rho_poloidal": rho_arr},
dims=["t", "rho_poloidal"],
)
extrapolated_smooth_lfs_arr = extrapolated_smooth_lfs_arr.transpose(
"rho_poloidal", "t"
)
extrapolated_smooth_hfs_arr = extrapolated_smooth_hfs_arr.transpose(
"rho_poloidal", "t"
)
# Following section is to ensure that near the rho_poloidal=0 region, the
# extrapolated_smooth_data is constant (ie. with a first-order derivative of 0).
inv_extrapolated_smooth_hfs = DataArray(
data=np.flip(extrapolated_smooth_hfs_arr.data, axis=0),
coords={
"rho_poloidal": -1
* np.flip(extrapolated_smooth_hfs_arr.coords["rho_poloidal"].data),
"t": extrapolated_smooth_hfs_arr.coords["t"].data,
},
dims=["rho_poloidal", "t"],
)
inv_rho_arr = inv_extrapolated_smooth_hfs.coords["rho_poloidal"].data
inv_del_val = inv_rho_arr[-1]
inv_extrapolated_smooth_hfs = inv_extrapolated_smooth_hfs.drop_sel(
rho_poloidal=inv_del_val
)
extrapolated_smooth_mid_plane_arr = concat(
(inv_extrapolated_smooth_hfs, extrapolated_smooth_lfs_arr), "rho_poloidal"
)
rho_zero_ind = np.where(
np.isclose(extrapolated_smooth_mid_plane_arr.rho_poloidal.data, 0.0)
)[0][0]
smooth_central_region = extrapolated_smooth_mid_plane_arr.isel(
rho_poloidal=slice(rho_zero_ind - 2, rho_zero_ind + 3)
)
smooth_central_region.loc[:, :] = smooth_central_region.max(dim="rho_poloidal")
extrapolated_smooth_mid_plane_arr.loc[
extrapolated_smooth_mid_plane_arr.rho_poloidal.data[
rho_zero_ind - 2
] : extrapolated_smooth_mid_plane_arr.rho_poloidal.data[rho_zero_ind + 2],
:,
] = smooth_central_region
inv_extrapolated_smooth_hfs = extrapolated_smooth_mid_plane_arr.isel(
rho_poloidal=slice(0, rho_zero_ind + 1)
)
extrapolated_smooth_hfs_arr = DataArray(
data=np.flip(inv_extrapolated_smooth_hfs.data, axis=0),
coords=extrapolated_smooth_hfs_arr.coords,
dims=extrapolated_smooth_hfs_arr.dims,
)
# Ignoring mypy warning since it seems to be unaware that the xarray .loc
# method uses label-based indexing and slicing instead of integer-based.
extrapolated_smooth_lfs_arr = extrapolated_smooth_mid_plane_arr.loc[
0: # type: ignore
]
return extrapolated_smooth_lfs_arr, extrapolated_smooth_hfs_arr
[docs] def apply_asymmetry(
self,
asymmetry_parameter: DataArray,
extrapolated_smooth_hfs: DataArray,
extrapolated_smooth_lfs: DataArray,
R_deriv: DataArray,
):
"""Applying an asymmetry parameter to low-field-side data which
will be extended over the poloidal extent to obtain an asymmetric
extrapolated smoothed data on a (rho, theta) grid.
Parameters
----------
asymmetry_parameter
Asymmetry parameter to apply.
xarray.DataArray with dimensions (rho, t)
extrapolated_smooth_hfs
Extrapolated smoothed data on high-field side (fixed theta = pi).
xarray.DataArray with dimensions (rho, t)
extrapolated_smooth_lfs
Extrapolated smoothed data on low-field side (fixed theta = 0).
xarray.DataArray with dimensions (rho, t)
R_deriv
Variable describing value of R in every coordinate on a (rho, theta) grid.
xarray.DataArray with dimensions (rho, theta, t)
Returns
-------
extrapolated_smooth_data
Extrapolated and smoothed data on full (rho, theta) grid.
xarray.DataArray with dimensions (rho, theta, t)
asymmetry_modifier
Asymmetry modifier used to transform a low-field side only rho-profile
of a poloidally asymmetric quantity to a full poloidal cross-sectional
profile ie. (rho, t) -> (rho, theta, t). Also can be defined as:
exp(asymmetry_parameter * (R ** 2 - R_lfs ** 2)), where R is the major
radius as a function of (rho, theta, t) and R_lfs is the low-field-side
major radius as a function of (rho, t). xarray DataArray with dimensions
(rho, theta, t)
"""
rho_arr = extrapolated_smooth_hfs.coords["rho_poloidal"]
self.rho_arr = rho_arr
theta_arr = np.linspace(-np.pi, np.pi, 21)
theta_arr = DataArray(
theta_arr, {"theta": theta_arr}, ["theta"]
) # type: ignore
asymmetry_modifier = asymmetry_modifier_from_parameter(
asymmetry_parameter, R_deriv
)
extrapolated_smooth_data = extrapolated_smooth_lfs * asymmetry_modifier
extrapolated_smooth_data = extrapolated_smooth_data.transpose(
"rho_poloidal", "theta", "t"
)
return extrapolated_smooth_data
[docs] def transform_to_R_z(
self,
R_deriv: DataArray,
z_deriv: DataArray,
extrapolated_smooth_data: DataArray,
flux_surfaces: FluxSurfaceCoordinates,
):
"""Function to transform data from an (rho, theta) grid to a (R, z) grid
Parameters
----------
R_deriv
Variable describing value of R in every coordinate on a (rho, theta) grid.
xarray.DataArray with dimensions (rho, theta, t)
(from derive_and_apply_asymmetry)
z_deriv
Variable describing value of z in every coordinate on a (rho, theta) grid.
xarray.DataArray with dimensions (rho, theta, t)
(from derive_and_apply_asymmetry)
extrapolated_smooth_data
Extrapolated and smoothed data to transform onto (R, z) grid.
xarray.DataArray with dimensions (rho, theta, t)
flux_surfaces
FluxSurfaceCoordinates object representing polar coordinate systems
using flux surfaces for the radial coordinate.
Returns
-------
extrapolated_smooth_data
Extrapolated and smoothed data on (R, z) grid.
xarray.DataArray with dimensions (R, z, t)
"""
R_arr = np.linspace(np.min(R_deriv[1:]), np.max(R_deriv[1:]), 40)
z_arr = np.linspace(np.min(z_deriv), np.max(z_deriv), 40)
R_arr = DataArray(R_arr, {"R": R_arr}, ["R"]) # type: ignore
z_arr = DataArray(z_arr, {"z": z_arr}, ["z"]) # type: ignore
t_arr = extrapolated_smooth_data.coords["t"]
rho_derived, theta_derived = flux_surfaces.convert_from_Rz(R_arr, z_arr, t_arr)
rho_derived = cast(DataArray, rho_derived).transpose("R", "z", "t")
theta_derived = cast(DataArray, theta_derived).transpose("R", "z", "t")
rho_derived = abs(rho_derived)
extrapolated_smooth_data = extrapolated_smooth_data.indica.interp2d(
{"rho_poloidal": rho_derived, "theta": theta_derived},
method="linear",
assume_sorted=True,
)
extrapolated_smooth_data = extrapolated_smooth_data.fillna(0.0)
return extrapolated_smooth_data
[docs] def fitting_function(
self,
amplitude: float,
standard_dev: float,
position: float,
):
"""Function to construct a signal that modifies the
extrapolated smoothed impurity density. The signal is constructed
using a Gaussian profile with the three free parameters.
Parameters
----------
amplitude
Amplitude of the additional signal (Gaussian amplitude)
standard_dev
Standard deviation associated with the Gaussian construction
(can be defined as FWHM/2.355 where FWHM is full-width at half maximum)
position
Position of the Gaussian. During optimization this is constrained to
the extrapolated region of rho (ie. outside the SXR validity region).
Returns
-------
sig
xarray.DataArray containing the Gaussian signal with dimensions (rho)
"""
rho_arr = self.rho_arr
gaussian_signal = norm(loc=position, scale=standard_dev)
sig = gaussian_signal.pdf(rho_arr)
sig = DataArray(
data=sig, coords={"rho_poloidal": rho_arr}, dims=["rho_poloidal"]
)
sig /= sig.max()
sig *= amplitude
return sig
[docs] def optimize_perturbation(
self,
extrapolated_smooth_data: DataArray,
orig_bolometry_data: DataArray,
bolometry_obj: BolometryDerivation,
impurity_element: str,
asymmetry_parameter: DataArray,
threshold_rho: DataArray,
R_deriv: DataArray,
time_correlation: bool = True,
):
"""Optimizes a Gaussian-style perturbation to recover the over-density
structure that is expected on the low-field-side of the plasma.
Parameters
----------
extrapolated_smooth_data
Extrapolated and smoothed data which continues the impurity density
beyond the soft x-ray threshold limit by using electron density as a
guide. xarray DataArray with dimensions (rho, theta, t).
orig_bolometry_data
Original bolometry data that is used in the objective function to fit
the perturbation. xarray DataArray with dimensions (channels, t).
bolometry_obj
BolometryDerivation object.
impurity_element
String of impurity element symbol.
asymmetry_parameter
Asymmetry parameter used to transform a low-field side only rho-profile
of a poloidally asymmetric quantity to a full poloidal cross-sectional
profile ie. (rho, t) -> (rho, theta, t). Dimensions (rho, t)
threshold_rho
Threshold rho value beyond which asymmetry parameter is invalid and
should be fitted from bolometry.
R_deriv
Variable describing value of R in every coordinate on a (rho, theta) grid.
xarray.DataArray with dimensions (rho, theta, t)
time_correlation
Boolean to indicate whether or not to use time correlated guesses during
the optimization (ie. the result of the optimization for the previous
time-step is used as a guess for the next time-step.)
Returns
-------
fitted_density
New density with an optimized perturbation on the low-field-side that
matches the original bolometry data. xarray DataArray with dimensions
(rho, theta, t)
"""
input_check(
"extrapolated_smooth_data",
extrapolated_smooth_data,
DataArray,
ndim_to_check=3,
strictly_positive=False,
)
input_check(
"orig_bolometry_data",
orig_bolometry_data,
DataArray,
ndim_to_check=2,
strictly_positive=False,
)
input_check(
"asymmetry_parameter",
asymmetry_parameter,
DataArray,
ndim_to_check=2,
positive=False,
strictly_positive=False,
)
rho_arr = self.rho_arr
drho = np.max(np.diff(rho_arr))
# Check whether bolometry_obj as been called at least once
# (ie. does it have a trimmed variant of the LoS bolometry data.)
if hasattr(bolometry_obj, "LoS_bolometry_data_trimmed"):
trim = True
orig_bolometry_trimmed = []
for bolo_diag, bolo_los in zip(
orig_bolometry_data.coords["bolo_kb5v_coords"],
[i[6] for i in bolometry_obj.LoS_bolometry_data],
):
LoS_bolometry_data_trimmed_labels = [
i[6] for i in bolometry_obj.LoS_bolometry_data_trimmed
]
if bolo_los in LoS_bolometry_data_trimmed_labels:
orig_bolometry_trimmed.append(orig_bolometry_data.loc[:, bolo_diag])
orig_bolometry = concat(
orig_bolometry_trimmed, dim="bolo_kb5v_coords"
).assign_attrs(**orig_bolometry_data.attrs)
else:
trim = False
orig_bolometry = orig_bolometry_data
extrapolated_smooth_data_mean = np.mean(
extrapolated_smooth_data.loc[self.threshold_rho[0] :, :, :]
)
def objective_func(objective_array: Sequence, time: float):
"""Objective function that is passed to scipy.optimize.least_squares
Parameters
----------
objective_array
List of:
[amplitude, standard deviation, position and asymmetry_parameter]
defining the Gaussian perturbation.
time
Float specifying the time point of interest.
Returns
-------
abs_error
Absolute error between the derived bolometry data (with a given
Gaussian perturbation) and the original bolometry data (ground truth)
for the selected time point.
xarray.DataArray with dimensions (channels)
"""
(
amplitude,
standard_dev,
position,
edge_asymmetry_parameter,
) = objective_array
perturbation_signal = self.fitting_function(
amplitude, standard_dev, position
)
# trim perturbation_signal to only be valid within rho = 0.0 and rho = 1.0
perturbation_signal = perturbation_signal.interp(
rho_poloidal=rho_arr, method="linear"
)
# fit asymmetry parameter for region where it was invalid
asymmetry_parameter_cont = asymmetry_parameter.where(
asymmetry_parameter.rho_poloidal < threshold_rho,
other=edge_asymmetry_parameter,
)
asymmetry_modifier = asymmetry_modifier_from_parameter(
asymmetry_parameter_cont, R_deriv
)
perturbation_signal = perturbation_signal * asymmetry_modifier.sel(t=time)
modified_density = (
extrapolated_smooth_data.sel(t=time) + perturbation_signal
)
bolometry_obj.impurity_densities.loc[
impurity_element, :, :, time
] = modified_density
# mypy unable to determine the length of bolometry_args so is getting
# confused about whether t_val is included in bolometry_args or not,
# hence ignored
modified_bolometry_data = bolometry_obj( # type:ignore
deriv_only=True, trim=trim, t_val=time
)
comparison_orig_bolometry_data = orig_bolometry.sel(
t=time, method="nearest", drop=True
)
error = np.abs(
modified_bolometry_data.data - comparison_orig_bolometry_data.data
)
error = np.nan_to_num(error)
return error
fitted_density = zeros_like(bolometry_obj.electron_density)
lower_amp_bound = 0.1 * extrapolated_smooth_data_mean
upper_amp_bound = 1e4 * extrapolated_smooth_data_mean
lower_width_bound = (drho / self.halfwidthhalfmax_coeff) / 3
upper_width_bound = (
(1.1 - self.threshold_rho[0].data) / self.fullwidthhalfmax_coeff
) / 3
lower_pos_bound = self.threshold_rho[0].data + drho
upper_pos_bound = 1.1
initial_guesses = np.array(
[
[
extrapolated_smooth_data_mean,
np.mean([lower_width_bound, upper_width_bound]),
np.mean([lower_pos_bound, upper_pos_bound]),
# asymmetry initial guess is value at threshold
asymmetry_parameter.isel(t=0).sel(
rho_poloidal=threshold_rho, method="ffill"
),
]
]
)
fitting_bounds = [
np.array(
[
lower_amp_bound,
lower_width_bound,
lower_pos_bound,
-10 * np.abs(asymmetry_parameter.max()),
]
),
np.array(
[
upper_amp_bound,
upper_width_bound,
upper_pos_bound,
10 * np.abs(asymmetry_parameter.max()),
]
),
]
# set scale of optimizer steps for amp, width, position and asymmetry
x_scale = np.array(
[
extrapolated_smooth_data_mean,
0.3,
1,
asymmetry_parameter.std(),
]
)
result = least_squares(
fun=objective_func,
x0=initial_guesses[0],
bounds=tuple(fitting_bounds),
max_nfev=50,
args=(extrapolated_smooth_data.coords["t"].data[0],),
ftol=1e-15,
xtol=1e-60,
gtol=1e-60,
x_scale=x_scale,
)
gaussian_params = result.x
if time_correlation:
initial_guesses = np.append(
initial_guesses, np.array([gaussian_params]), axis=0
)
for ind_t, it in enumerate(extrapolated_smooth_data.coords["t"].data[1:]):
upper_width_bound = (
(1.1 - self.threshold_rho[ind_t].data) / self.fullwidthhalfmax_coeff
) / 3
lower_pos_bound = self.threshold_rho[ind_t].data + drho
fitting_bounds[0][2] = lower_pos_bound
fitting_bounds[1][1] = upper_width_bound
if time_correlation:
result = least_squares(
fun=objective_func,
x0=initial_guesses[ind_t],
bounds=tuple(fitting_bounds),
max_nfev=50,
args=(it,),
ftol=1e-60,
xtol=1e-5,
gtol=1e-60,
x_scale=x_scale,
)
gaussian_params = result.x
initial_guesses = np.append(
initial_guesses, np.array([gaussian_params]), axis=0
)
else:
result = least_squares(
fun=objective_func,
x0=initial_guesses[0],
bounds=tuple(fitting_bounds),
max_nfev=15,
args=(it,),
ftol=1e-15,
xtol=1e-60,
gtol=1e-60,
x_scale=x_scale,
)
gaussian_params = result.x
fitted_density = bolometry_obj.impurity_densities.loc[impurity_element, :, :, :]
fitted_density = fitted_density.transpose("rho_poloidal", "theta", "t")
return fitted_density
[docs] def __call__( # type: ignore
self,
impurity_density_sxr: DataArray,
electron_density: DataArray,
electron_temperature: DataArray,
truncation_threshold: float,
flux_surfaces: FluxSurfaceCoordinates,
asymmetry_parameter: Optional[DataArray] = None,
t: Optional[DataArray] = None,
):
"""Extrapolates the impurity density beyond the limits of SXR (Soft X-ray)
Parameters
----------
impurity_density_sxr
xarray.DataArray of impurity density derived from soft X-ray emissivity.
Dimensions (rho_poloidal, theta, t) or (R, z, t)
electron_density
xarray.DataArray of electron density. Dimensions (rho ,t)
electron_temperature
xarray.DataArray of electron temperature. Dimensions (rho, t)
truncation_threshold
Truncation threshold (float) for the electron temperature
flux_surfaces
FluxSurfaceCoordinates object representing polar coordinate systems
using flux surfaces for the radial coordinate.
asymmetry_parameter
Optional, asymmetry parameter (either externally sourced or pre-calculated),
xarray.DataArray with dimensions (rho, t)
t
Optional float, time at which the impurity concentration is to
be calculated at.
Returns
-------
extrapolated_smooth_density_R_z
Extrapolated and smoothed impurity density ((R, z) grid).
xarray.DataArray with dimensions (R, z, t)
extrapolated_smooth_density_rho_theta
Extrapolated and smoothed impurity density ((rho, theta) grid).
xarray.DataArray with dimensions (rho, theta, t).
t
If ``t`` was not specified as an argument for the __call__ function,
return the time the results are given for.
Otherwise return the argument.
"""
input_check(
"impurity_density_sxr",
impurity_density_sxr,
DataArray,
ndim_to_check=3,
strictly_positive=False,
)
input_check(
"electron_density",
electron_density,
DataArray,
ndim_to_check=2,
strictly_positive=False,
)
input_check(
"electron_temperature",
electron_temperature,
DataArray,
strictly_positive=True,
)
input_check(
"truncation_threshold",
truncation_threshold,
float,
strictly_positive=True,
)
input_check("flux_surfaces", flux_surfaces, FluxSurfaceCoordinates)
if t is None:
t = electron_density.t
else:
input_check("t", t, DataArray, strictly_positive=False)
self.threshold_rho = recover_threshold_rho(
truncation_threshold, electron_temperature
)
# Transform impurity_density_sxr to (rho, theta) coordinates
rho_arr = electron_density.coords["rho_poloidal"]
t_arr = t
if set(["R", "z"]).issubset(set(list(impurity_density_sxr.dims))):
(
impurity_density_sxr_rho_theta,
R_deriv,
z_deriv,
) = self.transform_to_rho_theta(
impurity_density_sxr,
flux_surfaces,
rho_arr,
t_arr=t_arr,
)
elif set(["rho_poloidal", "theta"]).issubset(
set(list(impurity_density_sxr.dims))
):
impurity_density_sxr_rho_theta = impurity_density_sxr
R_deriv, z_deriv = flux_surfaces.convert_to_Rz(
impurity_density_sxr_rho_theta.coords["rho_poloidal"],
impurity_density_sxr_rho_theta.coords["theta"],
impurity_density_sxr_rho_theta.coords["t"],
)
else:
raise ValueError(
'Inputted impurity_density_sxr does not have any compatible\
coordinates: ["rho_poloidal", "theta"] or ["R", "z"]'
)
# Continue impurity_density_sxr following the shape of the electron density
# profile and mitigate discontinuity.
extrapolated_impurity_density = self.basic_extrapolation(
impurity_density_sxr_rho_theta, electron_density, self.threshold_rho
)
assert np.all(np.logical_not(np.isnan(extrapolated_impurity_density)))
# Smoothing extrapolated data at the discontinuity.
# (There is still a discontinuity in the radial gradient.)
extrapolated_smooth_lfs, extrapolated_smooth_hfs = self.extrapolation_smoothing(
extrapolated_impurity_density, rho_arr
)
if asymmetry_parameter is None:
asymmetry_parameter = asymmetry_from_rho_theta(
impurity_density_sxr_rho_theta, flux_surfaces, self.threshold_rho, t_arr
)
else:
input_check(
"asymmetry_parameter",
asymmetry_parameter,
DataArray,
ndim_to_check=2,
positive=False,
strictly_positive=False,
)
# Applying the asymmetry parameter to extrapolated density.
# Also extends the data beyond the hfs and lfs to be
# the full poloidal angle range.
extrapolated_smooth_density_rho_theta = self.apply_asymmetry(
asymmetry_parameter,
extrapolated_smooth_hfs,
extrapolated_smooth_lfs,
R_deriv,
)
# Transform extrapolated density back onto a (R, z) grid
extrapolated_smooth_density_R_z = self.transform_to_R_z(
R_deriv, z_deriv, extrapolated_smooth_density_rho_theta, flux_surfaces
)
asymmetry_modifier = asymmetry_modifier_from_parameter(
asymmetry_parameter, R_deriv
)
self.asymmetry_modifier = asymmetry_modifier
return (
extrapolated_smooth_density_R_z,
extrapolated_smooth_density_rho_theta,
t,
)