Source code for indica.readers.adas

"""Base class for reading in ADAS atomic data."""

import datetime
from pathlib import Path
import re
from typing import List
from typing import Literal
from typing import TextIO
from typing import Union
from urllib.request import pathname2url
from urllib.request import urlretrieve

import numpy as np
import prov.model as prov
from xarray import DataArray

from .. import session
from ..abstractio import BaseIO
from ..datatypes import ADF11_GENERAL_DATATYPES
from ..datatypes import ADF15_GENERAL_DATATYPES
from ..datatypes import ELEMENTS

# TODO: Evaluate this location
DEFAULT_PATH = Path("")
CACHE_DIR = ".indica"


class ADASReader(BaseIO):
    """Class for reading atomic data from ADAS files.

    Parameters
    ----------
    path: Union[str, Path]
        Location from which relative paths should be evaluated.
        Default is to download files from OpenADAS, storing them
        in your home directory for later use.
    session: session.Session
        An object representing the session being run. Contains information
        such as provenance data.

    """

    def __init__(
        self,
        path: Union[str, Path] = DEFAULT_PATH,
        sess: session.Session = session.global_session,
    ):
        path = Path(path)
        self.session = sess
        self.openadas = path == DEFAULT_PATH
        if path == DEFAULT_PATH:
            self.namespace = "openadas"
            self.session.prov.add_namespace(
                self.namespace, "https://open.adas.ac.uk/detail/adf11/"
            )
            self.path = Path.home() / CACHE_DIR / "adas"
        else:
            self.path = path
            self.namespace = "localadas"
            self.session.prov.add_namespace(
                self.namespace, "file:/" + str(self.path.resolve())
            )
        self.prov_id = session.hash_vals(path=self.path)
        self.agent = self.session.prov.agent(self.prov_id)
        self.session.prov.delegation(self.session.agent, self.agent)
        self.entity = self.session.prov.entity(
            self.prov_id, {"path": str(self.path.resolve())}
        )
        self.session.prov.generation(
            self.entity, self.session.session, time=datetime.datetime.now()
        )
        self.session.prov.attribution(self.entity, self.session.agent)
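    # Construction sketch (a hedged example, not part of the class API): with
    # no arguments the reader pulls files from OpenADAS and caches them under
    # ~/.indica/adas; passing a path makes it read from a local ADAS tree.
    #
    #     reader = ADASReader()                   # OpenADAS + local cache
    #     reader = ADASReader("/path/to/adas")    # hypothetical local mirror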
    def close(self):
        """Closes connections to database/files. For this class it does not
        need to do anything."""
        pass
    def get_adf11(self, quantity: str, element: str, year: str) -> DataArray:
        """Read data from the specified ADAS file.

        Parameters
        ----------
        quantity
            The type of data to retrieve. Options: scd, acd, plt, prb,
            plsx, prsx.
        element
            The atomic symbol for the element which will be retrieved.
        year
            The two-digit year label for the data.

        Returns
        -------
        :
            The data in the specified file. Dimensions are density and
            temperature. Each member of the dataset corresponds to a
            different charge state.

        """
        now = datetime.datetime.now()
        file_component = f"{quantity}{year}"
        filename = Path(file_component) / f"{file_component}_{element.lower()}.dat"
        with self._get_file("adf11", filename) as f:
            header = f.readline().split()
            z = int(header[0])
            nd = int(header[1])
            nt = int(header[2])
            zmin = int(header[3]) - 1
            zmax = int(header[4]) - 1
            element_name = header[5][1:].lower()
            f.readline()
            densities = np.fromfile(f, float, nd, " ")
            temperatures = np.fromfile(f, float, nt, " ")
            data = np.empty((zmax - zmin + 1, nt, nd))
            date = datetime.date.min
            for i in range(zmax - zmin + 1):
                section_header = f.readline()
                m = re.search(r"Z1=\s*(\d+)", section_header, re.I)
                assert isinstance(m, re.Match)
                assert int(m.group(1)) - 1 == zmin + i
                m = re.search(
                    r"DATE=\s*(\d?\d)[.\-/](\d\d)[.\-/](\d\d)", section_header, re.I
                )
                # assert isinstance(m, re.Match)  # TEMP for PLSX/PRSX reading
                if isinstance(m, re.Match):
                    short_year = int(m.group(3))
                    parsed_year = short_year + (
                        1900 if short_year >= now.year % 100 else 2000
                    )
                    new_date = datetime.date(
                        parsed_year, int(m.group(2)), int(m.group(1))
                    )
                else:
                    new_date = datetime.datetime.now().date()
                if new_date > date:
                    date = new_date
                data[i, ...] = np.fromfile(f, float, nd * nt, " ").reshape((nt, nd))
        gen_type = ADF11_GENERAL_DATATYPES[quantity]
        spec_type = element_name
        try:
            assert (
                z
                == [value[0] for value in ELEMENTS.values() if value[2] == spec_type][0]
            )
        except AssertionError:
            raise AssertionError(
                "There is a mismatch between the atomic number (z) and the "
                "element name (element_name) imported from the ADF11 file."
            )
        name = f"{spec_type}_{gen_type}"
        attrs = {
            "datatype": (gen_type, spec_type),
            "date": date,
            "provenance": self.create_provenance(filename, now),
            "element_symbol": element,
            "year": year,
        }
        return DataArray(
            10 ** (data - 6),
            coords=[
                ("ion_charges", np.arange(zmin, zmax + 1, dtype=int)),
                ("electron_temperature", 10 ** temperatures),
                ("electron_density", 10 ** (densities + 6)),
            ],
            name=name,
            attrs=attrs,
        )
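    # Usage sketch for get_adf11 (hedged: assumes the OpenADAS file
    # adf11/scd96/scd96_h.dat is downloadable; the file name is illustrative):
    #
    #     scd = ADASReader().get_adf11("scd", "h", "96")
    #
    # The result is a DataArray with dims
    # ("ion_charges", "electron_temperature", "electron_density"); the
    # 10 ** (data - 6) and 10 ** (densities + 6) factors above convert the
    # file's logarithmic cgs values to SI (m**3/s and m**-3).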
    def get_adf15(
        self,
        element: str,
        charge: str,
        filetype: str,
        year="",
    ) -> DataArray:
        """Read data from the specified ADF15 ADAS file.

        The implementation is capable of reading files with compact and
        expanded formatting, e.g. pec96][ne_pju][ne9.dat and
        pec40][ar_cl][ar16.dat respectively.

        Parameters
        ----------
        element
            The atomic symbol for the element which will be retrieved.
        charge
            Charge state of the ion (e.g. 16 for Ar 16+); can also include
            other strings for more complicated paths (e.g. for
            transport_llu][ar15ic.dat, set charge to "15ic").
        filetype
            The type of data to retrieve. Options: ic, cl, ca, ls, llu, ...
        year
            The two-digit year label for the data. Set to "transport" for
            the special transport path.

        Returns
        -------
        :
            The data in the specified file. Dimensions are density and
            temperature. Each member of the dataset corresponds to a
            different transition.

        """

        def explicit_reshape(data_to_reshape, nd, nt):
            data = np.empty((nd, nt))
            for id in range(nd):
                for it in range(nt):
                    data[id, it] = data_to_reshape[id * nt + it]
            return data

        def build_file_component(year, element):
            file_component = "transport"
            if year != "transport":
                file_component = f"pec{year}][{element.lower()}"
            return file_component

        def file_type(identifier):
            identifier_dict = {
                "+": "compact",
                ":": "expanded",
            }
            file_type = identifier_dict.get(identifier)
            if file_type is None:
                raise ValueError(f"Unknown file header identifier ({identifier}).")
            return file_type

        def transition_match(transition_line):
            transition_type = "orbitals"
            match = (
                r"c\s+(\d+.)"  # isel
                r"\s+(\d+.\d+)"  # wavelength
                r"\s+(\d+)(\(\d\)\d\(.+\d?.\d\))-"  # transition upper level
                r".+(\d+)(\(\d\)\d\(.+\d?.\d\))"  # transition lower level
            )
            header_re = re.compile(match)
            m = header_re.search(transition_line)
            if not m:
                transition_type = "n_levels"
                match = r"c\s+(\d+.)\s+(\d+.\d+)\s+([n]\=.\d+.-.[n]\=.\d+)"
                header_re = re.compile(match)
                m = header_re.search(transition_line)
            if not m:
                raise ValueError(
                    f"Unknown transition formatting ({transition_line})."
                )
            return transition_type, match

        now = datetime.datetime.now()
        file_component = build_file_component(year, element)
        filename = Path(pathname2url(file_component)) / pathname2url(
            f"{file_component}_{filetype.lower()}]"
            f"[{element.lower()}{charge.lower()}.dat"
        )
        header_match = {
            "compact": r"(\d+).+/(\S+).*\+(.*)photon",
            "expanded": r"(\d+).+/(\S+).*\:(.*)photon",
        }
        section_header_match = {
            "compact": r"(\d+.\d+).+\s+(\d+)\s+(\d+).+type\s?"
            r"=\s?(\S+).+isel.+\s+(\d+)",
            "expanded": r"(\d+.\d+)\s+(\d+)\s+(\d+).+type\s?="
            r"\s?(\S+).+isel\s+?=\s+?(\d+)",
        }
        with self._get_file("adf15", filename) as f:
            header = f.readline().strip().lower()
            identifier = file_type(header.split("/")[1][2])

            match = header_match[identifier]
            m = re.search(match, header, re.I)
            assert isinstance(m, re.Match)
            ntrans = int(m.group(1))
            element_name = m.group(2).strip().lower()
            charge_state = int(m.group(3))
            assert element_name == element.lower()
            m = re.search(r"(\d+)(\S*)", charge)
            assert isinstance(m, re.Match)
            extracted_charge = m.group(1)
            if charge_state != int(extracted_charge):
                raise ValueError(
                    f"Charge state in ADF15 file ({charge_state}) does not "
                    f"match argument ({charge})."
                )

            # Read first section header to build arrays outside of reading loop
            match = section_header_match[identifier]
            header_re = re.compile(match)
            m = None
            while not m:
                line = f.readline().strip().lower()
                m = header_re.search(line)
            assert isinstance(m, re.Match)
            nd = int(m.group(2))
            nt = int(m.group(3))
            ttype: List[str] = []
            tindex = np.empty(ntrans)
            wavelength = np.empty(ntrans)

            # Read photon emissivity coefficient rates
            data = np.empty((ntrans, nd, nt))
            for i in range(ntrans):
                m = header_re.search(line)
                assert isinstance(m, re.Match)
                assert int(m.group(5)) - 1 == i
                tindex[i] = i + 1
                ttype.append(m.group(4))
                wavelength[i] = float(m.group(1))  # (Angstroms)

                densities = np.fromfile(f, float, nd, " ")
                temperatures = np.fromfile(f, float, nt, " ")
                data_tmp = np.fromfile(f, float, nd * nt, " ")
                data[i, :, :] = explicit_reshape(data_tmp, nd, nt)

                line = f.readline().strip().lower()
            data = np.transpose(np.array(data), (0, 2, 1))

            # Read transition information from end of file
            file_end_re = re.compile(r"c\s+[isel].+\s+[transition].+\s+[type]")
            while not file_end_re.search(line):
                line = f.readline().strip().lower()
            _ = f.readline()
            if identifier == "expanded":
                _ = f.readline()
            line = f.readline().strip().lower()
            transition_type, match = transition_match(line)
            transition_re = re.compile(match)
            format_transition = {
                "orbitals": lambda m: f"{m.group(4)}-{m.group(6)}".replace(" ", ""),
                "n_levels": lambda m: m.group(3).replace(" ", ""),
            }
            transition = []
            for i in tindex:
                m = transition_re.search(line)
                assert isinstance(m, re.Match)
                assert int(m.group(1)[:-1]) == i
                transition_tmp = format_transition[transition_type](m)
                transition.append(transition_tmp)
                line = f.readline().strip().lower()

        gen_type = ADF15_GENERAL_DATATYPES[filetype]
        spec_type = element
        name = f"{spec_type}_{gen_type}"
        attrs = {
            "datatype": (gen_type, spec_type),
            "provenance": self.create_provenance(filename, now),
        }

        coords = [
            ("index", tindex),
            ("electron_temperature", temperatures),  # eV
            ("electron_density", densities * 10**6),  # m**-3
        ]

        pecs = DataArray(
            data * 10**-6,
            coords=coords,
            name=name,
            attrs=attrs,
        )
        # Add extra dimensions attached to index
        pecs = pecs.assign_coords(wavelength=("index", wavelength))  # (A)
        pecs = pecs.assign_coords(
            transition=("index", transition)
        )  # (2S+1)L(w-1/2)-(2S+1)L(w-1/2) of upper-lower levels, no blank spaces
        pecs = pecs.assign_coords(type=("index", ttype))  # (excit, recomb, cx)

        return pecs
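    # Usage sketch for get_adf15 (hedged: the file
    # adf15/pec96][he/pec96][he_pju][he0.dat is an illustrative name whose
    # availability on OpenADAS is not verified here):
    #
    #     pecs = ADASReader().get_adf15("he", "0", "pju", year="96")
    #
    # The result has dims ("index", "electron_temperature",
    # "electron_density") with per-transition coords "wavelength",
    # "transition" and "type"; as above, densities are converted from
    # cm**-3 to m**-3 and the rates from cm**3/s to m**3/s.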
    def create_provenance(
        self, filename: Path, start_time: datetime.datetime
    ) -> prov.ProvEntity:
        """Create a provenance entity for the given ADAS file.

        Note that this method just creates the provenance data
        appropriate for the arguments it has been provided with. It does
        not check that these arguments are actually valid and that the
        provenance corresponds to actually existing data.

        """
        end_time = datetime.datetime.now()
        entity = self.session.prov.entity(
            session.hash_vals(filename=filename, start_time=start_time)
        )
        activity = self.session.prov.activity(
            session.hash_vals(agent=self.prov_id, date=start_time),
            start_time,
            end_time,
            {prov.PROV_TYPE: "ReadData"},
        )
        self.session.prov.association(activity, self.agent)
        self.session.prov.association(activity, self.session.agent)
        self.session.prov.communication(activity, self.session.session)
        self.session.prov.derivation(entity, f"{self.namespace}:{filename}", activity)
        self.session.prov.generation(entity, activity, end_time)
        self.session.prov.attribution(entity, self.agent)
        self.session.prov.attribution(entity, self.session.agent)
        return entity
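    # In sketch form, the PROV chain built above: a new entity is derived
    # from "<namespace>:<filename>" through a "ReadData" activity associated
    # with this reader's agent, so each DataArray returned by the get_*
    # methods can be traced back to the ADAS file it came from.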
    def _get_file(self, dataclass: str, filename: Union[str, Path]) -> TextIO:
        """Retrieves an ADAS file, downloading it from OpenADAS if
        necessary. It will cache any downloads for later use.

        Parameters
        ----------
        dataclass
            The format of ADAS data in this file (e.g., ADF11).
        filename
            Name of the file to get.

        Returns
        -------
        :
            A file-like object from which the data can be read.

        """
        filepath = self.path / dataclass / filename
        if self.openadas and not filepath.exists():
            filepath.parent.mkdir(parents=True, exist_ok=True)
            urlretrieve(
                f"https://open.adas.ac.uk/download/{dataclass}/{filename}", filepath
            )
        return filepath.open("r")

    @property
    def requires_authentication(self) -> Literal[False]:
        """Reading ADAS data never requires authentication."""
        return False
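
# A minimal end-to-end sketch (hedged: requires network access to
# open.adas.ac.uk, and assumes the 1996 hydrogen SCD file is published
# there; the file name is illustrative). The first call downloads and
# caches the file under ~/.indica/adas/adf11/; later calls read the cache.
if __name__ == "__main__":
    reader = ADASReader()
    scd = reader.get_adf11("scd", "h", "96")
    print(scd.dims, scd.attrs["date"])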