Source code for mcfine.fitting

import copy
import glob
import inspect
import logging
import multiprocessing as mp
import os
import pickle
import sys
import warnings
from functools import partial

try:
    import tomllib
except ModuleNotFoundError:
    import tomli as tomllib

import astropy.units as u
import emcee
import ndradexhyperfine as ndradex
import numpy as np
from lmfit import minimize, Parameters
from scipy.interpolate import RegularGridInterpolator
from tqdm import tqdm
from threadpoolctl import threadpool_limits

from .line_info import transition_lines, freq_lines, strength_lines, v_lines
from .utils import get_dict_val, check_overwrite, save_fit_dict, load_fit_dict

T_BACKGROUND = 2.73

ALLOWED_FIT_TYPES = [
    'lte',
    'radex',
]

ALLOWED_FIT_METHODS = [
    'mcmc',
    'leastsq',
]

ALLOWED_LINES = [
    'n2hp10',
    'co21',
]

CONFIG_DEFAULT_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'toml', 'config_defaults.toml')
LOCAL_DEFAULT_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'toml', 'local_defaults.toml')

radex_grid = {}

mp.set_start_method('fork')


[docs] def round_nearest(x, a, ): """Round x to the nearest a Args: x: value to round a: value to round to """ return round(round(x / a) * a, -int(np.floor(np.log10(a))))
[docs] def get_nearest_value(data, value, ): """Get the nearest value in a dataset Args: data: array of values to hunt through value: value to find nearest in data to Returns: Nearest value """ # Find the nearest below and above diff = data - value less = np.where(diff <= 0) greater = np.where(diff >= 0) # Get the position and value for above and below. If we're at the edge but somehow this doesn't work, just # go for the lowest or highest values if len(less[0]) == 0: nearest_below = data[0] else: nearest_below_idx = np.argsort(np.abs(diff[less]))[0] nearest_below = data[less][nearest_below_idx] if len(greater[0]) == 0: nearest_above = data[-1] else: nearest_above_idx = np.argsort(diff[greater])[0] nearest_above = data[greater][nearest_above_idx] nearest_value = np.array([nearest_below, nearest_above]) # If we're actually choosing a grid value, just return that singular value if np.diff(nearest_value) == 0: nearest_value = nearest_value[0] return nearest_value
[docs] def get_nearest_values(dataset, keys, values, ): """Function to parallelise get_nearest_value Args: dataset (dict): Dataset to hunt through keys (list): List of keys to search with values (list): List of values to find in the dataset Returns list of the nearest values """ nearest_values_list = [get_nearest_value(dataset[keys[i]].values, values[i]) for i in range(len(values))] return nearest_values_list
[docs] def gaussian(x, amp, centre, width, ): """Evaluate a Gaussian on a 1D grid. Calculates a Gaussian, using :math:`f(x) = A \\exp[-0.5(x - \\mu)^2/\\sigma^2]`. Args: x (np.ndarray): Grid to calculate Gaussian on. amp (float or np.ndarray): Height(s) of curve peak(s), :math:`A`. centre (float or np.ndarray): Peak centre(s), :math:`\\mu`. width (float or np.ndarray): Standard deviation(s), :math:`\\sigma`. Returns: np.ndarray: Gaussian model array """ y = amp * np.exp(- (x - centre) ** 2 / (2 * width ** 2)) return y
[docs] def residual(observed, model, observed_error=None, ): """Calculate standard residual. If errors are provided, then this is the sum of :math:`({\\rm obs}-{\\rm model})/{\\rm error}`. Else the sum of :math:`({\\rm obs}-{\\rm model})/{\\rm model}`. Args: observed (np.ndarray): Observed values. model (np.ndarray): Model values. observed_error (float or np.ndarray): The error in the observed values. Defaults to None. Returns: float: The residual value. """ if observed_error is not None: res = (observed - model) / observed_error else: res = (observed - model) / model return res
[docs] def chi_square(observed, model, observed_error=None, ): """Calculate standard chi-square. If errors are provided, then this is the sum of :math:`({\\rm obs}-{\\rm model})^2/{\\rm error}^2`. Else the sum of :math:`({\\rm obs}-{\\rm model})^2/{\\rm model}^2`. Args: observed (np.ndarray): Observed values. model (np.ndarray): Model values. observed_error (float or np.ndarray): The error in the observed values. Defaults to None. Returns: float: The chi-square value. """ res = residual(observed, model, observed_error) chisq = np.nansum(res ** 2) return chisq
[docs] def hyperfine_structure_lte(t_ex, tau, v_centre, line_width, strength_lines, v_lines, vel, return_hyperfine_components=False, log_tau=True, ): """Create hyperfine intensity profile. Takes line strengths and relative velocity centres, along with excitation temperature and optical depth to produce a hyperfine intensity profile. Args: t_ex (float): Excitation temperature (K). tau (float): Total optical depth of the line. v_centre (float): Central velocity of the strongest component (km/s). strength_lines (np.ndarray): Array of relative line strengths v_lines (np.ndarray): Array of relative velocity shifts for the lines (km/s) vel (np.ndarray): Velocity array (km/s) line_width (float): Width of components (assumed to be the same for each hyperfine component; km/s). return_hyperfine_components (bool): Return the intensity for each hyperfine component. Defaults to False. log_tau (bool): If True, will assume tau is in a logarithmic scale. Else is linear. Defaults to True. Returns: Intensity for each individual hyperfine component (if `return_hyperfine_components` is True), and the total intensity for all components """ if log_tau: strength = np.exp(tau) * strength_lines else: strength = tau * strength_lines tau_components = gaussian(vel[:, np.newaxis], strength, v_lines + v_centre, line_width) total_tau = np.nansum(tau_components, axis=-1) intensity_total = (1 - np.exp(-total_tau)) * (t_ex - T_BACKGROUND) if not return_hyperfine_components: return intensity_total else: intensity_components = (1 - np.exp(-tau_components)) * (t_ex - T_BACKGROUND) return intensity_components, intensity_total
[docs] def hyperfine_structure_radex(t_ex, tau, v_centre, line_width, v_lines, vel, return_hyperfine_components=False, ): """Create hyperfine intensity profile using RADEX. Takes line strengths and relative velocity centres calculated with RADEX, and produces a spectrum. Args: t_ex (float): Excitation temperature (K). tau (float): Total optical depth of the line. v_centre (float): Central velocity of the strongest component (km/s). line_width (float): Width of components (assumed to be the same for each hyperfine component; km/s). v_lines (float): Velocity of the various components (km/s). vel (np.ndarray): Velocity array (km/s) return_hyperfine_components (bool): Return the intensity for each hyperfine component. Defaults to False. Returns: Intensity for each individual hyperfine component (if `return_hyperfine_components` is True), and the total intensity for all components """ tau_components = gaussian(vel[:, np.newaxis], tau, v_lines + v_centre, line_width) intensity_components = (1 - np.exp(-tau_components)) * (t_ex - T_BACKGROUND) intensity_total = np.nansum(intensity_components, axis=-1) if not return_hyperfine_components: return intensity_total else: return intensity_components, intensity_total
[docs] def multiple_components(theta, vel, strength_lines, v_lines, props, n_comp, fit_type='lte', log_tau=True, ): """Sum intensities for multiple lines. Takes `n_comp` distinct lines, and calculates the total intensity of their various hyperfine lines. Args: theta: [t_ex, tau, vel, vel_width] for each component. Should have a length of 4*`n_comp` vel: velocity array strength_lines (np.ndarray): Array for relative line strengths props: List of properties for the line profiles n_comp (int): Number of distinct components to calculate intensities for fit_type (str): Should be one of ALLOWED_FIT_TYPES. Defaults to 'lte' log_tau (bool): Whether to fit tau in log or linear space. Defaults to True (log space) Returns: np.ndarray: The sum of the intensities for all the distinct components. """ prop_len = len(props) if n_comp == 0: return np.zeros_like(vel) if fit_type == 'lte': intensity_model = np.array([hyperfine_structure_lte(*theta[prop_len * i: prop_len * i + prop_len], strength_lines, v_lines, vel, log_tau=log_tau) for i in range(n_comp)]) elif fit_type == 'radex': qn_ul = np.array(range(len(radex_grid['QN_ul'].values))) intensity_model = [get_radex_multiple_components(theta[prop_len * i: prop_len * i + prop_len], vel, v_lines, qn_ul) for i in range(n_comp)] else: raise Warning('fit type %s not understood!' % fit_type) intensity_model = np.sum(intensity_model, axis=0) return intensity_model
[docs] def get_radex_multiple_components(theta, vel, v_lines, qn_ul, ): """Wrapper around RADEX to get out the multiple components""" # Important point here, RADEX uses a square profile so transform the sigma into the right width. Pull out # the subset of data around our values tau, t_ex = radex_grid_interp(theta, qn_ul) intensity_model = hyperfine_structure_radex(t_ex, tau, theta[3], theta[4], v_lines, vel) return intensity_model
[docs] def radex_grid_interp(theta, qn_ul, labels=None, ): """Interpolate generated RADEX grid to get useful values out without weird edge effects Args: theta (list): property values qn_ul (list): Names for the transitions labels (list): Labels for proeprties Returns: tau and t_ex """ if labels is None: labels = ['T_kin', 'N_mol', 'n_H2', 'dv'] nearest_values = get_nearest_values( radex_grid, labels, [theta[0], 10 ** theta[1], 10 ** theta[2], theta[4] * 2.355]) # Pull out grid subset of the nearest values grid_subset = radex_grid.sel(T_kin=nearest_values[0], N_mol=nearest_values[1], n_H2=nearest_values[2], dv=nearest_values[3]) tau_subset = grid_subset['tau'].values t_ex_subset = grid_subset['T_ex'].values # Remove any singular (limit) values limit_vals = [True if type(nearest_value) == np.ndarray else False for nearest_value in nearest_values] theta_coords = np.array([theta[0], 10 ** theta[1], 10 ** theta[2], theta[4] * 2.355]) theta_coords = theta_coords[limit_vals] # Also remove the singular dimensions from the nearest values in the grid nearest_values = np.asarray(nearest_values)[limit_vals] # Construct an array to get values for each point in the grid coords = np.array([(value, *theta_coords) for value in qn_ul]) # Setup grid and fast interpolate grid_coords = (qn_ul, *nearest_values) reg_grid_tau = RegularGridInterpolator(grid_coords, tau_subset) reg_grid_t_ex = RegularGridInterpolator(grid_coords, t_ex_subset) tau = reg_grid_tau(coords) t_ex = reg_grid_t_ex(coords) return tau, t_ex
[docs] def initial_lmfit(params, intensity, intensity_err, vel, strength_lines, v_lines, props, n_comp=1, fit_type='lte', log_tau=True, ): """Get initial guess for MC walkers from lmfit This is the residual calculation for LMFIT. Just to get our initial parameters to instantiate the MC Args: params: List of lmfit parameters intensity: Measured intensity intensity_err: Measured error on intensity vel: Velocity axis strength_lines: Relative line strength v_lines: Relative line velocity props: List of properties for the line profiles n_comp: Number of components to fit. Defaults to 1 fit_type: Type of fit to do. Either 'lte' or 'radex'. Defaults to 'lte' log_tau: Whether to fit tau in log-space (default) or linear space Returns: Residual (chisq) value from the fit """ theta = np.array([params[key].value for key in params]) intensity_model = multiple_components(theta=theta, vel=vel, strength_lines=strength_lines, v_lines=v_lines, props=props, n_comp=n_comp, fit_type=fit_type, log_tau=log_tau, ) residual = (intensity - intensity_model) / intensity_err return residual
[docs] def ln_like(theta, intensity, intensity_err, vel, strength_lines, v_lines, props, n_comp=1, fit_type='lte', log_tau=True, ): """Calculate the (negative) log-likelihood for the model This is the main meat of the MCMC fitting, given the parameters we calculate the likelihood of the model and return that Args: theta: Parameters for the fit intensity: Observed intensity intensity_err: Intensity uncertainty vel: Observed velocity strength_lines: Relative line strength v_lines: Relative line velocities props: List of properties being fit n_comp: Number of components to fit. Defaults to 1 fit_type: Either 'lte' (default) or 'radex' log_tau: Whether to fit tau in log-space (default) or linear space Returns: Negative ln-likelihood for the model. """ intensity_model = multiple_components(theta, vel, strength_lines, v_lines, props, n_comp=n_comp, fit_type=fit_type, log_tau=log_tau) chisq = chi_square(intensity, intensity_model, intensity_err) # # Scale the chisq by sqrt(2[N-P]), following Smith+ and others # p = n_comp * len(props) # n = len(intensity[~np.isnan(intensity)]) # scale_factor = np.sqrt(2 * (n - p)) # chisq /= scale_factor return -0.5 * chisq
[docs] def ln_prob(theta, intensity, intensity_err, vel, strength_lines, v_lines, props, bounds, n_comp=1, fit_type='lte', ): """Calculate the ln-probability for emcee This combines the prior (generally flat) with the ln-likelihood to return to emcee Args: theta: Parameters for the fit intensity: Observed intensity intensity_err: Intensity uncertainty vel: Observed velocity strength_lines: Relative line strength v_lines: Relative line velocities props: List of properties being fit bounds: Bounds on parameters n_comp: Number of components to fit. Defaults to 1 fit_type: Either 'lte' (default) or 'radex' Returns: float: Combined probability (likelihood+prior) for the model. """ lp = ln_prior(theta, vel, props, bounds, n_comp=n_comp) if not np.isfinite(lp): return -np.inf like = ln_like(theta, intensity, intensity_err, vel, strength_lines, v_lines, props, n_comp=n_comp, fit_type=fit_type) return lp + like
[docs] def ln_prior(theta, vel, props, bounds, n_comp=1, ): """Apply flat priors to the emcee fitting This also ensures any velocity components are strictly increasing, to avoid degeneracies there Args: theta: Parameters for the fit vel: Observed velocity props: List of properties being fit bounds: Bounds on parameters n_comp: Number of components to fit. Defaults to 1 Returns: 0 if within bounds, -infinity otherwise """ prop_len = len(props) for prop in range(prop_len): values = np.array([theta[prop_len * i + prop] for i in range(n_comp)]) if not np.logical_and(bounds[prop][0] <= values, values <= bounds[prop][1]).all(): return -np.inf if n_comp > 1: dv = np.abs(np.nanmedian(np.diff(vel))) # Insist on monotonically increasing velocity components v_idx = np.where(np.asarray(props) == 'v')[0][0] vels = np.array([theta[prop_len * i + v_idx] for i in range(n_comp)]) vel_diffs = np.diff(vels) # Make sure components are separated by at least one channel if np.any(vel_diffs < dv): return -np.inf return 0
[docs] class HyperfineFitter: def __init__( self, data=None, vel=None, error=None, mask=None, config_file=None, local_file=None, ): """Multi-component, hyperfine MCMC line fitting. A fully-automated, highly customisable multi-component fitter. For full details, see Williams & Watkins. For tutorials, see the docs Args: data (np.ndarray): Either a 1D array of intensity (spectrum) or a 3D array of intensities (cube). Intensities should be in K. vel (np.ndarray): Array of velocity values that correspond to data, in km/s. error (np.ndarray): Array of errors in intensity. Should have the same shape as `data`. Defaults to None. mask (np.ndarray): 1/0 mask to indicate significant emission in the cube (i.e. the pixels to fit). Should have shape of `data.shape[1:]`. Defaults to None, which will fit all pixels in a cube. config_file (str): Path to config.toml file. Defaults to None, which will use the default settings local_file (str): Path to local.toml file. Defaults to None, which will use the default settings Todo: * Bounds as parameters to input here. * Test the radex fitting routines on cubes. * Covariance matrices for the MCMC chains so we can plot errors for each component """ logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(asctime)s - %(name)s - %(funcName)s - %(message)s', ) self.logger = logging.getLogger(__name__) if data is None: self.logger.warning('data should be provided!') sys.exit() if vel is None: self.logger.warning('velocity definition should be provided! Defaulting to monotonically increasing') vel = np.arange(data.shape[0]) if error is None: self.logger.info('No error provided. Defaulting to 0') error = np.zeros_like(data) self.data = data self.vel = vel self.error = error with open(CONFIG_DEFAULT_PATH, 'rb') as f: config_defaults = tomllib.load(f) with open(LOCAL_DEFAULT_PATH, 'rb') as f: local_defaults = tomllib.load(f) if config_file is not None: with open(config_file, 'rb') as f: config = tomllib.load(f) else: self.logger.info('No config file provided. Using in-built defaults') config = copy.deepcopy(config_defaults) if local_file is not None: with open(local_file, 'rb') as f: local = tomllib.load(f) else: self.logger.info('No local file provided. Using in-built defaults') local = copy.deepcopy(local_defaults) self.config = config self.local = local self.config_defaults = config_defaults self.local_defaults = local_defaults fit_type = get_dict_val(self.config, self.config_defaults, table='fitting_params', key='fit_type', logger=self.logger, ) if fit_type not in ALLOWED_FIT_TYPES: self.logger.warning('Fit type %s not understood!' % fit_type) sys.exit() self.fit_type = fit_type fit_method = get_dict_val(self.config, self.config_defaults, table='fitting_params', key='fit_method', logger=self.logger, ) if fit_method not in ALLOWED_FIT_METHODS: self.logger.warning('Fitting procedure %s not understood!' % fit_method) sys.exit() self.fit_method = fit_method self.dv = np.abs(np.nanmedian(np.diff(self.vel))) if self.data.ndim == 1: self.data_type = 'spectrum' else: self.data_type = 'cube' self.logger.info('Detected data type as %s' % self.data_type) if self.data_type == 'cube': if mask is None: self.logger.info('No mask provided. Including every pixel') mask = np.ones([data.shape[1], data.shape[2]]) self.mask = mask if self.data.shape != self.error.shape: self.logger.warning('Data and error should be the same shape') sys.exit() if self.data.shape[1:] != self.mask.shape: self.logger.warning('Mask should be 2D projection of data') sys.exit() line = get_dict_val(self.config, self.config_defaults, table='fitting_params', key='line', logger=self.logger, ) if line not in ALLOWED_LINES: self.logger.warning('Line %s not understood!' % line) sys.exit() self.line = line if self.fit_type == 'lte': self.strength_lines = np.array( [strength_lines[line_name] for line_name in strength_lines.keys() if self.line in line_name] ) self.v_lines = np.array( [v_lines[line_name] for line_name in v_lines.keys() if self.line in line_name] ) self.bounds = [ (T_BACKGROUND, 1e3), (np.log(0.1), np.log(30)), (np.nanmin(self.vel), np.nanmax(self.vel)), (self.dv / 2.355, 10), ] elif self.fit_type == 'radex': rest_freq = get_dict_val(self.config, self.config_defaults, table='fitting_params', key='rest_freq', logger=self.logger, ) if rest_freq == '': self.logger.warning('rest_freq should be defined') sys.exit() if isinstance(rest_freq, float) or isinstance(rest_freq, int): rest_freq = rest_freq * u.GHz self.strength_lines = None self.transitions = [ transition_lines[line_name] for line_name in transition_lines.keys() if self.line in line_name ] freq = np.array( [freq_lines[line_name] for line_name in freq_lines.keys() if self.line in line_name] ) * u.GHz freq_to_vel = u.doppler_radio(rest_freq) self.v_lines = freq.to(u.km / u.s, equivalencies=freq_to_vel).value self.bounds = [ (T_BACKGROUND, 75), (13, 15), (5, 8), (np.nanmin(self.vel), np.nanmax(self.vel)), (self.dv / 2.355, 10), ] radex_datafile = get_dict_val(self.local, self.local_defaults, table='local', key='radex_datafile', logger=self.logger, ) self.radex_datafile = radex_datafile p0 = get_dict_val(self.config, self.config_defaults, table='initial_guess', key=fit_type, logger=self.logger, ) self.p0 = p0 if not len(self.bounds) == len(self.p0): self.logger.warning('bounds and p0 should have the same length!') sys.exit() delta_bic_cutoff = get_dict_val(self.config, self.config_defaults, table='mcmc', key='delta_bic_cutoff', logger=self.logger, ) self.delta_bic_cutoff = delta_bic_cutoff n_steps = get_dict_val(self.config, self.config_defaults, table='mcmc', key='n_steps', logger=self.logger, ) self.n_steps = n_steps n_walkers = get_dict_val(self.config, self.config_defaults, table='mcmc', key='n_walkers', logger=self.logger, ) self.n_walkers = n_walkers n_cores = get_dict_val(self.local, self.local_defaults, table='local', key='n_cores', logger=self.logger, ) if n_cores == '': self.n_cores = mp.cpu_count() else: self.n_cores = n_cores if self.fit_type == 'lte': self.props = [ 'tex', 'tau', 'v', 'sigma', ] self.labels = [ r'$T_\mathrm{ex}$ (%s)', r'$\log(\tau)$ (%s)', r'$v$ (%s)', r'$\sigma$ (%s)', ] elif self.fit_type == 'radex': self.props = [ 't_kin', 'N_col', 'n_h2', 'v', 'sigma', ] self.labels = [ r'$T_\mathrm{kin}$ (%s)', r'$N_\mathrm{col}$ (%s)', r'$n_\mathrm{H2}$ (%s)', r'$v$ (%s)', r'$\sigma$ (%s)', ] else: self.logger.warning('fit_type %s not known' % self.fit_type) sys.exit() self.parameter_maps = None self.max_n_comp = None
[docs] def generate_radex_grid(self, output_file=None, ): """Pre-generate RADEX grid, given various bounds and steps This will be the grid we interpolate over for speed later. As such, the parameters should be set to cover the parameter range you care about. Parameters are stored in config.toml Args: output_file (str): Output filename. Defaults to None, which will pull from config.toml """ if self.fit_type != 'radex': self.logger.warning('fit_type should be radex') sys.exit() if output_file is None: output_file = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='output_file', logger=self.logger, ) t_kin = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='t_kin', logger=self.logger, ) n_mol = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='n_mol', logger=self.logger, ) n_h2 = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='n_h2', logger=self.logger, ) dv = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='dv', logger=self.logger, ) if t_kin == '': t_kin = self.bounds[0] if n_mol == '': n_mol = self.bounds[1] if n_h2 == '': n_h2 = self.bounds[2] if dv == '': dv = self.bounds[4] t_kin_step = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='t_kin_step', logger=self.logger, ) n_mol_step = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='n_mol_step', logger=self.logger, ) n_h2_step = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='n_h2_step', logger=self.logger, ) dv_step = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='dv_step', logger=self.logger, ) geom = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='geom', logger=self.logger, ) progress = get_dict_val(self.config, self.config_defaults, table='generate_radex_grid', key='progress', logger=self.logger, ) f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) if not os.path.exists(output_file) or overwrite: t_kin_array = np.arange(t_kin[0], t_kin[1] + t_kin_step, t_kin_step) n_mol_array = np.arange(n_mol[0], n_mol[1] + n_mol_step, n_mol_step) n_h2_array = np.arange(n_h2[0], n_h2[1] + n_h2_step, n_h2_step) dv_array = np.arange(dv[0], dv[1] + dv_step, dv_step) * 2.355 # 2.355 since distinction between sigma/dv ds = ndradex.run(self.radex_datafile, self.transitions, T_kin=t_kin_array, N_mol=10 ** n_mol_array, n_H2=10 ** n_h2_array, dv=dv_array, T_bg=T_BACKGROUND, geom=geom, progress=progress, n_procs=self.n_cores, ) ndradex.save_dataset(ds, output_file) else: ds = ndradex.load_dataset(output_file) global radex_grid radex_grid = ds
[docs] def multicomponent_fitter(self, fit_dict_filename=None, n_comp_filename=None, likelihood_filename=None, ): """Run the multicomponent fitter This runs everything, essentially, and is the part that you should call. It'll do LMFIT to get guesses, emcee to properly sample parameter space and then iteratively add components before removing them. Args: fit_dict_filename: Filename to save the fitted emcee walkers to. Defaults to None, which will not save anything n_comp_filename: Filename to save out fitted number of components. Defaults to None, which will not save anything likelihood_filename: Filename to save out likelihood for the best fit. Defaults to None, which will not save anything """ f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) self.logger.info('Starting multi-component fitting') if fit_dict_filename is None: fit_dict_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='fit_dict_filename', logger=self.logger, ) chunksize = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='chunksize', logger=self.logger, ) progress = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='progress', logger=self.logger, ) save = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='save', logger=self.logger, ) if self.data_type == 'spectrum': if not os.path.exists(fit_dict_filename + '.pkl') or overwrite: data = self.data error = self.error n_comp, likelihood, sampler = self.delta_bic_looper(data, error, progress=progress, ) if save: fit_dict = {'sampler': sampler, 'n_comp': n_comp, 'likelihood': likelihood, } save_fit_dict(fit_dict, fit_dict_filename + '.pkl') elif self.data_type == 'cube': if n_comp_filename is None: n_comp_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='n_comp_filename', logger=self.logger, ) if likelihood_filename is None: likelihood_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='likelihood_filename', logger=self.logger, ) if not os.path.exists(n_comp_filename + '.npy') or overwrite: n_comp = np.zeros([self.data.shape[1], self.data.shape[2]]) else: n_comp = np.load(n_comp_filename + '.npy') if not os.path.exists(likelihood_filename + '.npy') or overwrite: likelihood = np.zeros_like(n_comp) else: likelihood = np.load(likelihood_filename + '.npy') ij_list = [(i, j) for i in range(self.data.shape[1]) for j in range(self.data.shape[2]) if self.mask[i, j] != 0 ] self.logger.info('Fitting using %d cores' % self.n_cores) with mp.Pool(self.n_cores) as pool: map_result = list( tqdm( pool.imap( partial(self.parallel_fitting, fit_dict_filename=fit_dict_filename, save=save, overwrite=overwrite), ij_list, chunksize=chunksize, ), total=len(ij_list) ) ) for idx, ij in enumerate(ij_list): n_comp[ij[0], ij[1]] = map_result[idx][0] likelihood[ij[0], ij[1]] = map_result[idx][1] if save: np.save(n_comp_filename + '.npy', n_comp) np.save(likelihood_filename + '.npy', likelihood)
[docs] def parallel_fitting(self, ij, fit_dict_filename='fit_dict', save=True, overwrite=False, ): """Parallel function for MCMC fitting. Wraps up the MCMC fitting to pass off to multiple cores. Because of overheads, it's easier to farm out multiple fits to multiple cores, rather than run the MCMC with multiple threads. Args: ij (tuple): tuple containing (i, j) coordinates of the pixel to fit. fit_dict_filename (str): Base filename for MCMC ft pickle. Will append coordinates on afterwards. Defaults to fit_dict. save (bool): Save out files? Defaults to True. overwrite (bool): Overwrite existing files? Defaults to False. Returns: Number of fitted components and the best-fit likelihood. """ i = ij[0] j = ij[1] cube_fit_dict_filename = fit_dict_filename + '_%s_%s' % (i, j) if not os.path.exists(cube_fit_dict_filename + '.pkl') or overwrite: self.logger.debug('Fitting %s, %s' % (i, j)) data = self.data[:, i, j] error = self.error[:, i, j] # Limit to a single core to avoid weirdness with threadpool_limits(limits=1, user_api=None): n_comp, likelihood, sampler = self.delta_bic_looper(data=data, error=error, ) if save: fit_dict = {'sampler': sampler, 'n_comp': n_comp, 'likelihood': likelihood, } save_fit_dict(fit_dict, cube_fit_dict_filename + '.pkl') else: fit_dict = load_fit_dict(cube_fit_dict_filename + '.pkl') n_comp = fit_dict['n_comp'] likelihood = fit_dict['likelihood'] self.logger.debug('N components: %d, likelihood: %.2f' % (n_comp, likelihood)) return n_comp, likelihood
[docs] def delta_bic_looper(self, data, error, save=False, overwrite=True, progress=False, ): """Increase spectral complexity until we hit diminishing returns This will iteratively build up the spectral model one component at a time, before looping backwards to remove the weaker components which might just be fits to the noise Args: data: Observed data error: Observed error save: Whether to save out or not, defaults to False overwrite: Whether to overwrite existing fits. Defaults to False progress: Whether to display progress bars or not. Defaults to False Returns: fitted number of components, likelihood, and the emcee sampler """ # We start with a zero component model, i.e. a flat line delta_bic = np.inf sampler_old = None likelihood_old = None sampler = None n_comp = 0 prop_len = len(self.props) ln_m = np.log(len(data[~np.isnan(data)])) likelihood = ln_like(theta=0, intensity=data, intensity_err=error, vel=self.vel, strength_lines=self.strength_lines, v_lines=self.v_lines, props=self.props, n_comp=n_comp, fit_type=self.fit_type, ) bic = - 2 * likelihood while delta_bic > self.delta_bic_cutoff: # Store the previous BIC and sampler, since we need them later bic_old = bic sampler_old = sampler likelihood_old = likelihood # Increase the number of components, refit n_comp += 1 k = n_comp * prop_len with warnings.catch_warnings(): warnings.simplefilter('ignore') sampler = self.run_mcmc(data, error, n_comp=n_comp, save=save, overwrite=overwrite, progress=progress, ) # Calculate max likelihood parameters and BIC flat_samples = sampler.get_chain(discard=self.n_steps // 2, flat=True, ) parameter_median = np.nanmedian(flat_samples, axis=0, ) likelihood = ln_like(theta=parameter_median, intensity=data, intensity_err=error, vel=self.vel, strength_lines=self.strength_lines, v_lines=self.v_lines, props=self.props, n_comp=n_comp, fit_type=self.fit_type, ) bic = k * ln_m - 2 * likelihood delta_bic = bic_old - bic sampler = sampler_old likelihood = likelihood_old n_comp -= 1 # Now loop backwards, iteratively remove the weakest component and refit. Only if we have a >0 order fit! if n_comp > 0: max_back_loops = n_comp for i in range(max_back_loops): flat_samples = sampler.get_chain(discard=self.n_steps // 2, flat=True, ) parameter_median = np.nanmedian(flat_samples, axis=0, ) if self.fit_type == 'lte': line_intensities = np.array( [hyperfine_structure_lte(*parameter_median[prop_len * i: prop_len * i + prop_len], strength_lines=self.strength_lines, v_lines=self.v_lines, vel=self.vel, ) for i in range(n_comp)] ) elif self.fit_type == 'radex': line_intensities = np.array( [hyperfine_structure_radex(*parameter_median[prop_len * i: prop_len * i + prop_len], v_lines=self.v_lines, vel=self.vel, ) for i in range(n_comp)] ) else: self.logger.warning('Fit type %s not understood!' % self.fit_type) sys.exit() integrated_intensities = np.trapz(line_intensities, x=self.vel, axis=-1) component_order = np.argsort(integrated_intensities) bic_old = bic sampler_old = sampler likelihood_old = likelihood # Remove the weakest component n_comp -= 1 if n_comp == 0: likelihood = ln_like(theta=0, intensity=data, intensity_err=error, vel=self.vel, strength_lines=self.strength_lines, v_lines=self.v_lines, props=self.props, n_comp=n_comp, fit_type=self.fit_type) bic = - 2 * likelihood else: k = n_comp * prop_len idx_to_delete = range(prop_len * component_order[0], prop_len * component_order[0] + prop_len) p0 = np.delete(parameter_median, idx_to_delete) with warnings.catch_warnings(): warnings.simplefilter('ignore') sampler = self.run_mcmc(data, error, n_comp=n_comp, save=save, overwrite=overwrite, progress=progress, p0_fit=p0, ) # Calculate max likelihood parameters and BIC flat_samples = sampler.get_chain(discard=self.n_steps // 2, flat=True, ) parameter_median = np.nanmedian(flat_samples, axis=0) likelihood = ln_like(theta=parameter_median, intensity=data, intensity_err=error, vel=self.vel, strength_lines=self.strength_lines, v_lines=self.v_lines, props=self.props, n_comp=n_comp, fit_type=self.fit_type, ) bic = k * ln_m - 2 * likelihood delta_bic = bic_old - bic # If removing and refitting doesn't significantly improve things, then just jump out of here if delta_bic < self.delta_bic_cutoff: break sampler = sampler_old likelihood = likelihood_old n_comp += 1 return n_comp, likelihood, sampler
[docs] def run_mcmc(self, data, error, n_comp=1, save=True, overwrite=False, progress=False, fit_dict_filename='fit_dict.pkl', p0_fit=None, ): """Run emcee to get a fit out Args: data: Observed data error: Observed uncertainty n_comp: Number of components to fit. Defaults to 1 save: Whether to save the sampler out. Defaults to True overwrite: Whether to overwrite existing fit. Defaults to False progress: Whether to display progress bar. Defaults to False fit_dict_filename: Name for the sampler. Defaults to fit_dict.pkl p0_fit: Initial guess for the fit. Defaults to None, which will use some basic parameters that are likely to be suboptimal """ prop_len = len(self.props) bounds = self.bounds * n_comp if not os.path.exists(fit_dict_filename) or overwrite: if p0_fit is None: vel_idx = np.where(np.array(self.props) == 'v')[0][0] p0 = np.array(self.p0 * n_comp) # Move the velocities a little to encourage parameter space hunting for i in range(n_comp): p0[prop_len * i + vel_idx] += i # Use lmfit to get an initial fit params = Parameters() p0_idx = 0 for i in range(n_comp): for j in range(prop_len): params.add('%s_%d' % (self.props[j], i), value=p0[p0_idx], min=bounds[j][0], max=bounds[j][1], ) p0_idx += 1 # Pull in any relevant kwargs kwargs = {} for config_dict in [self.config_defaults, self.config]: if 'lmfit' in config_dict: for key in config_dict['lmfit']: kwargs[key] = config_dict['lmfit'][key] # Filter out any NaNs good_idx = np.where(~np.isnan(data)) lmfit_result = minimize( fcn=initial_lmfit, params=params, args=(data[good_idx], error[good_idx], self.vel[good_idx], self.strength_lines, self.v_lines, self.props, n_comp, self.fit_type, True, ), **kwargs, ) p0_fit = np.array([lmfit_result.params[key].value for key in lmfit_result.params]) # Sort p0 so it has monotonically increasing velocities v0_values = np.array( [p0_fit[prop_len * i + vel_idx] for i in range(n_comp)] ) v0_sort = v0_values.argsort() p0_fit_sort = [p0_fit[prop_len * i: prop_len * i + prop_len] for i in v0_sort] p0_fit = [item for sublist in p0_fit_sort for item in sublist] n_dims = len(p0_fit) # Shuffle the parameters around a little. For values very close to 0, don't base this on the value itself p0_movement = np.max(np.array([0.01 * np.abs(p0_fit), 0.01 * np.ones_like(p0_fit)]), axis=0, ) pos = np.array(p0_fit) + p0_movement * np.random.randn(self.n_walkers, n_dims) # Enforce positive values for t_ex, width for the LTE fitting if self.fit_type == 'lte': positive_idx = [0, 3] elif self.fit_type == 'radex': positive_idx = [0, 1, 4] else: self.logger.warning('Fit type %s not understood!' % self.fit_type) sys.exit() prop_len = len(self.props) enforced_positives = [[prop_len * i + j] for i in range(n_comp) for j in positive_idx] enforced_positives = [item for sublist in enforced_positives for item in sublist] for i in enforced_positives: pos[:, i] = np.abs(pos[:, i]) sampler = self.emcee_wrapper(data, error, pos=pos, n_dims=n_dims, n_comp=n_comp, progress=progress, ) if save: # Calculate max likelihood flat_samples = sampler.get_chain(discard=self.n_steps // 2, flat=True, ) parameter_median = np.nanmedian(flat_samples, axis=0, ) likelihood = ln_like(theta=parameter_median, intensity=data, intensity_err=error, vel=self.vel, strength_lines=self.strength_lines, v_lines=self.v_lines, props=self.props, n_comp=n_comp, fit_type=self.fit_type, ) fit_dict = {'sampler': sampler, 'n_comp': n_comp, 'likelihood': likelihood, } save_fit_dict(fit_dict, fit_dict_filename) else: fit_dict = load_fit_dict(fit_dict_filename) sampler = fit_dict['sampler'] return sampler
[docs] def emcee_wrapper(self, data, error, pos, n_dims, n_comp=1, progress=False, ): """Light wrapper around emcee The runs the emcee part, with some custom moves and passes the arguments neatly to functions Args: data: Observed data error: Observed uncertainty pos: Position for the walkers n_dims: Number of dimensions for the problem n_comp: Number of components to fit. Defaults to None progress: Whether or not to display progress bar. Defaults to False """ if self.data_type == 'spectrum': # Multiprocess here for speed with mp.Pool(self.n_cores) as pool: sampler = emcee.EnsembleSampler(nwalkers=self.n_walkers, ndim=n_dims, log_prob_fn=ln_prob, args=( data, error, self.vel, self.strength_lines, self.v_lines, self.props, self.bounds, n_comp, self.fit_type, ), moves=[(emcee.moves.DEMove(), 0.8), (emcee.moves.DESnookerMove(), 0.2) ], pool=pool, ) # Run burn-in state = sampler.run_mcmc(pos, self.n_steps // 4, progress=progress, ) sampler.reset() # Do the full run sampler.run_mcmc(state, self.n_steps, progress=progress, ) else: # Run in serial since the cube is multiprocessing already (no daemons here, not today satan) sampler = emcee.EnsembleSampler(nwalkers=self.n_walkers, ndim=n_dims, log_prob_fn=ln_prob, args=( data, error, self.vel, self.strength_lines, self.v_lines, self.props, self.bounds, n_comp, self.fit_type, ), moves=[(emcee.moves.DEMove(), 0.8), (emcee.moves.DESnookerMove(), 0.2) ], ) # Run burn-in state = sampler.run_mcmc(pos, self.n_steps // 4, progress=progress, ) sampler.reset() # Do the full run sampler.run_mcmc(state, self.n_steps, progress=progress, ) return sampler
[docs] def encourage_spatial_coherence(self, input_dir='fit', output_dir='fit_coherence', fit_dict_filename=None, n_comp_filename=None, likelihood_filename=None, reverse_direction=False, ): """Loop over fits to encourage spatial coherence This will loop over RA/Dec to potentially replace fits with neighbouring fits. This helps to encourage spatial coherence and can also help in the case of catastrophic misfits Args: input_dir: directory for the input fits. Defaults to 'fit' output_dir: directory for the output of the coherence checking. Defaults to 'fit_coherence' fit_dict_filename: Filename structure for the fit dictionary. Default to None, which will choose some generic name n_comp_filename: Filename for the n_comp map. Defaults to None, which will choose some generic name likelihood_filename: Filename for the likelihood map. Defaults to None, which will choose some generic name reverse_direction: Whether to reverse how we step through x/y in the coherence encouragement. Defaults to False, but likely you should run one case forward, then another backward """ if self.data_type != 'cube': self.logger.warning('Can only do spatial coherence on a cube!') sys.exit() f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) if fit_dict_filename is None: fit_dict_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='fit_dict_filename', logger=self.logger, ) if n_comp_filename is None: n_comp_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='n_comp_filename', logger=self.logger, ) if likelihood_filename is None: likelihood_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='likelihood_filename', logger=self.logger, ) if reverse_direction: step = -1 else: step = 1 if not os.path.exists(output_dir): os.makedirs(output_dir) if overwrite: # Flush out the directory files_in_dir = glob.glob(os.path.join(output_dir, '*')) for file_in_dir in files_in_dir: os.remove(file_in_dir) n_comp = np.load(os.path.join(input_dir, '%s.npy' % n_comp_filename)) likelihood = np.load(os.path.join(input_dir, '%s.npy' % likelihood_filename)) total_found = 0 ij_list = [(i, j) for i in range(self.data.shape[1])[::step] for j in range(self.data.shape[2])[::step] if self.mask[i, j] != 0] for ij in tqdm(ij_list): i, j = ij[0], ij[1] # Pull out the neighbouring pixels i_min = max(0, i - 1) i_max = min(self.data.shape[1], i + 2) j_min = max(0, j - 1) j_max = min(self.data.shape[2], j + 2) n_comp_cutout = n_comp[i_min: i_max, j_min: j_max] input_file = os.path.join(input_dir, '%s_%s_%s.pkl' % (fit_dict_filename, i, j)) output_file = os.path.join(output_dir, '%s_%s_%s.pkl' % (fit_dict_filename, i, j)) if not os.path.exists(output_file) or overwrite: n_comp_original = int(n_comp[i, j]) ln_m = np.log(len(self.data[:, i, j][~np.isnan(self.data[:, i, j])])) # Pull original likelihood for the pixel fit_dict = load_fit_dict(input_file) like_original = fit_dict['likelihood'] bic_original = ln_m * n_comp_original * len(self.props) - 2 * like_original delta_bic = np.zeros_like(n_comp_cutout) likelihood_cutout = np.zeros_like(delta_bic) for i_cutout, i_full in enumerate(range(i_min, i_max)): for j_cutout, j_full in enumerate(range(j_min, j_max)): if self.mask[i_full, j_full] == 0: continue # Pull out the parameters from the neighbouring fits, and see if that works better for the # original pixel n_comp_new = int(n_comp[i_full, j_full]) if n_comp_new > 0: # Check if we already have moved the sampler file cutout_fit_dict_filename = os.path.join(output_dir, '%s_%s_%s.pkl' % (fit_dict_filename, i_full, j_full)) if not os.path.exists(cutout_fit_dict_filename): cutout_fit_dict_filename = os.path.join(input_dir, '%s_%s_%s.pkl' % (fit_dict_filename, i_full, j_full)) cutout_fit_dict = load_fit_dict(cutout_fit_dict_filename) cutout_sampler = cutout_fit_dict['sampler'] flat_samples = cutout_sampler.get_chain(discard=self.n_steps // 2, flat=True, ) pars_new = np.nanmedian(flat_samples, axis=0, ) like_new = ln_like(theta=pars_new, intensity=self.data[:, i, j], intensity_err=self.error[:, i, j], vel=self.vel, strength_lines=self.strength_lines, v_lines=self.v_lines, props=self.props, n_comp=n_comp_new, fit_type=self.fit_type, ) else: like_new = ln_like(theta=0, intensity=self.data[:, i, j], intensity_err=self.error[:, i, j], vel=self.vel, strength_lines=self.strength_lines, v_lines=self.v_lines, props=self.props, n_comp=n_comp_new, fit_type=self.fit_type, ) bic_new = ln_m * n_comp_new * len(self.props) - 2 * like_new delta_bic[i_cutout, j_cutout] = bic_original - bic_new likelihood_cutout[i_cutout, j_cutout] = like_new # Find the maximum change in BIC between pixels. Set ones where we don't have a meaningful value to # something small delta_bic[delta_bic == 0] = -1000 idx = np.unravel_index(np.argmax(delta_bic, axis=None), delta_bic.shape) if delta_bic[idx] > self.delta_bic_cutoff: total_found += 1 n_comp[i, j] = n_comp_cutout[idx] likelihood[i, j] = likelihood_cutout[idx] # If we're replacing with a file we've already replaced, pull from the output directory. Else # pull from the input directory. input_file = os.path.join(output_dir, '%s_%s_%s.pkl' % (fit_dict_filename, idx[0] + i_min, idx[1] + j_min)) if not os.path.exists(input_file): input_file = os.path.join(input_dir, '%s_%s_%s.pkl' % (fit_dict_filename, idx[0] + i_min, idx[1] + j_min)) # Move the right file to the new directory os.system('cp %s %s' % (input_file, output_file)) self.logger.info('Number replaced: %d' % total_found) n_comp_output_filename = os.path.join(output_dir, '%s.npy' % n_comp_filename) likelihood_output_filename = os.path.join(output_dir, '%s.npy' % likelihood_filename) if not os.path.exists(n_comp_output_filename) or overwrite: np.save(n_comp_output_filename, n_comp) np.save(likelihood_output_filename, likelihood)
[docs] def get_fits_from_samples(self, samples, vel, n_draws=100, n_comp=1, ): """Get a number of fit lines from an MCMC run Args: samples: emcee output vel: Velocity grid to evaluate the fit on n_draws (int): Number of draws to pull from samples n_comp (int): Number of components in the fit Returns: array of best fit line """ fit_lines = np.zeros([len(vel), n_draws, n_comp]) for draw in range(n_draws): sample = np.random.randint(low=0, high=samples.shape[0]) for i in range(n_comp): theta_draw = samples[sample, ...][len(self.props) * i: len(self.props) * i + len(self.props)] if self.fit_type == 'lte': fit_lines[:, draw, i] = hyperfine_structure_lte(*theta_draw, strength_lines=self.strength_lines, v_lines=self.v_lines, vel=vel, ) elif self.fit_type == 'radex': qn_ul = np.array(range(len(radex_grid['QN_ul'].values))) fit_lines[:, draw, i] = get_radex_multiple_components(theta_draw, vel=vel, v_lines=self.v_lines, qn_ul=qn_ul, ) return fit_lines
[docs] def create_fit_cube(self, fit_dict_filename=None, n_comp_filename=None, cube_filename=None, ): """Create upper/lower errors for spectral fit plots Args: fit_dict_filename (str): Name for the filename of fitted parameter dictionary. Defaults to None, which will pull from config.toml n_comp_filename (str): Name for the filename of component number map. Defaults to None, which will pull from config.toml cube_filename (str): Name for the filename of output cube. Defaults to None, which will pull from config.toml """ f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) if fit_dict_filename is None: fit_dict_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='fit_dict_filename', logger=self.logger, ) if n_comp_filename is None: n_comp_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='n_comp_filename', logger=self.logger, ) if cube_filename is None: cube_filename = get_dict_val(self.config, self.config_defaults, table='create_fit_cube', key='cube_filename', logger=self.logger, ) chunksize = get_dict_val(self.config, self.config_defaults, table='create_fit_cube', key='chunksize', logger=self.logger, ) if not os.path.exists('%s.npy' % cube_filename) or overwrite: ij_list = [(i, j) for i in range(self.data.shape[1]) for j in range(self.data.shape[2]) if self.mask[i, j] != 0 ] n_comp = np.load('%s.npy' % n_comp_filename) # Setup fit cube fit_cube = np.zeros([3, *self.data.shape], dtype=np.float32) with mp.Pool(self.n_cores) as pool: map_result = list( tqdm( pool.imap( partial( self.parallel_fit_samples, fit_dict_filename=fit_dict_filename, n_comp=n_comp), ij_list, chunksize=chunksize, ), total=len(ij_list) ) ) for idx, ij in enumerate(ij_list): fit_cube[:, :, ij[0], ij[1]] = map_result[idx] np.save('%s.npy' % cube_filename, fit_cube)
[docs] def parallel_fit_samples(self, ij, fit_dict_filename=None, n_comp=None, ): """Pull fit percentiles from a single pixel""" if not fit_dict_filename or n_comp is None: self.logger.warning('Fit dict filename and n_comp must be defined!') sys.exit() i = ij[0] j = ij[1] n_comp_pix = int(n_comp[i, j]) cube_sampler_filename = '%s_%s_%s.pkl' % (fit_dict_filename, i, j) if n_comp_pix == 0: return np.zeros([3, len(self.vel)]) fit_dict = load_fit_dict(cube_sampler_filename) sampler = fit_dict['sampler'] flat_samples = sampler.get_chain(discard=self.n_steps // 2, flat=True, ) fit_lines = self.get_fits_from_samples(flat_samples, vel=self.vel, n_draws=100, n_comp=n_comp_pix, ) fit_percentiles = np.nanpercentile(np.nansum(fit_lines, axis=-1), [50, 16, 84], axis=1, ) return fit_percentiles
[docs] def make_parameter_maps(self, fit_dict_filename=None, n_comp_filename=None, maps_filename=None, ): """Make maps of fitted parameters Args: fit_dict_filename (str): Name for the filename of fitted parameter dictionary. Defaults to None, which will pull from config.toml n_comp_filename (str): Name for the filename of component number map. Defaults to None, which will pull from config.toml maps_filename (str): Name for the filename of output maps. Defaults to None, which will pull from config.toml """ if self.data_type != 'cube': self.logger.warning('Can only make parameter maps for fitted cubes') sys.exit() f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) if fit_dict_filename is None: fit_dict_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='fit_dict_filename', logger=self.logger, ) if n_comp_filename is None: n_comp_filename = get_dict_val(self.config, self.config_defaults, table='multicomponent_fitter', key='n_comp_filename', logger=self.logger, ) if maps_filename is None: maps_filename = get_dict_val(self.config, self.config_defaults, table='make_parameter_maps', key='maps_filename', logger=self.logger, ) n_samples = get_dict_val(self.config, self.config_defaults, table='make_parameter_maps', key='n_samples', logger=self.logger, ) chunksize = get_dict_val(self.config, self.config_defaults, table='make_parameter_maps', key='chunksize', logger=self.logger, ) n_comp = np.load('%s.npy' % n_comp_filename) max_n_comp = int(np.nanmax(n_comp)) if not os.path.exists(maps_filename) or overwrite: # Set up arrays in a dictionary parameter_maps = {} parameter_maps['chisq_red'] = np.zeros([self.data.shape[1], self.data.shape[2]]) parameter_maps['chisq_red'][parameter_maps['chisq_red'] == 0] = np.nan for i in range(max_n_comp): keys = ['tpeak_%s' % i, 'tpeak_%s_err_up' % i, 'tpeak_%s_err_down' % i ] for key in keys: parameter_maps[key] = np.zeros([self.data.shape[1], self.data.shape[2]]) parameter_maps[key][parameter_maps[key] == 0] = np.nan for prop in self.props: keys = ['%s_%s' % (prop, i), '%s_%s_err_up' % (prop, i), '%s_%s_err_down' % (prop, i) ] for key in keys: parameter_maps[key] = np.zeros([self.data.shape[1], self.data.shape[2]]) parameter_maps[key][parameter_maps[key] == 0] = np.nan # Loop over each pixel, pulling out the properties for each component, as well as the peak intensity and # errors for everything. Parallelize this up for speed ij_list = [[i, j] for i in range(self.data.shape[1]) for j in range(self.data.shape[2]) if self.mask[i, j] != 0 ] with mp.Pool(self.n_cores) as pool: par_dicts = list( tqdm( pool.imap( partial(self.parallel_map_making, n_comp=n_comp, fit_dict_filename=fit_dict_filename, n_samples=n_samples), ij_list, chunksize=chunksize, ), total=len(ij_list) ) ) # Pull the parameters into the arrays for dict_idx, par_dict in enumerate(par_dicts): i, j = ij_list[dict_idx][0], ij_list[dict_idx][1] for key in par_dict.keys(): parameter_maps[key][i, j] = par_dict[key] # Save out if maps_filename is not None: with open(maps_filename, 'wb') as f: pickle.dump(parameter_maps, f) else: with open(maps_filename, 'rb') as f: parameter_maps = pickle.load(f) self.parameter_maps = parameter_maps self.max_n_comp = max_n_comp
[docs] def parallel_map_making(self, ij, n_comp=None, fit_dict_filename='fit_dict', n_samples=500, ): """Pull parameters for map out of a single pixel""" if n_comp is None: self.logger.warning('n_comp should be defined!') sys.exit() i, j = ij[0], ij[1] n_comps_pix = int(n_comp[i, j]) par_dict = {} obs = self.data[:, i, j] obs_err = self.error[:, i, j] if n_comps_pix > 0: cube_fit_dict_filename = '%s_%s_%s.pkl' % (fit_dict_filename, i, j) fit_dict = load_fit_dict(cube_fit_dict_filename) sampler = fit_dict['sampler'] # Pull out median and errors for each parameter and each component flat_samples = sampler.get_chain(discard=self.n_steps // 2, flat=True, ) param_percentiles = np.percentile(flat_samples, [16, 50, 84], axis=0) param_diffs = np.diff(param_percentiles, axis=0) # Pull out model for reduced chi-square total_model = multiple_components(theta=param_percentiles[1, :], vel=self.vel, strength_lines=self.strength_lines, v_lines=self.v_lines, props=self.props, n_comp=n_comps_pix, fit_type=self.fit_type, ) for n_comp_pix in range(n_comps_pix): # Pull out peak intensities and errors for each component tpeak = np.zeros(n_samples) for sample in range(n_samples): choice_idx = np.random.randint(0, flat_samples.shape[0]) theta = flat_samples[choice_idx, 4 * n_comp_pix: 4 * n_comp_pix + 4] model = hyperfine_structure_lte(*theta, strength_lines=self.strength_lines, v_lines=self.v_lines, vel=self.vel, ) tpeak[sample] = np.nanmax(model) tpeak_percentiles = np.percentile(tpeak, [16, 50, 84], axis=0) tpeak_diff = np.diff(tpeak_percentiles) par_dict['tpeak_%s' % n_comp_pix] = tpeak_percentiles[1] par_dict['tpeak_%s_err_down' % n_comp_pix] = tpeak_diff[0] par_dict['tpeak_%s_err_up' % n_comp_pix] = tpeak_diff[1] # Pull out fitted properties and errors for each component for prop_idx, prop in enumerate(self.props): param_idx = n_comp_pix * len(self.props) + prop_idx par_dict['%s_%s' % (prop, n_comp_pix)] = param_percentiles[1, param_idx] par_dict['%s_%s_err_down' % (prop, n_comp_pix)] = param_diffs[0, param_idx] par_dict['%s_%s_err_up' % (prop, n_comp_pix)] = param_diffs[1, param_idx] else: total_model = np.zeros_like(obs) chisq = chi_square(obs, total_model, obs_err) deg_freedom = len(obs[~np.isnan(obs)]) - (n_comps_pix * len(self.props)) par_dict['chisq_red'] = chisq / deg_freedom return par_dict