Source code for indica.readers.ppfreader

"""Provides implementation of :py:class:`readers.DataReader` for
reading PPF data produced by JET.

"""

from numbers import Number
from pathlib import Path
import pickle
import stat
from typing import Any
from typing import cast
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import Union
import warnings

import numpy as np
from sal.client import SALClient
from sal.core.exception import AuthenticationFailed
from sal.core.exception import NodeNotFound
from sal.dataclass import Signal
import scipy.constants as sc

import indica.readers.surf_los as surf_los
from .abstractreader import CACHE_DIR
from .abstractreader import DataReader
from .abstractreader import DataSelector
from .selectors import choose_on_plot
from .. import session
from ..datatypes import ELEMENTS
from ..numpy_typing import RevisionLike
from ..utilities import to_filename


SURF_PATH = Path(surf_los.__file__).parent / "surf_los.dat"


class PPFError(Exception):
    """An exception which occurs when trying to read PPF data which would
    not be caught by the lower-level SAL library. An example would be
    failing to find any valid channels for an instrument when each channel
    is a separate DTYPE.

    """


class PPFWarning(UserWarning):
    """A warning that occurs while trying to read PPF data. Typically
    related to caching in some way.

    """


[docs]class PPFReader(DataReader): """Class to read JET PPF data using SAL. Parameters ---------- times : np.ndarray An ordered array of times to which data will be downsampled/interpolated. pulse : int The ID number for the pulse from which to get data. uid : str The UID for the particular data to be read. server : str The URL for the SAL server to read data from. default_error : float Relative uncertainty to use for diagnostics which do not provide a value themselves. sess : session.Session An object representing the session being run. Contains information such as provenance data. Attributes ---------- DIAGNOSTIC_QUANTITIES: Dict[str, Dict[str, Dict[str, Dict[str, ArrayType]]]] Hierarchical information on the quantities which are available for reading. These are indexed by (in order) diagnostic name, UID, instrument name, and quantity name. The values of the innermost dictionary describe the physical type of the data to be read. NAMESPACE: Tuple[str, str] The abbreviation and full URL for the PROV namespace of the reader class. """ INSTRUMENT_METHODS = { "hrts": "get_thomson_scattering", "lidr": "get_thomson_scattering", "efit": "get_equilibrium", "eftp": "get_equilibrium", "kk3": "get_cyclotron_emissions", "ks3": "get_bremsstrahlung_spectroscopy", "sxr": "get_radiation", "bolo": "get_radiation", "kg10": "get_thomson_scattering", **{ "cx{}m".format(val): "get_charge_exchange" for val in ("s", "d", "f", "g", "h") }, **{"cx{}6".format(val): "get_charge_exchange" for val in ("s", "d", "f", "g")}, **{"cx{}4".format(val): "get_charge_exchange" for val in ("s", "d", "f", "h")}, } _IMPLEMENTATION_QUANTITIES = { "kg10": {"ne": ("number_density", "electron")}, "sxr": { "h": ("luminous_flux", "sxr"), "t": ("luminous_flux", "sxr"), "v": ("luminous_flux", "sxr"), }, "bolo": { "kb5h": ("luminous_flux", "bolometric"), "kb5v": ("luminous_flux", "bolometric"), }, "ks3": { "zefh": ("effective_charge", "plasma"), "zefv": ("effective_charge", "plasma"), }, **{ "cx{}m".format(val): { "angf": ("angular_freq", "ions"), "ti": ("temperature", "ions"), "conc": ("concentration", "ions"), } for val in ("s", "d", "f", "g", "h") }, **{ "cx{}6".format(val): { "angf": ("angular_freq", "ions"), "ti": ("temperature", "ions"), "conc": ("concentration", "ions"), } for val in ("s", "d", "f", "g") }, **{ "cx{}4".format(val): { "angf": ("angular_freq", "ions"), "ti": ("temperature", "ions"), "conc": ("concentration", "ions"), } for val in ("s", "d", "f", "h") }, } _BREMSSTRAHLUNG_LOS = { "ks3": "edg7", } _RADIATION_RANGES = { "sxr/h": 17, "sxr/t": 35, "sxr/v": 35, "bolo/kb5h": 24, "bolo/kb5v": 32, } _KK3_RANGE = (1, 96) MACHINE_DIMS = ((1.83, 3.9), (-1.75, 2.0)) def __init__( self, pulse: int, tstart: float, tend: float, server: str = "https://sal.jet.uk", default_error: float = 0.05, max_freq: float = 1e6, selector: DataSelector = choose_on_plot, session: session.Session = session.global_session, ): self._reader_cache_id = f"ppf:{server.replace('-', '_')}:{pulse}" self.NAMESPACE: Tuple[str, str] = ("jet", server) super().__init__( tstart, tend, max_freq, session, selector, pulse=pulse, server=server, default_error=default_error, ) self.pulse = pulse self._client = SALClient(server) self._client.prompt_for_password = False self._default_error = default_error
[docs] def get_sal_path( self, uid: str, instrument: str, quantity: str, revision: RevisionLike ) -> str: """Return the path in the PPF database to for the given INSTRUMENT (DDA in JET).""" return ( f"/pulse/{self.pulse:d}/ppf/signal/{uid}/{instrument}/" f"{quantity}:{revision:d}" )
def _get_signal( self, uid: str, instrument: str, quantity: str, revision: RevisionLike ) -> Tuple[Signal, str]: """Gets the signal for the given INSTRUMENT (DDA in JET), at the given revision.""" path = self.get_sal_path(uid, instrument, quantity, revision) info = self._client.list(path) path = self.get_sal_path( uid, instrument, quantity, info.revision_current, ) cache_path = self._sal_path_to_file(path) data = self._read_cached_ppf(cache_path) if data is None: data = self._client.get(path) self._write_cached_ppf(cache_path, data) return data, path def _get_revision( self, uid: str, instrument: str, revision: RevisionLike ) -> RevisionLike: """ Get actual revision that's being read from database, converts relative revision (e.g. 0, latest) to absolute """ info = self._client.list( f"/pulse/{self.pulse:d}/ppf/signal/{uid}/{instrument}:{revision:d}" ) return info.revision_current def _read_cached_ppf(self, path: Path) -> Optional[Signal]: """Check if the PPF data specified by `sal_path` has been cached and, if so, load it. """ if not path.exists(): return None permissions = stat.filemode(path.stat().st_mode) if permissions[5] == "w" or permissions[8] == "w": warnings.warn( "Can not open cache file which is writeable by anyone other than " "the user. (Security risk.)", PPFWarning, ) return None with path.open("rb") as f: try: return pickle.load(f) except pickle.UnpicklingError: warnings.warn( f"Error unpickling cache file {path}. (Possible data corruption.)", PPFWarning, ) return None def _write_cached_ppf(self, path: Path, data: Signal): """Write the given signal, fetched from `sal_path`, to the disk for later reuse. """ path.parent.mkdir(parents=True, exist_ok=True) with path.open("wb") as f: pickle.dump(data, f) path.chmod(0o644) def _sal_path_to_file(self, sal_path: str) -> Path: """Get the file path which would be used to cache data from the given `sal_path`. """ return ( Path.home() / CACHE_DIR / self.__class__.__name__ / to_filename(self._reader_cache_id + sal_path + ".pkl") ) def _get_charge_exchange( self, uid: str, instrument: str, revision: RevisionLike, quantities: Set[str], ) -> Dict[str, Any]: """Return temperature, angular frequency, or concentration data for an ion, measured using charge exchange recombination spectroscopy. """ results = {} R, R_path = self._get_signal(uid, instrument, "rpos", revision) z, z_path = self._get_signal(uid, instrument, "pos", revision) mass, m_path = self._get_signal(uid, instrument, "mass", revision) texp, t_path = self._get_signal(uid, instrument, "texp", revision) atomic_num, atomic_num_path = self._get_signal( uid, instrument, "zqnn", revision ) mass_int = round(mass.data[0]) atomic_num_int = round(atomic_num.data[0]) # We approximate that the positions do not change much in time results["R"] = R.data[0, :] results["z"] = z.data[0, :] results["length"] = R.data.shape[1] results["element"] = [ value[2] for value in ELEMENTS.values() if (value[0] == atomic_num_int and value[1] == mass_int) ][0] results["texp"] = texp.data results["times"] = None paths = [R_path, z_path, m_path, t_path] if "angf" in quantities: angf, a_path = self._get_signal(uid, instrument, "angf", revision) afhi, e_path = self._get_signal(uid, instrument, "afhi", revision) if results["times"] is None: results["times"] = angf.dimensions[0].data results["angf"] = angf.data results["angf_error"] = afhi.data - angf.data results["angf_records"] = paths + [a_path, e_path] if "conc" in quantities: conc, c_path = self._get_signal(uid, instrument, "conc", revision) cohi, e_path = self._get_signal(uid, instrument, "cohi", revision) if results["times"] is None: results["times"] = conc.dimensions[0].data results["conc"] = conc.data results["conc_error"] = cohi.data - conc.data results["conc_records"] = paths + [c_path, e_path] if "ti" in quantities: ti, t_path = self._get_signal(uid, instrument, "ti", revision) tihi, e_path = self._get_signal(uid, instrument, "tihi", revision) if results["times"] is None: results["times"] = ti.dimensions[0].data results["ti"] = ti.data results["ti_error"] = tihi.data - ti.data results["ti_records"] = paths + [t_path, e_path] results["revision"] = self._get_revision(uid, instrument, revision) return results def _get_thomson_scattering( self, uid: str, instrument: str, revision: RevisionLike, quantities: Set[str], ) -> Dict[str, Any]: """Fetch raw data for electron temperature or number density calculated from Thomson scattering. """ results = {} z, z_path = self._get_signal(uid, instrument, "z", revision) results["z"] = z.data results["R"] = z.dimensions[0].data results["length"] = len(z.data) if "te" in quantities: te, t_path = self._get_signal(uid, instrument, "te", revision) self._set_times_item(results, te.dimensions[0].data) results["te"] = te.data if instrument == "lidr": tehi, e_path = self._get_signal(uid, instrument, "teu", revision) results["te_error"] = tehi.data - results["te"] else: dte, e_path = self._get_signal(uid, instrument, "dte", revision) results["te_error"] = dte.data results["te_records"] = [z_path, t_path, e_path] if "ne" in quantities: ne, d_path = self._get_signal(uid, instrument, "ne", revision) self._set_times_item(results, ne.dimensions[0].data) results["ne"] = ne.data if instrument == "lidr": nehi, e_path = self._get_signal(uid, instrument, "neu", revision) results["ne_error"] = nehi.data - results["ne"] elif instrument == "kg10": results["ne_error"] = 0.0 * results["ne"] else: dne, e_path = self._get_signal(uid, instrument, "dne", revision) results["ne_error"] = dne.data results["ne_records"] = [z_path, d_path] if instrument != "kg10": results["ne_records"].append(e_path) results["revision"] = self._get_revision(uid, instrument, revision) return results def _get_equilibrium( self, uid: str, instrument: str, revision: RevisionLike, quantities: Set[str], ) -> Dict[str, Any]: """Fetch raw data for plasma equilibrium.""" results: Dict[str, Any] = {} for q in quantities: qval, q_path = self._get_signal(uid, instrument, q, revision) self._set_times_item(results, qval.dimensions[0].data) if ( len(qval.dimensions) > 1 and q not in {"psi", "rbnd", "zbnd"} and "psin" not in results ): results["psin"] = qval.dimensions[1].data if q == "psi": r, r_path = self._get_signal(uid, instrument, "psir", revision) z, z_path = self._get_signal(uid, instrument, "psiz", revision) results["psi_r"] = r.data results["psi_z"] = z.data results["psi"] = qval.data.reshape( (len(results["times"]), len(z.data), len(r.data)) ) results["psi_records"] = [q_path, r_path, z_path] else: results[q] = qval.data results[q + "_records"] = [q_path] results["revision"] = self._get_revision(uid, instrument, revision) return results def _get_cyclotron_emissions( self, uid: str, instrument: str, revision: RevisionLike, quantities: Set[str], ) -> Dict[str, Any]: """Fetch raw data for electron cyclotron emissin diagnostics.""" _, _, zstart, zend, _, _ = surf_los.read_surf_los( SURF_PATH, self.pulse, instrument.lower() ) assert zstart[0] == zend[0] # gen contains accquisition parameters # e.g. which channels are valid (gen[0,:] > 0) gen, gen_path = self._get_signal(uid, instrument, "gen", revision) channels = np.argwhere(gen.data[0, :] > 0)[:, 0] freq = gen.data[15, channels] * 1e9 nharm = gen.data[11, channels] results: Dict[str, Any] = { "machine_dims": self.MACHINE_DIMS, "z": zstart[0], "length": len(channels), "Btot": 2 * np.pi * freq * sc.m_e / (sc.e * nharm), } bad_channels = np.argwhere( np.logical_or(gen.data[18, channels] == 0.0, gen.data[19, channels] == 0.0) ) results["bad_channels"] = results["Btot"][bad_channels] for q in quantities: records = [SURF_PATH.name, gen_path] data = [] for i in channels: qval, q_path = self._get_signal( uid, instrument, f"{q}{i + 1:02d}", revision ) records.append(q_path) data.append(qval.data) if "times" not in results: results["times"] = qval.dimensions[0].data results[q] = np.array(data).T results[q + "_error"] = self._default_error * results[q] results[q + "_records"] = records results["revision"] = self._get_revision(uid, instrument, revision) return results def _get_radiation( self, uid: str, instrument: str, revision: RevisionLike, quantities: Set[str], ) -> Dict[str, Any]: """Fetch raw data for radiation quantities such as SXR and bolometric fluxes.. """ results: Dict[str, Any] = { "length": {}, "machine_dims": self.MACHINE_DIMS, } for q in quantities: qtime = q + "_times" records = [SURF_PATH.name] if instrument == "bolo": qval, qpath = self._get_signal(uid, instrument, q, revision) records.append(qpath) results["length"][q] = qval.dimensions[1].length results[qtime] = qval.dimensions[0].data results[q] = qval.data channels: Union[List[int], slice] = slice(None, None) else: luminosities = [] channels = [] for i in range(1, self._RADIATION_RANGES[instrument + "/" + q] + 1): try: qval, q_path = self._get_signal( uid, instrument, f"{q}{i:02d}", revision ) except NodeNotFound: continue records.append(q_path) luminosities.append(qval.data) channels.append(i - 1) if qtime not in results: results[qtime] = qval.dimensions[0].data if len(channels) == 0: # TODO: Try getting information on the INSTRUMENT (DDA in JET), # to determine if the failure is actually due to requesting # an invalid INSTRUMENT (DDA in JET) or revision self._client.list( f"/pulse/{self.pulse:d}/ppf/signal/{uid}/{instrument}:" f"{revision:d}" ) raise PPFError(f"No channels available for {instrument}/{q}.") results["length"][q] = len(channels) results[q] = np.array(luminosities).T results[q + "_error"] = self._default_error * results[q] results[q + "_records"] = records xstart, xend, zstart, zend, ystart, yend = surf_los.read_surf_los( SURF_PATH, self.pulse, instrument.lower() + "/" + q.lower() ) results[q + "_xstart"] = xstart[channels] results[q + "_xstop"] = xend[channels] results[q + "_zstart"] = zstart[channels] results[q + "_zstop"] = zend[channels] results[q + "_ystart"] = ystart[channels] results[q + "_ystop"] = yend[channels] results["revision"] = self._get_revision(uid, instrument, revision) return results def _get_bremsstrahlung_spectroscopy( self, uid: str, instrument: str, revision: RevisionLike, quantities: Set[str], ) -> Dict[str, Any]: results: Dict[str, Any] = { "length": {}, "machine_dims": self.MACHINE_DIMS, } los_instrument = self._BREMSSTRAHLUNG_LOS[instrument] for q in quantities: qval, q_path = self._get_signal(uid, instrument, q, revision) los, l_path = self._get_signal(uid, los_instrument, "los" + q[-1], revision) if "times" not in results: results["times"] = qval.dimensions[0].data results["length"][q] = 1 results[q] = qval.data results[q + "_error"] = 0.0 * results[q] results[q + "_xstart"] = np.array([los.data[1] / 1000]) results[q + "_xstop"] = np.array([los.data[4] / 1000]) results[q + "_zstart"] = np.array([los.data[2] / 1000]) results[q + "_zstop"] = np.array([los.data[5] / 1000]) results[q + "_ystart"] = np.zeros_like(results[q + "_xstart"]) results[q + "_ystop"] = np.zeros_like(results[q + "_xstop"]) results[q + "_records"] = [q_path, l_path] results["revision"] = self._get_revision(uid, instrument, revision) return results # def _handle_kk3(self, key: str, revision: RevisionLike) -> DataArray: # """Produce :py:class:`xarray.DataArray` for electron temperature.""" # uid, general_dat = self._get_signal("kk3_gen", revision) # channel_index = np.argwhere(general_dat.data[0, :] > 0) # f_chan = general_dat.data[15, channel_index] # nharm_chan = general_dat.data[11, channel_index] # uids = [uid] # temperatures = [] # Btot = [] # for i, f, nharm in zip(channel_index, f_chan, nharm_chan): # uid, signal = self._get_signal("{}{:02d}".format(key, i), # revision) # uids.append(uid) # temperatures.append(signal.data) # Btot.append(2 * np.pi, f * sc.m_e / (nharm * sc.e)) # uncalibrated = Btot[general_dat.data[18, channel_index] != 0.0] # temps_array = np.array(temperatures) # coords = [("Btot", np.array(Btot)), ("t", signal.dimensions[0].data)] # meta = {"datatype": self.AVALABLE_DATA[key], # "error": DataArray(0.1*temps_array, coords)} # # TODO: Select correct time range # data = DataArray(temps_array, coords, name=key, # attrs=meta) # drop = self._select_channels(uid, data, "Btot", uncalibrated) # data.attrs["provenance"] = self.create_provenance(key, revision, # uids, drop) # return data.drop_sel({"Btot": drop}) def _get_bad_channels( self, uid: str, instrument: str, quantity: str ) -> List[Number]: """Returns a list of channels which are known to be bad for all pulses on this instrument. Typically this would be for reasons of geometriy (e.g., lines of sight facing the diverter). This should be overridden with machine-specific information. Parameters ---------- uid User ID (i.e., which user created this data). instrument Name of the instrument which measured this data. quantities Which physical quantity this data represents. Returns ------- : A list of channels known to be problematic. These will be ignored by default. """ if instrument == "bolo": if quantity == "kb5h": return cast(List[Number], [*range(0, 8), *range(19, 24)]) elif quantity == "kb5v": return cast(List[Number], [*range(0, 1), *range(5, 16), *range(22, 32)]) else: return [] else: return []
[docs] def close(self): """Ends connection to the SAL server from which PPF data is being read.""" del self._client
@property def requires_authentication(self): """Indicates whether authentication is required to read data. Returns ------- : True if authentication is needed, otherwise false. """ return self._client.auth_required
[docs] def authenticate(self, name: str, password: str): """Log onto the JET/SAL system to access data. Parameters ---------- name: Your username when logging onto Heimdall. password: Your single sign-on password. Returns ------- : Indicates whether authentication was succesful. """ try: self._client.authenticate(name, password) return True except AuthenticationFailed: return False