Source code for mcfine.fitting

import copy
import filecmp
import glob
import inspect
import logging
import multiprocessing as mp
import os
import pickle
import sys
import tomllib
import warnings
from functools import partial

import emcee
import numpy as np
import specutils
import xarray
from astropy import units as u
from astropy.io import fits
from lmfit import minimize, Parameters
from scipy.interpolate import RegularGridInterpolator
from spectral_cube import SpectralCube
from specutils import Spectrum
from specutils.fitting import find_lines_derivative
from threadpoolctl import threadpool_limits
from tqdm import tqdm

# Suppress specutils warning about continuum level
specutils.conf.do_continuum_function_check = False

NDRADEX_IMPORTED = False
try:
    import ndradex

    NDRADEX_IMPORTED = True
except ModuleNotFoundError:
    pass

from .emcee_funcs import (
    get_samples,
    get_samples_from_fit_dict,
)
from .fitting_funcs import (
    chi_square,
    ln_prior,
)
from .line_info import (
    allowed_lines,
    transition_lines,
    freq_lines,
    strength_lines_dict,
    v_lines_dict,
)
from .line_shape_funcs import (
    hyperfine_structure_lte,
    hyperfine_structure_pure_gauss,
    hyperfine_structure_radex,
)
from .radex_funcs import get_nearest_values
from .utils import (
    get_dict_val,
    check_overwrite,
    save_pkl,
    load_pkl,
    CONFIG_DEFAULT_PATH,
    LOCAL_DEFAULT_PATH,
)
from .vars import (
    T_BACKGROUND,
    ALLOWED_LMFIT_METHODS,
    ALLOWED_EMCEE_RUN_METHODS,
    ALLOWED_FIT_TYPES,
    ALLOWED_FIT_METHODS,
)

logging.basicConfig(
    level=logging.INFO,
    format="[%(levelname)s] %(asctime)s - %(name)s - %(funcName)s - %(message)s",
)
logger = logging.getLogger("mcfine")

# Define global variables for potentially huge arrays, and various config values
glob_data = np.array([])
glob_error = np.array([])
glob_vel = np.array([])

glob_downsampled_data = np.array([])
glob_downsampled_error = np.array([])

glob_initial_n_comp = np.array([])
glob_initial_theta = np.array([], dtype=list)

glob_mcfine_output = {}

radex_grid = xarray.Dataset()

glob_config = {}

mp.set_start_method("fork")


[docs] def multiple_components( theta, vel, strength_lines, v_lines, props, n_comp, fit_type="lte", log_tau=True, ): """Sum intensities for multiple lines. Takes `n_comp` distinct lines, and calculates the total intensity of their various hyperfine lines. Args: theta: [t_ex, tau, vel, vel_width] for each component. Should have a length of 4*`n_comp` vel: velocity array strength_lines: Array for relative line strengths v_lines: Array of relative velocity shifts for the lines props: List of properties for the line profiles n_comp (int): Number of distinct components to calculate intensities for fit_type (str): Should be one of ALLOWED_FIT_TYPES. Defaults to 'lte' log_tau (bool): Whether to fit tau in log or linear space. Defaults to True (log space) Returns: np.ndarray: The sum of the intensities for all the distinct components. """ prop_len = len(props) if n_comp == 0: return np.zeros_like(vel) if fit_type == "lte": intensity_model = np.array( [ hyperfine_structure_lte( *theta[prop_len * i : prop_len * i + prop_len], strength_lines, v_lines, vel, log_tau=log_tau, ) for i in range(n_comp) ] ) elif fit_type == "pure_gauss": intensity_model = np.array( [ hyperfine_structure_pure_gauss( *theta[prop_len * i : prop_len * i + prop_len], strength_lines, v_lines, vel, ) for i in range(n_comp) ] ) elif fit_type == "radex": qn_ul = np.array(range(len(radex_grid["QN_ul"].values))) intensity_model = [ get_radex_multiple_components( theta[prop_len * i : prop_len * i + prop_len], vel, v_lines, qn_ul ) for i in range(n_comp) ] else: raise Warning(f"fit type {fit_type} not understood!") intensity_model = np.sum(intensity_model, axis=0) return intensity_model
[docs] def get_radex_multiple_components( theta, vel, v_lines, qn_ul, ): """Wrapper around RADEX to get out the multiple components""" # Important point here, RADEX uses a square profile so transform the sigma into the right width. Pull out # the subset of data around our values tau, t_ex = radex_grid_interp(theta, qn_ul) intensity_model = hyperfine_structure_radex( t_ex, tau, theta[3], theta[4], v_lines, vel ) return intensity_model
[docs] def radex_grid_interp( theta, qn_ul, labels=None, ): """Interpolate generated RADEX grid to get useful values out without weird edge effects Args: theta (list): property values qn_ul (list): Names for the transitions labels (list): Labels for properties Returns: tau and t_ex """ if labels is None: labels = [ "T_kin", "N_mol", "n_H2", "dv", ] nearest_values = get_nearest_values( radex_grid, labels, [theta[0], 10 ** theta[1], 10 ** theta[2], theta[4] * 2.355] ) # Pull out grid subset of the nearest values grid_subset = radex_grid.sel( T_kin=nearest_values[0], N_mol=nearest_values[1], n_H2=nearest_values[2], dv=nearest_values[3], ) tau_subset = grid_subset["tau"].values t_ex_subset = grid_subset["T_ex"].values # Remove any singular (limit) values limit_vals = [ True if type(nearest_value) == np.ndarray else False for nearest_value in nearest_values ] theta_coords = np.array( [theta[0], 10 ** theta[1], 10 ** theta[2], theta[4] * 2.355] ) theta_coords = theta_coords[limit_vals] # Also remove the singular dimensions from the nearest values in the grid nearest_values = np.asarray(nearest_values)[limit_vals] # Construct an array to get values for each point in the grid coords = np.array([(value, *theta_coords) for value in qn_ul]) # Setup grid and fast interpolate grid_coords = (qn_ul, *nearest_values) reg_grid_tau = RegularGridInterpolator(grid_coords, tau_subset) reg_grid_t_ex = RegularGridInterpolator(grid_coords, t_ex_subset) tau = reg_grid_tau(coords) t_ex = reg_grid_t_ex(coords) return tau, t_ex
[docs] def initial_lmfit( params, intensity, intensity_err, vel, strength_lines, v_lines, props, n_comp=1, fit_type="lte", log_tau=True, ): """Get initial guess for MC walkers from lmfit This is the residual calculation for LMFIT. Just to get our initial parameters to instantiate the MC Args: params: List of lmfit parameters intensity: Measured intensity intensity_err: Measured error on intensity vel: Velocity axis strength_lines: Relative line strength v_lines: Relative line velocity props: List of properties for the line profiles n_comp: Number of components to fit. Defaults to 1 fit_type: Type of fit to do. Either 'lte' or 'radex'. Defaults to 'lte' log_tau: Whether to fit tau in log-space (default) or linear space Returns: Residual (chisq) value from the fit """ theta = np.array([params[key].value for key in params]) intensity_model = multiple_components( theta=theta, vel=vel, strength_lines=strength_lines, v_lines=v_lines, props=props, n_comp=n_comp, fit_type=fit_type, log_tau=log_tau, ) diff = intensity - intensity_model residuals = diff / intensity_err return residuals
[docs] def ln_like( theta, intensity, intensity_err, vel, strength_lines, v_lines, props, n_comp=1, fit_type="lte", log_tau=True, ): """Calculate the (negative) log-likelihood for the model This is the main meat of the MCMC fitting, given the parameters we calculate the likelihood of the model and return that Args: theta: Parameters for the fit intensity: Observed intensity intensity_err: Intensity uncertainty vel: Observed velocity strength_lines: Relative line strength v_lines: Relative line velocities props: List of properties being fit n_comp: Number of components to fit. Defaults to 1 fit_type: Either 'lte' (default) or 'radex' log_tau: Whether to fit tau in log-space (default) or linear space Returns: Negative ln-likelihood for the model. """ intensity_model = multiple_components( theta, vel, strength_lines, v_lines, props, n_comp=n_comp, fit_type=fit_type, log_tau=log_tau, ) chisq = chi_square(intensity, intensity_model, intensity_err) # # Scale the chisq by sqrt(2[N-P]), following Smith+ and others # p = n_comp * len(props) # n = len(intensity[~np.isnan(intensity)]) # scale_factor = np.sqrt(2 * (n - p)) # chisq /= scale_factor return -0.5 * chisq
[docs] def ln_prob( theta, intensity, intensity_err, vel, strength_lines, v_lines, props, bounds, n_comp=1, fit_type="lte", ): """Calculate the ln-probability for emcee This combines the prior (generally flat) with the ln-likelihood to return to emcee Args: theta: Parameters for the fit intensity: Observed intensity intensity_err: Intensity uncertainty vel: Observed velocity strength_lines: Relative line strength v_lines: Relative line velocities props: List of properties being fit bounds: Bounds on parameters n_comp: Number of components to fit. Defaults to 1 fit_type: Either 'lte' (default) or 'radex' Returns: float: Combined probability (likelihood+prior) for the model. """ lp = ln_prior(theta, vel, props, bounds, n_comp=n_comp) if not np.isfinite(lp): return -np.inf like = ln_like( theta, intensity, intensity_err, vel, strength_lines, v_lines, props, n_comp=n_comp, fit_type=fit_type, ) return lp + like
[docs] def calculate_goodness_of_fit( data, error, best_fit_pars, n_comp, ): """Calculate various goodness of fit metrics.""" m = len(data[~np.isnan(data)]) ln_m = np.log(m) if best_fit_pars is None: k = 0 else: k = len(best_fit_pars) if n_comp > 0: total_model = multiple_components( theta=best_fit_pars, vel=glob_vel, strength_lines=glob_config["strength_lines"], v_lines=glob_config["v_lines"], props=glob_config["props"], n_comp=n_comp, fit_type=glob_config["fit_type"], ) else: total_model = np.zeros_like(data) chisq = chi_square( data, total_model, error, ) deg_freedom = m - k chisq_red = chisq / deg_freedom # log-likelihood, BIC, AIC from chi-square likelihood = -0.5 * chisq bic = k * ln_m - 2 * likelihood aic = 2 * k - 2 * likelihood goodness_of_fit_metrics = { "likelihood": likelihood, "bic": bic, "aic": aic, "chisq": chisq, "deg_freedom": deg_freedom, "chisq_red": chisq_red, } return goodness_of_fit_metrics
[docs] def sample_tpeak_per_component( samples, n_comp, n_samples=500, ): """Get a number of Tpeak samples per component""" tpeak = np.zeros([n_samples, n_comp]) idx_offset = len(glob_config["props"]) for i in range(n_samples): choice_idx = np.random.randint(0, samples.shape[0]) tpeak_i = [ np.nanmax( multiple_components( theta=samples[ choice_idx, idx_offset * j : idx_offset * j + idx_offset ], vel=glob_vel, strength_lines=glob_config["strength_lines"], v_lines=glob_config["v_lines"], props=glob_config["props"], n_comp=1, fit_type=glob_config["fit_type"], ) ) for j in range(n_comp) ] tpeak[i, :] = copy.deepcopy(tpeak_i) return tpeak
[docs] def downsample( data, chunk_size=10, func=np.nanmean, ): """Downsample data, given a function Args: data: Data to downsample. Should be an image chunk_size: Chunk size to downsample to func: Function to downsample using. Defaults to nanmean """ if data.ndim == 2: axes = (0, 1) elif data.ndim == 3: axes = (1, 2) else: raise ValueError("Input to downsample should be a 2D or 3D array") ii = np.arange(chunk_size, data.shape[axes[0]], chunk_size) jj = np.arange(chunk_size, data.shape[axes[1]], chunk_size) with warnings.catch_warnings(): warnings.simplefilter("ignore") downsampled_array = np.array( [ [ func(i_split, axis=axes) for i_split in np.array_split(j_split, ii, axis=axes[0]) ] for j_split in np.array_split(data, jj, axis=axes[1]) ] ).T return downsampled_array
[docs] def parallel_fitting( ij, fit_dict_filename="fit_dict", data_type="original", fit_method="mcmc", overwrite=False, ): """Parallel function for MCMC fitting. Wraps up the MCMC fitting to pass off to multiple cores. Because of overheads, it's easier to farm out multiple fits to multiple cores, rather than run the MCMC with multiple threads. Args: ij (tuple): tuple containing (i, j) coordinates of the pixel to fit. fit_dict_filename (str): Base filename for MCMC ft pickle. Will append coordinates on afterward. Defaults to fit_dict. data_type: Data type to fit. Can be either "original" or "downsampled" fit_method: "mcmc" or "leastsq". "mcmc" will run emcee for each component and use fit parameters from that, whereas "leastsq" will only run an MCMC after a final fit has been found, to explore covariances. Defaults to "mcmc". overwrite (bool): Overwrite existing files? Defaults to False. Returns: Number of fitted components and the best-fit likelihood. """ if data_type not in ["original", "downsampled"]: raise ValueError("Data type to fit should be original or downsampled") i = ij[0] j = ij[1] cube_fit_dict_filename = f"{fit_dict_filename}_{i}_{j}.pkl" if not os.path.exists(cube_fit_dict_filename) or overwrite: logger.debug(f"Fitting {i}, {j}") if data_type == "original": data = glob_data[:, i, j] error = glob_error[:, i, j] elif data_type == "downsampled": data = glob_downsampled_data[:, i, j] error = glob_downsampled_error[:, i, j] else: raise ValueError("Data type to fit should be original or downsampled") # If somehow we've broken the logic here, then freak out if data.size == 0: raise ValueError("No data found!") # Get out initial number of components, if we have them initial_n_comp = None if glob_initial_n_comp.size != 0: initial_n_comp = glob_initial_n_comp[i, j] # Get out initial parameters, if we have them initial_pars = None if glob_initial_theta.size != 0: initial_pars = glob_initial_theta[i, j] # If we have an empty list, fall back to None if len(initial_pars) == 0: initial_pars = None # Limit to a single core to avoid weirdness with threadpool_limits(limits=1, user_api=None): fit_dict = delta_bic_looper( data=data, error=error, fit_method=fit_method, initial_n_comp=initial_n_comp, initial_pars=initial_pars, ) if not glob_config["keep_sampler"]: fit_dict.pop("sampler") if not glob_config["keep_covariance"]: fit_dict.pop("cov_matrix") fit_dict.pop("cov_med") save_pkl(fit_dict, cube_fit_dict_filename) return cube_fit_dict_filename
[docs] def get_parameters_from_fitting( data, error, n_comp, p0_fit=None, fit_method="mcmc", save=False, overwrite=True, progress=False, ): """Get fitted parameters, either using MCMC or least-squares Args: data: Observed data error: Observed error n_comp: Number of components to fit p0_fit: Initial guess of parameters for MCMC. If least-squares, will just use this. Defaults to None fit_method: "mcmc" or "leastsq". "mcmc" will run emcee for each component and use fit parameters from that, whereas "leastsq" will only run an MCMC after a final fit has been found, to explore covariances. Defaults to "mcmc". save: Whether to save out or not, defaults to False overwrite: Whether to overwrite existing fits. Defaults to False progress: Whether to display progress bars or not. Defaults to False Returns: parameter medians """ # If we're doing MCMC if fit_method == "mcmc": with warnings.catch_warnings(): warnings.simplefilter("ignore") sampler = run_mcmc( data, error, n_comp=n_comp, save=save, overwrite=overwrite, progress=progress, p0_fit=p0_fit, ) # Get flat samples and calculate median parameters flat_samples = get_samples( sampler, burn_in_frac=glob_config["burn_in"], thin_frac=glob_config["thin"], ) parameter_median = np.nanmedian( flat_samples, axis=0, ) # Do a leastsq fit elif fit_method == "leastsq": # If we already have a p0, then we don't need to run this if p0_fit is not None: parameter_median = p0_fit else: parameter_median = get_p0_lmfit( data, error, n_comp=n_comp, ) else: raise ValueError(f"fit_method should be one of {ALLOWED_FIT_METHODS}") return parameter_median
[docs] def delta_bic_looper( data, error, fit_method="mcmc", initial_n_comp=None, initial_pars=None, save=False, overwrite=True, progress=False, ): """Increase spectral complexity until we hit diminishing returns This will iteratively build up the spectral model one component at a time, before looping backwards to remove the weaker components which might just be fits to the noise Args: data: Observed data error: Observed error fit_method: "mcmc" or "leastsq". "mcmc" will run emcee for each component and use fit parameters from that, whereas "leastsq" will only run an MCMC after a final fit has been found, to explore covariances. Defaults to "mcmc". initial_n_comp: Initial guess at number of components. Defaults to None. initial_pars: Initial guess at parameters. Defaults to None. save: Whether to save out or not, defaults to False overwrite: Whether to overwrite existing fits. Defaults to False progress: Whether to display progress bars or not. Defaults to False Returns: fitted number of components, likelihood, and the emcee sampler """ delta_bic = np.inf delta_aic = np.inf sampler_old = None sampler = None best_fit_pars = None best_fit_errs = None cov_matrix = None cov_med = None prop_len = len(glob_config["props"]) if initial_n_comp is None: # We start with a zero component model, i.e. a flat line n_comp = 0 parameter_median = None gof = calculate_goodness_of_fit( data=data, error=error, best_fit_pars=parameter_median, n_comp=n_comp, ) bic = gof["bic"] aic = gof["aic"] else: # Start with an initial guess of component numbers n_comp = int(initial_n_comp) # If we have initial number of components but not parameters, # then calculate them if initial_pars is None: parameter_median = get_parameters_from_fitting( data, error, n_comp=n_comp, fit_method=fit_method, save=save, overwrite=overwrite, progress=progress, ) # Else, take the passed parameters else: parameter_median = copy.deepcopy(initial_pars) # Calculate goodness of fit, pull out likelihood/BIC/AIC gof = calculate_goodness_of_fit( data=data, error=error, best_fit_pars=parameter_median, n_comp=n_comp, ) bic = gof["bic"] aic = gof["aic"] parameter_median_old = parameter_median bic_old = bic aic_old = aic while ( delta_bic > glob_config["delta_bic_cutoff"] or delta_aic > glob_config["delta_aic_cutoff"] ): # Store the previous BIC and sampler, since we need them later parameter_median_old = parameter_median sampler_old = sampler bic_old = bic aic_old = aic # Increase the number of components, refit n_comp += 1 parameter_median = get_parameters_from_fitting( data, error, n_comp=n_comp, fit_method=fit_method, save=save, overwrite=overwrite, progress=progress, ) # Calculate goodness of fit, pull out likelihood/BIC/AIC gof = calculate_goodness_of_fit( data=data, error=error, best_fit_pars=parameter_median, n_comp=n_comp, ) bic = gof["bic"] aic = gof["aic"] delta_bic = bic_old - bic delta_aic = aic_old - aic # Move back to the previous values parameter_median = parameter_median_old sampler = sampler_old bic = bic_old aic = aic_old n_comp -= 1 # Now loop backwards, iteratively remove the weakest component and refit. Only if we have a >0 order fit! if n_comp > 0: max_back_loops = n_comp for i in range(max_back_loops): if glob_config["fit_type"] == "lte": line_intensities = np.array( [ hyperfine_structure_lte( *parameter_median[prop_len * i : prop_len * i + prop_len], strength_lines=glob_config["strength_lines"], v_lines=glob_config["v_lines"], vel=glob_vel, ) for i in range(n_comp) ] ) elif glob_config["fit_type"] == "pure_gauss": line_intensities = np.array( [ hyperfine_structure_pure_gauss( *parameter_median[prop_len * i : prop_len * i + prop_len], strength_lines=glob_config["strength_lines"], v_lines=glob_config["v_lines"], vel=glob_vel, ) for i in range(n_comp) ] ) elif glob_config["fit_type"] == "radex": line_intensities = np.array( [ hyperfine_structure_radex( *parameter_median[prop_len * i : prop_len * i + prop_len], v_lines=glob_config["v_lines"], vel=glob_vel, ) for i in range(n_comp) ] ) else: logger.warning(f"Fit type {glob_config['fit_type']} not understood!") sys.exit() integrated_intensities = np.trapezoid(line_intensities, x=glob_vel, axis=-1) component_order = np.argsort(integrated_intensities) bic_old = bic aic_old = aic parameter_median_old = parameter_median sampler_old = sampler # Remove the weakest component n_comp -= 1 if n_comp == 0: parameter_median = None gof = calculate_goodness_of_fit( data=data, error=error, best_fit_pars=parameter_median, n_comp=n_comp, ) bic = gof["bic"] aic = gof["aic"] else: idx_to_delete = range( prop_len * component_order[0], prop_len * component_order[0] + prop_len, ) p0 = np.delete(parameter_median, idx_to_delete) parameter_median = get_parameters_from_fitting( data, error, n_comp=n_comp, p0_fit=p0, fit_method=fit_method, save=save, overwrite=overwrite, progress=progress, ) # Calculate goodness of fit, pull out likelihood/BIC/AIC gof = calculate_goodness_of_fit( data=data, error=error, best_fit_pars=parameter_median, n_comp=n_comp, ) bic = gof["bic"] aic = gof["aic"] delta_bic = bic_old - bic delta_aic = aic_old - aic # If removing and refitting doesn't significantly improve things, then just jump out of here if ( delta_bic < glob_config["delta_bic_cutoff"] and delta_aic < glob_config["delta_aic_cutoff"] ): break # Revert to previous sampler/n_comp parameter_median = parameter_median_old sampler = sampler_old n_comp += 1 # Finally, if we've been using leastsq method and we have fitted # components, then run an MCMC here if fit_method == "leastsq" and n_comp > 0: with warnings.catch_warnings(): warnings.simplefilter("ignore") sampler = run_mcmc( data, error, n_comp=n_comp, save=save, overwrite=overwrite, progress=progress, p0_fit=parameter_median, ) # Calculate the covariance matrix and the parameter medians/errors. # This is only defined if we end up with a 0-component fit tpeak_percentiles = None tpeak_diff = None if sampler is not None: samples = get_samples( sampler, burn_in_frac=glob_config["burn_in"], thin_frac=glob_config["thin"], ) cov_matrix = np.cov(samples.T) cov_med = np.median(samples, axis=0) param_percentiles = np.percentile(samples, [16, 50, 84], axis=0) best_fit_pars = param_percentiles[1, :] best_fit_errs = np.diff(param_percentiles, axis=0) # Calculate Tpeak with errors from the samples tpeak = sample_tpeak_per_component( samples=samples, n_comp=n_comp, ) tpeak_percentiles = np.percentile(tpeak, [16, 50, 84], axis=0) tpeak_diff = np.diff(tpeak_percentiles, axis=0) # Calculate goodness-of-fit parameters goodness_of_fit_metrics = calculate_goodness_of_fit( data=data, error=error, best_fit_pars=best_fit_pars, n_comp=n_comp, ) # Put everything into one big ol' dictionary fit_dict = { "props": {}, "props_err_down": {}, "props_err_up": {}, "n_comp": n_comp, "sampler": sampler, "cov_matrix": cov_matrix, "cov_med": cov_med, } fit_dict.update(goodness_of_fit_metrics) # Put in Tpeak/Tpeak errors if n_comp > 0: fit_dict["props"]["tpeak"] = tpeak_percentiles[1, :] fit_dict["props_err_down"]["tpeak"] = tpeak_diff[0, :] fit_dict["props_err_up"]["tpeak"] = tpeak_diff[1, :] # Put in property and property errors arr = np.zeros(n_comp) for i in range(n_comp): for prop_idx, prop in enumerate(glob_config["props"]): param_idx = i * len(glob_config["props"]) + prop_idx if prop not in fit_dict["props"]: fit_dict["props"][prop] = copy.deepcopy(arr) fit_dict["props_err_down"][prop] = copy.deepcopy(arr) fit_dict["props_err_up"][prop] = copy.deepcopy(arr) fit_dict["props"][prop][i] = best_fit_pars[param_idx] fit_dict["props_err_down"][prop][i] = best_fit_errs[0, param_idx] fit_dict["props_err_up"][prop][i] = best_fit_errs[1, param_idx] return fit_dict
[docs] def run_mcmc( data, error, n_comp=1, save=True, overwrite=False, progress=False, fit_dict_filename="fit_dict.pkl", p0_fit=None, ): """Run emcee to get a fit out Args: data: Observed data error: Observed uncertainty n_comp: Number of components to fit. Defaults to 1 save: Whether to save the sampler out. Defaults to True overwrite: Whether to overwrite existing fit. Defaults to False progress: Whether to display progress bar. Defaults to False fit_dict_filename: Name for the sampler. Defaults to fit_dict.pkl p0_fit: Initial guess for the fit. Defaults to None, which will use some basic parameters """ if not os.path.exists(fit_dict_filename) or overwrite: # If we don't have p0_fit, get from lmfit if p0_fit is None: p0_fit = get_p0_lmfit( data=data, error=error, n_comp=n_comp, ) # Ensure we have an array here if not isinstance(p0_fit, np.ndarray): p0_fit = np.array(p0_fit) n_dims = len(p0_fit) # If we have an adaptive number of walkers, account for that here if isinstance(glob_config["n_walkers"], str): n_walkers = int(glob_config["n_walkers"].split("*")[0]) * n_dims else: n_walkers = copy.deepcopy(glob_config["n_walkers"]) sampler = emcee_wrapper( data, error, p0=p0_fit, n_walkers=n_walkers, n_dims=n_dims, n_comp=n_comp, progress=progress, ) if save: # Calculate max likelihood flat_samples = get_samples( sampler, burn_in_frac=glob_config["burn_in"], thin_frac=glob_config["thin"], ) parameter_median = np.nanmedian( flat_samples, axis=0, ) likelihood = ln_like( theta=parameter_median, intensity=data, intensity_err=error, vel=glob_vel, strength_lines=glob_config["strength_lines"], v_lines=glob_config["v_lines"], props=glob_config["props"], n_comp=n_comp, fit_type=glob_config["fit_type"], ) fit_dict = { "sampler": sampler, "n_comp": n_comp, "likelihood": likelihood, } save_pkl(fit_dict, fit_dict_filename) else: fit_dict = load_pkl(fit_dict_filename) sampler = fit_dict["sampler"] return sampler
[docs] def get_p0_lmfit( data, error, n_comp=1, ): """Get initial guesses for parameters using lmfit Args: data: Observed data error: Observed uncertainty n_comp: Number of components to fit. Defaults to 1 """ prop_len = len(glob_config["props"]) bounds = glob_config["bounds"] * n_comp vel_idx = np.where(np.array(glob_config["props"]) == "v")[0][0] # Pull in any relevant kwargs kwargs = {} for config_dict in [glob_config["config_defaults"], glob_config["config"]]: if "lmfit" in config_dict: for key in config_dict["lmfit"]: kwargs[key] = config_dict["lmfit"][key] # We have a default here set to add minimizer_kwargs, # which will crash for minimizers that don't support this minimizers_with_minimizer_kwargs = [ "basinhopping", "dual_annealing", "shgo", ] if kwargs.get("method", "leastsq") not in minimizers_with_minimizer_kwargs: kwargs.pop("minimizer_kwargs", None) # Find any NaNs in data or error maps good_idx = np.where(np.logical_and(np.isfinite(data), np.isfinite(error))) # And the velocity resolution dv = np.abs(np.nanmedian(np.diff(glob_vel))) p0 = np.array(glob_config["p0"] * n_comp) # Do the default method, where we brute force through # the least squares if glob_config["lmfit_method"] == "default": # Move the velocities a channel to encourage parameter space hunting for i in range(n_comp): p0[prop_len * i + vel_idx] += i * dv params = Parameters() p0_idx = 0 for i in range(n_comp): for j in range(prop_len): params.add( f"{glob_config["props"][j]}_{i}", value=p0[p0_idx], min=bounds[j][0], max=bounds[j][1], ) p0_idx += 1 # Use lmfit to get an initial fit lmfit_result = minimize( fcn=initial_lmfit, params=params, args=( data[good_idx], error[good_idx], glob_vel[good_idx], glob_config["strength_lines"], glob_config["v_lines"], glob_config["props"], n_comp, glob_config["fit_type"], True, ), **kwargs, ) p0_fit = np.array( [lmfit_result.params[key].value for key in lmfit_result.params] ) # Get an initial guess for the velocities via an iterative method elif glob_config["lmfit_method"] == "iterative": # Start with a flat line model it_model = np.zeros(len(data)) params = Parameters() lmfit_result = None p0_fit = np.array([]) # Convolve will fail if there are NaNs in the spectrum, # so interpolate over them now data_interp = copy.deepcopy(data) nan_idx = ~np.isfinite(data_interp) x = np.arange(len(data_interp)) data_interp[nan_idx] = np.interp( x[nan_idx], x[~nan_idx], data_interp[~nan_idx], ) for comp in range(n_comp): it_data = data_interp - it_model # Find lines. We impose no flux cut here # flux_threshold = np.nanmedian(error) * u.K flux_threshold = None spec = Spectrum( flux=it_data * u.K, spectral_axis=glob_vel * u.km / u.s, ) found_lines = find_lines_derivative( spec, flux_threshold=flux_threshold, ) # Only take emission lines if we have any, else take everything emission_lines = found_lines[found_lines["line_type"] == "emission"] if len(emission_lines) > 0: found_lines = copy.deepcopy(emission_lines) # Now take these lines and order by flux found_line_fluxes = np.array( [float(np.abs(it_data[x])) for x in found_lines["line_center_index"]] ) found_line_vels = found_lines["line_center"].value # Sort from brightest to faintest, pick the brightest component idxs = np.argsort(found_line_fluxes)[::-1] found_line_vel = found_line_vels[idxs][0] # Having found the velocity, we then fit all components to the data p0 = np.array([float(x) for x in glob_config["p0"]]) p0[vel_idx] = copy.deepcopy(found_line_vel) # Update the parameters with any new best guesses if lmfit_result is not None: for key in lmfit_result.params: params[key].set(value=lmfit_result.params[key].value) for j in range(prop_len): params.add( f"{glob_config["props"][j]}_{comp}", value=p0[j], min=bounds[j][0], max=bounds[j][1], ) # Get a fit to the actual data lmfit_result = minimize( fcn=initial_lmfit, params=params, args=( data[good_idx], error[good_idx], glob_vel[good_idx], glob_config["strength_lines"], glob_config["v_lines"], glob_config["props"], comp + 1, glob_config["fit_type"], True, ), **kwargs, ) p0_fit = np.array( [lmfit_result.params[key].value for key in lmfit_result.params] ) it_model = multiple_components( p0_fit, vel=glob_vel, strength_lines=glob_config["strength_lines"], v_lines=glob_config["v_lines"], props=glob_config["props"], n_comp=comp + 1, fit_type=glob_config["fit_type"], log_tau=True, ) else: raise ValueError(f"lmfit_method should be one of {ALLOWED_LMFIT_METHODS}") # Sort p0 so it has monotonically increasing velocities v0_values = np.array([p0_fit[prop_len * i + vel_idx] for i in range(n_comp)]) v0_sort = v0_values.argsort() p0_fit_sort = [p0_fit[prop_len * i : prop_len * i + prop_len] for i in v0_sort] p0_fit = np.array([item for sublist in p0_fit_sort for item in sublist]) return p0_fit
[docs] def emcee_wrapper( data, error, p0, n_walkers, n_dims, n_comp=1, progress=False, ): """Light wrapper around emcee The runs the emcee part, with some custom moves and passes the arguments neatly to functions Args: data: Observed data error: Observed uncertainty p0: Initial guess position for the walkers n_walkers: Number of emcee walkers n_dims: Number of dimensions for the problem n_comp: Number of components to fit. Defaults to None progress: Whether or not to display progress bar. Defaults to False """ if glob_config["data_type"] == "spectrum": # Multiprocess here for speed with mp.Pool(glob_config["n_cores"]) as pool: sampler = run_mcmc_sampler( data=data, error=error, p0=p0, n_walkers=n_walkers, n_dims=n_dims, n_comp=n_comp, progress=progress, pool=pool, ) else: # Run in serial since the cube is multiprocessing already (no daemons here, not today satan) sampler = run_mcmc_sampler( data=data, error=error, p0=p0, n_walkers=n_walkers, n_dims=n_dims, n_comp=n_comp, progress=progress, ) return sampler
[docs] def run_mcmc_sampler( data, error, p0, n_walkers, n_dims, n_comp=1, progress=False, pool=None, ): """Tool for actually running emcee There are two options here, fixed and adaptive which determine how long to run the MCMC walkers for. See the docs for more details Args: data: Observed data error: Observed uncertainty p0: Initial position guess for the walkers n_walkers: Number of emcee walkers n_dims: Number of dimensions for the problem n_comp: Number of components to fit. Defaults to None progress: Whether or not to display progress bar. Defaults to False pool: If running in multiprocessing mode, this is the mp Pool """ # Set up the sampler sampler = emcee.EnsembleSampler( nwalkers=n_walkers, ndim=n_dims, log_prob_fn=ln_prob, args=( data, error, glob_vel, glob_config["strength_lines"], glob_config["v_lines"], glob_config["props"], glob_config["bounds"], n_comp, glob_config["fit_type"], ), moves=[(emcee.moves.DEMove(), 0.8), (emcee.moves.DESnookerMove(), 0.2)], pool=pool, ) # START INITIALISATION RUNS # The initial positions use data percentage pos = initialise_positions( p0=p0, n_walkers=n_walkers, n_dims=n_dims, n_comp=n_comp, data_percentage=True, ) for i in range(glob_config["n_initialisation"]): state = sampler.run_mcmc( pos, glob_config["n_initialisation_steps"], progress=progress, ) # Get where we're at the maximum likelihood for the next # round, but also keep all the positions around in case # we're done max_prob_idx = np.argmax(state.log_prob) pos = copy.deepcopy(state.coords) p0 = pos[max_prob_idx] sampler.reset() # Initialise the walkers for the next run from the maximum # likelihood estimate pos = initialise_positions( p0=p0, n_walkers=n_walkers, n_dims=n_dims, n_comp=n_comp, data_percentage=False, ) # Simple case where we have the fixed steps if glob_config["emcee_run_method"] == "fixed": sampler.run_mcmc( pos, glob_config["n_steps"], progress=progress, ) elif glob_config["emcee_run_method"] == "adaptive": # Set up a tau for testing convergence old_tau = np.inf for _ in sampler.sample( pos, iterations=glob_config["max_steps"], progress=progress, ): # Only check convergence every 100 steps if sampler.iteration % 100: continue # Compute the median autocorrelation time so far # Using tol=0 means that we'll always get an estimate even # if it isn't trustworthy tau = np.nanmedian(sampler.get_autocorr_time(tol=0)) # Check convergence converged = tau * glob_config["convergence_factor"] < sampler.iteration converged &= np.abs(old_tau - tau) / tau < glob_config["tau_change"] if converged: break old_tau = tau else: raise ValueError( f"emcee_run_method should be one of {ALLOWED_EMCEE_RUN_METHODS}, " f"not {glob_config['emcee_run_method']}" ) return sampler
[docs] def initialise_positions( p0, n_walkers, n_dims, n_comp, data_percentage=False, ): """Initialise positions for the walkers Args: p0: "best fit" values n_walkers: number of walkers n_dims: number of dimensions n_comp: number of components data_percentage: Whether to move walkers around based on a percentage of data. Defaults to False """ # Shuffle the parameters around a little. # If we're using some percentage of the data, include that here, # but avoid values less than 1 if data_percentage: p0_movement = np.max( np.array([1e-2 * np.abs(p0), 1e-2 * np.ones_like(p0)]), axis=0, ) else: p0_movement = 1e-4 * np.ones_like(p0) # Reinitialise the walkers at this point, wiggle around a small amount pos = p0 + p0_movement * np.random.randn(n_walkers, n_dims) # Enforce positive values for t_ex, width for the LTE/pure Gaussian fitting if glob_config["fit_type"] == "lte": positive_idx = [0, 3] elif glob_config["fit_type"] == "pure_gauss": positive_idx = [0, 2] elif glob_config["fit_type"] == "radex": positive_idx = [0, 1, 4] else: logger.warning(f"Fit type {glob_config['fit_type']} not understood!") sys.exit() prop_len = len(glob_config["props"]) enforced_positives = [ [prop_len * i + j] for i in range(n_comp) for j in positive_idx ] enforced_positives = [item for sublist in enforced_positives for item in sublist] for i in enforced_positives: pos[:, i] = np.abs(pos[:, i]) return pos
[docs] def parallel_fit_samples( ij, fit_dict_filename=None, consolidate_fit_dict=True, ): """Pull fit percentiles from a single pixel Will pull from the fit dict if we can, else will read in each individual file """ if fit_dict_filename is None: logger.warning("fit_dict_filename must be defined!") sys.exit() i = ij[0] j = ij[1] if consolidate_fit_dict: fit_dict = glob_mcfine_output["fit"].get(i, {}).get(j, {}) else: cube_sampler_filename = f"{fit_dict_filename}_{i}_{j}.pkl" fit_dict = load_pkl(cube_sampler_filename) n_comp = fit_dict["n_comp"] if n_comp == 0: return np.zeros([3, len(glob_vel)]) flat_samples = get_samples_from_fit_dict( fit_dict, burn_in_frac=glob_config["burn_in"], thin_frac=glob_config["thin"] ) fit_lines = get_fits_from_samples( flat_samples, vel=glob_vel, props=glob_config["props"], strength_lines=glob_config["strength_lines"], v_lines=glob_config["v_lines"], fit_type=glob_config["fit_type"], n_draws=100, n_comp=n_comp, ) fit_percentiles = np.nanpercentile( np.nansum(fit_lines, axis=-1), [16, 50, 84], axis=1, ) fit_diffs = np.diff(fit_percentiles, axis=0) # Put these into down error/nominal/up error array fit_final = np.array([fit_diffs[0, :], fit_percentiles[1, :], fit_diffs[1, :]]) return fit_percentiles
[docs] def get_fits_from_samples( samples, vel, props, strength_lines, v_lines, fit_type="lte", n_draws=100, n_comp=1, ): """Get a number of fit lines from an MCMC run Args: samples: emcee output vel: Velocity grid to evaluate the fit on props: List of properties strength_lines: List of line strengths fit_type: Fit type. Defaults to "lte" v_lines: List of line velocities n_draws (int): Number of draws to pull from samples n_comp (int): Number of components in the fit Returns: array of best fit line """ fit_lines = np.zeros([len(vel), n_draws, n_comp]) for draw in range(n_draws): sample = np.random.randint(low=0, high=samples.shape[0]) for i in range(n_comp): theta_draw = samples[sample, ...][ len(props) * i : len(props) * i + len(props) ] if fit_type == "lte": fit_lines[:, draw, i] = hyperfine_structure_lte( *theta_draw, strength_lines=strength_lines, v_lines=v_lines, vel=vel, ) elif fit_type == "pure_gauss": fit_lines[:, draw, i] = hyperfine_structure_pure_gauss( *theta_draw, strength_lines=strength_lines, v_lines=v_lines, vel=vel, ) elif fit_type == "radex": qn_ul = np.array(range(len(radex_grid["QN_ul"].values))) fit_lines[:, draw, i] = get_radex_multiple_components( theta_draw, vel=vel, v_lines=v_lines, qn_ul=qn_ul, ) return fit_lines
[docs] class HyperfineFitter: def __init__( self, data=None, vel=None, error=None, mask=None, config_file=None, local_file=None, ): """Multi-component, hyperfine MCMC line fitting. A fully-automated, highly customisable multi-component fitter. For full details, see Williams & Watkins. For tutorials, see the docs Args: data (np.ndarray): Either a 1D array of intensity (spectrum), a 3D array of intensities (cube), or a string, which will load in a cube using spectral_cube. If an array, intensities should be in K. error (np.ndarray): Array of errors in intensity, or a string to load in using spectral_cube. Should have the same shape as `data`. Defaults to None. mask (np.ndarray): 1/0 mask to indicate significant emission in the cube (i.e. the pixels to fit). Should have shape of `data.shape[1:]`. Defaults to None, which will fit all pixels in a cube. vel (np.ndarray): Array of velocity values that correspond to data, in km/s. Not required if loading in a spectral cube. Defaults to None. config_file (str): Path to config.toml file. Defaults to None, which will use the default settings local_file (str): Path to local.toml file. Defaults to None, which will use the default settings Todo: * Bounds as parameters to input here. * Test the radex fitting routines on cubes. """ # Let us know if ndradex has been imported if not NDRADEX_IMPORTED: logger.warning("ndradex not imported. RT capabilities disabled") if data is None: logger.warning("data should be provided!") sys.exit() if vel is None and not isinstance(data, str): logger.warning( "velocity definition should be provided! Defaulting to monotonically increasing" ) vel = np.arange(data.shape[0]) if error is None: logger.info("No error provided. Defaulting to 0") error = np.zeros_like(data) self.logger = logger # Load in the data spectral cube wcs = None wcs_2d = None if isinstance(data, str): data = SpectralCube.read(data) # Get WCS out wcs = data.wcs wcs_2d = data[0, :, :].wcs # Get the velocity axis out vel = data.spectral_axis.to(u.km / u.s).value # Convert to K, pull out values data = data.unmasked_data[:].to(u.K).value # Load in the error spectral cube if isinstance(error, str): error = SpectralCube.read(error) # Convert to K, pull out values error = error.unmasked_data[:].to(u.K).value self.data = data self.error = error self.vel = vel self.wcs = wcs self.wcs_2d = wcs_2d # Define global variables for potentially huge arrays global glob_data, glob_error, glob_vel glob_data = self.data glob_error = self.error glob_vel = self.vel self.downsampled_data = np.array([]) self.downsampled_error = np.array([]) self.downsampled_mask = np.array([]) with open(CONFIG_DEFAULT_PATH, "rb") as f: config_defaults = tomllib.load(f) with open(LOCAL_DEFAULT_PATH, "rb") as f: local_defaults = tomllib.load(f) if config_file is not None: with open(config_file, "rb") as f: config = tomllib.load(f) else: self.logger.info("No config file provided. Using in-built defaults") config = copy.deepcopy(config_defaults) if local_file is not None: with open(local_file, "rb") as f: local = tomllib.load(f) else: self.logger.info("No local file provided. Using in-built defaults") local = copy.deepcopy(local_defaults) self.config = config self.local = local self.config_defaults = config_defaults self.local_defaults = local_defaults fit_type = get_dict_val( self.config, self.config_defaults, table="fitting_params", key="fit_type", logger=self.logger, ) if fit_type not in ALLOWED_FIT_TYPES: self.logger.warning(f"Fit type {fit_type} not understood!") sys.exit() self.fit_type = fit_type lmfit_method = get_dict_val( self.config, self.config_defaults, table="fitting_params", key="lmfit_method", logger=self.logger, ) if lmfit_method not in ALLOWED_LMFIT_METHODS: self.logger.warning( f"lmfit_method should be one of {ALLOWED_LMFIT_METHODS}!" ) sys.exit() self.lmfit_method = lmfit_method if self.fit_type == "radex" and not NDRADEX_IMPORTED: raise ValueError("Cannot use mode radex if ndradex is not installed!") fit_method = get_dict_val( self.config, self.config_defaults, table="fitting_params", key="fit_method", logger=self.logger, ) if fit_method not in ALLOWED_FIT_METHODS: self.logger.warning(f"Fitting procedure {fit_method} not understood!") sys.exit() self.fit_method = fit_method # Are we consolidate the fit dictionary into one file? consolidate_fit_dict = get_dict_val( self.config, self.config_defaults, table="fitting_params", key="consolidate_fit_dict", logger=self.logger, ) self.consolidate_fit_dict = consolidate_fit_dict # Are we keeping the sampler/covariance matrix? keep_sampler = get_dict_val( self.config, self.config_defaults, table="fitting_params", key="keep_sampler", logger=self.logger, ) self.keep_sampler = keep_sampler keep_covariance = get_dict_val( self.config, self.config_defaults, table="fitting_params", key="keep_covariance", logger=self.logger, ) self.keep_covariance = keep_covariance # If we're not keeping anything around, freak out if not self.keep_sampler and not self.keep_covariance: raise ValueError( "You have to keep around at least one of the sampler and covariance matrix!" ) self.dv = np.abs(np.nanmedian(np.diff(self.vel))) if self.data.ndim == 1: self.data_type = "spectrum" else: self.data_type = "cube" self.logger.info(f"Detected data type as {self.data_type}") if self.data_type == "cube": if mask is None: self.logger.info("No mask provided. Including every pixel") mask = np.ones([data.shape[1], data.shape[2]]) if self.data.shape != self.error.shape: self.logger.warning("Data and error should be the same shape") sys.exit() if self.data.shape[1:] != mask.shape: self.logger.warning("Mask should be 2D projection of data") sys.exit() self.mask = mask line = get_dict_val( self.config, self.config_defaults, table="fitting_params", key="line", logger=self.logger, ) if line not in allowed_lines: self.logger.warning(f"Line {line} not understood!") sys.exit() self.line = line if self.fit_type == "lte": self.strength_lines = np.array( [ strength_lines_dict[line_name] for line_name in strength_lines_dict.keys() if self.line in line_name ] ) self.v_lines = np.array( [ v_lines_dict[line_name] for line_name in v_lines_dict.keys() if self.line in line_name ] ) self.bounds = [ (T_BACKGROUND, 1e3), (np.log(0.1), np.log(30)), (np.nanmin(self.vel), np.nanmax(self.vel)), (self.dv / 2.355, 500), ] elif self.fit_type == "pure_gauss": self.strength_lines = np.array( [ strength_lines_dict[line_name] for line_name in strength_lines_dict.keys() if self.line in line_name ] ) self.v_lines = np.array( [ v_lines_dict[line_name] for line_name in v_lines_dict.keys() if self.line in line_name ] ) self.bounds = [ (0, 1e3), (np.nanmin(self.vel), np.nanmax(self.vel)), (self.dv / 2.355, 500), ] elif self.fit_type == "radex": rest_freq = get_dict_val( self.config, self.config_defaults, table="fitting_params", key="rest_freq", logger=self.logger, ) if rest_freq == "": self.logger.warning("rest_freq should be defined") sys.exit() if isinstance(rest_freq, float) or isinstance(rest_freq, int): rest_freq = rest_freq * u.GHz self.strength_lines = None self.transitions = [ transition_lines[line_name] for line_name in transition_lines.keys() if self.line in line_name ] freq = ( np.array( [ freq_lines[line_name] for line_name in freq_lines.keys() if self.line in line_name ] ) * u.GHz ) freq_to_vel = u.doppler_radio(rest_freq) self.v_lines = freq.to(u.km / u.s, equivalencies=freq_to_vel).value self.bounds = [ (T_BACKGROUND, 75), (13, 15), (5, 8), (np.nanmin(self.vel), np.nanmax(self.vel)), (self.dv / 2.355, 500), ] radex_datafile = get_dict_val( self.local, self.local_defaults, table="local", key="radex_datafile", logger=self.logger, ) self.radex_datafile = radex_datafile p0 = get_dict_val( self.config, self.config_defaults, table="initial_guess", key=fit_type, logger=self.logger, ) self.p0 = p0 if not len(self.bounds) == len(self.p0): self.logger.warning("bounds and p0 should have the same length!") sys.exit() delta_bic_cutoff = get_dict_val( self.config, self.config_defaults, table="mcmc", key="delta_bic_cutoff", logger=self.logger, ) self.delta_bic_cutoff = delta_bic_cutoff delta_aic_cutoff = get_dict_val( self.config, self.config_defaults, table="mcmc", key="delta_aic_cutoff", logger=self.logger, ) self.delta_aic_cutoff = delta_aic_cutoff emcee_run_method = get_dict_val( self.config, self.config_defaults, table="mcmc", key="emcee_run_method", logger=self.logger, ) if emcee_run_method not in ALLOWED_EMCEE_RUN_METHODS: raise ValueError( f"emcee_run_method should be one of {ALLOWED_EMCEE_RUN_METHODS}, not {emcee_run_method}" ) self.emcee_run_method = emcee_run_method # Variables for the adaptive fitting method max_steps = get_dict_val( self.config, self.config_defaults, table="mcmc", key="max_steps", logger=self.logger, ) self.max_steps = max_steps convergence_factor = get_dict_val( self.config, self.config_defaults, table="mcmc", key="convergence_factor", logger=self.logger, ) self.convergence_factor = convergence_factor tau_change = get_dict_val( self.config, self.config_defaults, table="mcmc", key="tau_change", logger=self.logger, ) self.tau_change = tau_change thin = get_dict_val( self.config, self.config_defaults, table="mcmc", key="thin", logger=self.logger, ) self.thin = thin burn_in = get_dict_val( self.config, self.config_defaults, table="mcmc", key="burn_in", logger=self.logger, ) self.burn_in = burn_in n_initialisation = get_dict_val( self.config, self.config_defaults, table="mcmc", key="n_initialisation", logger=self.logger, ) self.n_initialisation = n_initialisation n_initialisation_steps = get_dict_val( self.config, self.config_defaults, table="mcmc", key="n_initialisation_steps", logger=self.logger, ) self.n_initialisation_steps = n_initialisation_steps # Variables for fixed fitting method n_steps = get_dict_val( self.config, self.config_defaults, table="mcmc", key="n_steps", logger=self.logger, ) self.n_steps = n_steps n_walkers = get_dict_val( self.config, self.config_defaults, table="mcmc", key="n_walkers", logger=self.logger, ) self.n_walkers = n_walkers n_cores = get_dict_val( self.local, self.local_defaults, table="local", key="n_cores", logger=self.logger, ) if n_cores == "": self.n_cores = mp.cpu_count() else: self.n_cores = n_cores if self.fit_type == "lte": self.props = [ "tex", "tau", "v", "sigma", ] self.labels = [ r"$T_\mathrm{ex}$ (%s)", r"$\log(\tau)$ (%s)", r"$v$ (%s)", r"$\sigma$ (%s)", ] elif self.fit_type == "pure_gauss": self.props = [ "t", "v", "sigma", ] self.labels = [ r"$T$ (%s)", r"$v$ (%s)", r"$\sigma$ (%s)", ] elif self.fit_type == "radex": self.props = [ "t_kin", "N_col", "n_h2", "v", "sigma", ] self.labels = [ r"$T_\mathrm{kin}$ (%s)", r"$N_\mathrm{col}$ (%s)", r"$n_\mathrm{H2}$ (%s)", r"$v$ (%s)", r"$\sigma$ (%s)", ] else: self.logger.warning(f"fit_type {self.fit_type} not known") sys.exit() # Define a global configuration dictionary that we'll use in multiprocessing global glob_config keys_to_glob = [ "config_defaults", "config", "data_type", "props", "p0", "bounds", "strength_lines", "v_lines", "delta_bic_cutoff", "delta_aic_cutoff", "lmfit_method", "fit_type", "emcee_run_method", "n_cores", "n_walkers", "n_steps", "n_initialisation", "n_initialisation_steps", "burn_in", "thin", "convergence_factor", "tau_change", "max_steps", "keep_sampler", "keep_covariance", ] for k in keys_to_glob: glob_config[k] = self.__dict__[k] self.parameter_maps = None
[docs] def initialise_mcfine_output(self, data=None, data_type="original"): """Initialise the mcfine output, which stores useful information about the fit run Args: data: Array of data data_type: Type of data. Should be "original" or "downsampled". Will not take WCS for downsampled data """ mcfine_output = {} # Start by pulling in the configurations mcfine_output["user_configuration_settings"] = copy.deepcopy(self.config) mcfine_output["default_configuration_settings"] = copy.deepcopy( self.config_defaults ) mcfine_output["user_local_settings"] = copy.deepcopy(self.local) mcfine_output["default_local_settings"] = copy.deepcopy(self.local_defaults) # Then information about the data, which should only exist if we're fitting a cube if data is not None: mcfine_output["data"] = {} mcfine_output["data"]["shape"] = data.shape # Add in the WCS, so we can spit out fits files later if self.wcs is not None and data_type != "downsampled": mcfine_output["data"]["wcs"] = copy.deepcopy(self.wcs) mcfine_output["data"]["wcs_2d"] = copy.deepcopy(self.wcs_2d) # And finally, a space for the fits if self.consolidate_fit_dict: mcfine_output["fit"] = {} return mcfine_output
[docs] def generate_radex_grid( self, output_file=None, ): """Pre-generate RADEX grid, given various bounds and steps This will be the grid we interpolate over for speed later. As such, the parameters should be set to cover the parameter range you care about. Parameters are stored in config.toml Args: output_file (str): Output filename. Defaults to None, which will pull from config.toml """ if self.fit_type != "radex": self.logger.warning("fit_type should be radex") sys.exit() if output_file is None: output_file = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="output_file", logger=self.logger, ) t_kin = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="t_kin", logger=self.logger, ) n_mol = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="n_mol", logger=self.logger, ) n_h2 = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="n_h2", logger=self.logger, ) dv = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="dv", logger=self.logger, ) if t_kin == "": t_kin = self.bounds[0] if n_mol == "": n_mol = self.bounds[1] if n_h2 == "": n_h2 = self.bounds[2] if dv == "": dv = self.bounds[4] t_kin_step = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="t_kin_step", logger=self.logger, ) n_mol_step = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="n_mol_step", logger=self.logger, ) n_h2_step = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="n_h2_step", logger=self.logger, ) dv_step = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="dv_step", logger=self.logger, ) geom = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="geom", logger=self.logger, ) progress = get_dict_val( self.config, self.config_defaults, table="generate_radex_grid", key="progress", logger=self.logger, ) f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) if not os.path.exists(output_file) or overwrite: t_kin_array = np.arange(t_kin[0], t_kin[1] + t_kin_step, t_kin_step) n_mol_array = np.arange(n_mol[0], n_mol[1] + n_mol_step, n_mol_step) n_h2_array = np.arange(n_h2[0], n_h2[1] + n_h2_step, n_h2_step) dv_array = ( np.arange(dv[0], dv[1] + dv_step, dv_step) * 2.355 ) # 2.355 since distinction between sigma/dv ds = ndradex.run( self.radex_datafile, self.transitions, T_kin=t_kin_array, N_mol=10**n_mol_array, n_H2=10**n_h2_array, dv=dv_array, T_bg=T_BACKGROUND, geom=geom, progress=progress, n_procs=self.n_cores, ) ndradex.save_dataset(ds, output_file) else: ds = ndradex.load_dataset(output_file) global radex_grid radex_grid = ds
[docs] def get_props( self, fit_dict, ): """Get properties out from a fit dictionary Args: fit_dict: Dictionary of parameters """ # Only do this if we actually have a fitted number of components if fit_dict.get("n_comp", 0) > 0: props = [ float(fit_dict["props"][p][n]) for n in range(fit_dict["n_comp"]) for p in self.props ] else: props = [] return props
[docs] def downsample_fitter( self, fit_dict_filename=None, mcfine_output_filename=None, downsample_factor=10, ): """Run multicomponent fitter on downsampled data This is a light wrapper around multicomponent fitter, but also takes Args: fit_dict_filename: Filename to save the fitted emcee walkers to. Defaults to None, which will not save anything mcfine_output_filename: Filename to save the mcfine output to. Defaults to None, which will not save anything downsample_factor: Factor to downsample the data down on each spatial axis """ if self.data_type != "cube": raise ValueError("Can only do downsample fitting on cubes") f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) # For data and error, we want some representative values within the averaging # area. Otherwise, we can hugely boost S/N and end up fitting all the blended # components, which is not what we want! downsampled_data = downsample( self.data, chunk_size=downsample_factor, func=np.nanmedian ) downsampled_error = downsample( self.error, chunk_size=downsample_factor, func=np.nanmedian ) # Mask is a little different. Take sum and reduce to bool downsampled_mask = downsample( self.mask, chunk_size=downsample_factor, func=np.nansum ) downsampled_mask[downsampled_mask < 1] = 0 downsampled_mask[downsampled_mask >= 1] = 1 self.downsampled_data = downsampled_data self.downsampled_error = downsampled_error self.downsampled_mask = downsampled_mask global glob_downsampled_data, glob_downsampled_error glob_downsampled_data = downsampled_data glob_downsampled_error = downsampled_error if mcfine_output_filename is None: mcfine_output_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="mcfine_output_filename", logger=self.logger, ) if not os.path.exists(f"{mcfine_output_filename}.pkl") or overwrite: mcfine_output = self.multicomponent_fitter( fit_dict_filename=fit_dict_filename, mcfine_output_filename=mcfine_output_filename, data_type="downsampled", ) # Save the output save_pkl( mcfine_output, f"{mcfine_output_filename}.pkl", ) else: mcfine_output = load_pkl(f"{mcfine_output_filename}.pkl") # Get n_comp and parameters out and sample back to the original grid. If we've # got the info in the fit dict, this is trivial if "fit" in mcfine_output: n_comp = np.array( [ [ mcfine_output["fit"].get(i, {}).get(j, {}).get("n_comp", 0) for i in range(downsampled_data.shape[1]) ] for j in range(downsampled_data.shape[2]) ] ).T theta = np.array( [ [ self.get_props(mcfine_output["fit"].get(i, {}).get(j, {})) for i in range(downsampled_data.shape[1]) ] for j in range(downsampled_data.shape[2]) ], dtype=list, ).T # Otherwise, loop over files and pull out info else: n_comp = np.zeros(downsampled_data.shape[1:]) theta = np.empty(downsampled_data.shape[1:], dtype=list) if fit_dict_filename is None: fit_dict_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="fit_dict_filename", logger=self.logger, ) for i in range(downsampled_data.shape[1]): for j in range(downsampled_data.shape[2]): fit_dict_f = f"{fit_dict_filename}_{i}_{j}.pkl" if os.path.exists(fit_dict_f): fit_dict = load_pkl(fit_dict_f) n_comp[i, j] = fit_dict["n_comp"] theta[i, j] = self.get_props(fit_dict) else: theta[i, j] = [] # Get indices for the downsampled and upsampled arrays i_us = np.arange(self.data.shape[1]) j_us = np.arange(self.data.shape[2]) i_ds = np.arange(downsample_factor, self.data.shape[1], downsample_factor) j_ds = np.arange(downsample_factor, self.data.shape[2], downsample_factor) n_comp_upsampled = np.zeros(self.data.shape[1:]) theta_upsampled = np.empty(self.data.shape[1:], dtype=list) i_split = np.array_split(i_us, i_ds) j_split = np.array_split(j_us, j_ds) for i_idx, i in enumerate(i_split): for j_idx, j in enumerate(j_split): # Get min/max indices to map back to the array i_low = np.min(i) i_high = np.max(i) j_low = np.min(j) j_high = np.max(j) n_comp_upsampled[i_low : i_high + 1, j_low : j_high + 1] = n_comp[ i_idx, j_idx ] for i_t in np.arange(i_low, i_high + 1): for j_t in np.arange(j_low, j_high + 1): theta_upsampled[i_t, j_t] = theta[i_idx, j_idx] global glob_initial_n_comp, glob_initial_theta glob_initial_n_comp = copy.deepcopy(n_comp_upsampled) glob_initial_theta = copy.deepcopy(theta_upsampled) return True
[docs] def multicomponent_fitter( self, fit_dict_filename=None, mcfine_output_filename=None, data_type="original", ): """Run the multicomponent fitter This runs everything, essentially, and is the part that you should call. It'll do LMFIT to get guesses, emcee to properly sample parameter space and then iteratively add components before removing them. Args: fit_dict_filename: Filename to save the fitted emcee walkers to. Defaults to None, which will not save anything mcfine_output_filename: Filename to save the mcfine output to. Defaults to None, which will not save anything data_type: Data type to fit. Can be either "original" or "downsampled" """ if data_type not in ["original", "downsampled"]: raise ValueError("Data type to fit should be original or downsampled") f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) self.logger.info("Starting multi-component fitting") if mcfine_output_filename is None: mcfine_output_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="mcfine_output_filename", logger=self.logger, ) if not os.path.exists(f"{mcfine_output_filename}.pkl") or overwrite: if fit_dict_filename is None: fit_dict_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="fit_dict_filename", logger=self.logger, ) # Create output directory if it doesn't exist fit_dict_base_dir = os.path.dirname(fit_dict_filename) if not os.path.exists(fit_dict_base_dir): os.makedirs(fit_dict_base_dir) chunksize = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="chunksize", logger=self.logger, ) progress = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="progress", logger=self.logger, ) save = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="save", logger=self.logger, ) if data_type == "original": data = copy.deepcopy(self.data) error = copy.deepcopy(self.error) mask = copy.deepcopy(self.mask) elif data_type == "downsampled": data = copy.deepcopy(self.downsampled_data) error = copy.deepcopy(self.downsampled_error) mask = copy.deepcopy(self.downsampled_mask) # If somehow we've broken the logic here, then freak out if data.size == 0: raise ValueError( "Must run data downsampling if you want to fit downsampled data" ) else: raise ValueError("Data type to fit should be original or downsampled") # Start fitting! mcfine_output = None if self.data_type == "spectrum": mcfine_output = self.initialise_mcfine_output() if not os.path.exists(f"{mcfine_output_filename}.pkl") or overwrite: fit_dict = delta_bic_looper( data, error, fit_method=self.fit_method, progress=progress, ) if not self.keep_sampler: fit_dict.pop("sampler") if not self.keep_covariance: fit_dict.pop("cov_matrix") fit_dict.pop("cov_med") if self.consolidate_fit_dict: mcfine_output["fit"].update(fit_dict) if save: save_pkl(fit_dict, f"{fit_dict_filename}.pkl") save_pkl(mcfine_output, f"{mcfine_output_filename}.pkl") elif self.data_type == "cube": mcfine_output = self.initialise_mcfine_output( data=data, data_type=data_type, ) ij_list = [ (i, j) for i in range(data.shape[1]) for j in range(data.shape[2]) if mask[i, j] != 0 ] self.logger.info(f"Fitting using {self.n_cores} cores") # Do the fitting. This requires saving out files, since # we just return a filename with mp.Pool(self.n_cores) as pool: result = list( tqdm( pool.imap( partial( parallel_fitting, fit_dict_filename=fit_dict_filename, data_type=data_type, fit_method=self.fit_method, overwrite=overwrite, ), ij_list, chunksize=chunksize, ), total=len(ij_list), dynamic_ncols=True, ) ) for idx, ij in enumerate(ij_list): if ij[0] not in mcfine_output["fit"]: mcfine_output["fit"][ij[0]] = {} if ij[1] not in mcfine_output["fit"][ij[0]]: mcfine_output["fit"][ij[0]][ij[1]] = {} fit_dict = load_pkl(result[idx]) # Consolidate fit dictionaries if self.consolidate_fit_dict: mcfine_output["fit"][ij[0]][ij[1]].update(fit_dict) if save: save_pkl(mcfine_output, f"{mcfine_output_filename}.pkl") else: mcfine_output = load_pkl(f"{mcfine_output_filename}.pkl") return mcfine_output
[docs] def encourage_spatial_coherence( self, input_dir="fit", output_dir="fit_coherence", fit_dict_filename=None, mcfine_input_filename=None, mcfine_output_filename=None, reverse_direction=False, ): """Loop over fits to encourage spatial coherence This will loop over RA/Dec to potentially replace fits with neighbouring fits. This helps to encourage spatial coherence and can also help in the case of catastrophic misfits Args: input_dir: directory for the input fits. Defaults to 'fit' output_dir: directory for the output of the coherence checking. Defaults to 'fit_coherence' fit_dict_filename: Filename structure for the fit dictionary. Default to None, which will choose some generic name mcfine_input_filename: Name for the consolidated mcfine output to start with. Defaults to None, which will pull the default name "mcfine" mcfine_output_filename: Name for the consolidated mcfine output to save at the end. Defaults to None, which will pull the default name "mcfine_coherence" reverse_direction: Whether to reverse how we step through x/y in the coherence encouragement. Defaults to False, but likely you should run one case forward, then another backward """ if self.data_type != "cube": self.logger.warning("Can only do spatial coherence on a cube!") sys.exit() f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) if fit_dict_filename is None: fit_dict_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="fit_dict_filename", logger=self.logger, ) if mcfine_input_filename is None: mcfine_input_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="mcfine_output_filename", logger=self.logger, ) if not os.path.exists(f"{mcfine_input_filename}.pkl"): raise FileNotFoundError( f"{mcfine_input_filename}.pkl not found! Make sure this exists" ) # If we don't have an output filename, take the default plus a "coherence" appended if mcfine_output_filename is None: mcfine_output_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="mcfine_output_filename", logger=self.logger, ) mcfine_output_filename += "_coherence" if os.path.exists(f"{mcfine_output_filename}.pkl") and not overwrite: return True if reverse_direction: step = -1 else: step = 1 if not os.path.exists(output_dir): os.makedirs(output_dir) # Test hardlinks work hardlinks_supported = True hl_input = os.path.join(input_dir, "hl_test.txt") hl_output = os.path.join(output_dir, "hl_test.txt") os.system(f"touch {hl_input}") try: os.link(hl_input, hl_output) except: hardlinks_supported = False os.system(f"rm {hl_output}") os.system(f"rm {hl_input}") if overwrite: # Flush out the directory files_in_dir = glob.glob(os.path.join(output_dir, "*")) for file_in_dir in files_in_dir: os.remove(file_in_dir) if os.path.exists(f"{mcfine_output_filename}.pkl"): os.remove(f"{mcfine_output_filename}.pkl") total_found = 0 ij_list = [ (i, j) for i in range(self.data.shape[1])[::step] for j in range(self.data.shape[2])[::step] if self.mask[i, j] != 0 ] # Read in the previous output dictionary, to define the new one mcfine_output = load_pkl(f"{mcfine_input_filename}.pkl") # Set up a dictionary to record best-fit parameters, to # reduce I/O fit_params = {} for ij in tqdm( ij_list, dynamic_ncols=True, ): i, j = ij[0], ij[1] input_file = os.path.join(input_dir, f"{fit_dict_filename}_{i}_{j}.pkl") output_file = os.path.join(output_dir, f"{fit_dict_filename}_{i}_{j}.pkl") if not os.path.exists(output_file) or overwrite: # Pull original likelihood for the pixel fit_dict = load_pkl(input_file) bic_original = fit_dict["bic"] aic_original = fit_dict["aic"] # Pull out the neighbouring pixels i_min = max(0, i - 1) i_max = min(self.data.shape[1], i + 2) j_min = max(0, j - 1) j_max = min(self.data.shape[2], j + 2) delta_bic = np.zeros([i_max - i_min, j_max - j_min]) delta_aic = np.zeros_like(delta_bic) likelihood_cutout = np.zeros_like(delta_bic) for i_cutout, i_full in enumerate(range(i_min, i_max)): for j_cutout, j_full in enumerate(range(j_min, j_max)): if self.mask[i_full, j_full] == 0: continue # If we haven't already looked at this one, pull in # the fit dictionary and get parameters out fit_params_key = f"{i_full}_{j_full}" if fit_params_key not in fit_params: # Check if we already have moved the fit dictionary cutout_fit_dict_filename = os.path.join( output_dir, f"{fit_dict_filename}_{i_full}_{j_full}.pkl", ) if not os.path.exists(cutout_fit_dict_filename): cutout_fit_dict_filename = os.path.join( input_dir, f"{fit_dict_filename}_{i_full}_{j_full}.pkl", ) cutout_fit_dict = load_pkl(cutout_fit_dict_filename) fit_params[fit_params_key] = {} fit_params[fit_params_key].update(cutout_fit_dict) # Pull out the parameters theta = self.get_props(cutout_fit_dict) fit_params[fit_params_key]["theta"] = theta n_comp_new = fit_params[fit_params_key]["n_comp"] theta = fit_params[fit_params_key]["theta"] # Get the various goodness of fit metrics gof_metrics = calculate_goodness_of_fit( data=self.data[:, i, j], error=self.error[:, i, j], best_fit_pars=theta, n_comp=n_comp_new, ) delta_bic[i_cutout, j_cutout] = ( bic_original - gof_metrics["bic"] ) delta_aic[i_cutout, j_cutout] = ( aic_original - gof_metrics["aic"] ) likelihood_cutout[i_cutout, j_cutout] = gof_metrics[ "likelihood" ] # Set ones where we don't have a meaningful value to something small delta_bic[delta_bic == 0] = -1000 delta_aic[delta_aic == 0] = -1000 # Find the position of maximum change in BIC between pixels. idx = np.unravel_index(np.argmax(delta_bic, axis=None), delta_bic.shape) fit_updated = False # Use both the BIC and AIC criterion here to distinguish, we require both to be significant if ( delta_bic[idx] > self.delta_bic_cutoff and delta_aic[idx] > self.delta_aic_cutoff ): total_found += 1 # If we're replacing with a file we've already replaced, pull from the output directory. Else # pull from the input directory. input_file = os.path.join( output_dir, f"{fit_dict_filename}_{idx[0] + i_min}_{idx[1] + j_min}.pkl", ) if not os.path.exists(input_file): input_file = os.path.join( input_dir, f"{fit_dict_filename}_{idx[0] + i_min}_{idx[1] + j_min}.pkl", ) fit_updated = True # If the fits have been updated, then update the likelihood and save out if fit_updated: updated_fit_dict = load_pkl(input_file) # Update the fit parameter dictionary fit_param_key = f"{i}_{j}" fit_params[fit_param_key].update(updated_fit_dict) # Pull out parameters theta = self.get_props(updated_fit_dict) fit_params[fit_param_key]["theta"] = copy.deepcopy(theta) # Update goodness of fit n_comp = updated_fit_dict["n_comp"] gof_metrics = calculate_goodness_of_fit( data=self.data[:, i, j], error=self.error[:, i, j], best_fit_pars=theta, n_comp=n_comp, ) updated_fit_dict.update(gof_metrics) # Save out the updated fit dictionary save_pkl(updated_fit_dict, output_file) if self.consolidate_fit_dict: mcfine_output["fit"][i][j].update(updated_fit_dict) # Move the right file to the new directory. Use hardlinks to minimize space if possible else: if hardlinks_supported: os.link(input_file, output_file) else: os.system(f"cp {input_file} {output_file}") # Otherwise, load in and potentially update the dictionary else: # Do a quick comparison to see if the file has changed if not filecmp.cmp(input_file, output_file): total_found += 1 if self.consolidate_fit_dict: updated_fit_dict = load_pkl(output_file) mcfine_output["fit"][i][j].update(updated_fit_dict) self.logger.info(f"Number replaced: {total_found}") if not os.path.exists(f"{mcfine_output_filename}.pkl") or overwrite: save_pkl(mcfine_output, f"{mcfine_output_filename}.pkl") return True
[docs] def create_fit_cube( self, fit_dict_filename=None, mcfine_output_filename=None, cube_filename=None, ): """Create upper/lower errors for spectral fit plots Will preferentially get this from the mcfine_output, which is quicker and reduces I/O. Otherwise, will pull in each individual fit If WCS is present, this will be saved out as a multi-extension fits. Otherwise, will save as a 4-D numpy array. Args: fit_dict_filename (str): Name for the filename of fitted parameter dictionary. Defaults to None, which will pull from config.toml mcfine_output_filename (str): Name for the mcfine output. Defaults to None, which will pull from config.toml cube_filename (str): Name for the filename of output cube. Defaults to None, which will pull from config.toml """ if self.data_type != "cube": self.logger.warning("Can only make fit cubes for cubes") sys.exit() f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) if fit_dict_filename is None: fit_dict_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="fit_dict_filename", logger=self.logger, ) if mcfine_output_filename is None: mcfine_output_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="mcfine_output_filename", logger=self.logger, ) if not os.path.exists(f"{mcfine_output_filename}.pkl"): raise FileNotFoundError(f"{mcfine_output_filename}.pkl does not exist") if cube_filename is None: cube_filename = get_dict_val( self.config, self.config_defaults, table="create_fit_cube", key="cube_filename", logger=self.logger, ) chunksize = get_dict_val( self.config, self.config_defaults, table="create_fit_cube", key="chunksize", logger=self.logger, ) # If we have WCS, we create fits files if self.wcs is not None: cube_filename = f"{cube_filename}.fits" else: cube_filename = f"{cube_filename}.npy" if not os.path.exists(cube_filename) or overwrite: # Load in the mcfine output as a global variable global glob_mcfine_output glob_mcfine_output = load_pkl(f"{mcfine_output_filename}.pkl") ij_list = [ (i, j) for i in range(self.data.shape[1]) for j in range(self.data.shape[2]) if self.mask[i, j] != 0 ] # Setup fit cube. If we have WCS info, then this is 3 # fits cubes if self.wcs is not None: init_data = np.ones([*self.data.shape]) * np.nan main_hdu = fits.PrimaryHDU(header=self.wcs.to_header()) cube_hdu = fits.ImageHDU( data=init_data, header=self.wcs.to_header(), name="NOMINAL", ) err_down_hdu = fits.ImageHDU( data=init_data, header=self.wcs.to_header(), name="ERR_DOWN", ) err_up_hdu = fits.ImageHDU( data=init_data, header=self.wcs.to_header(), name="ERR_UP", ) fit_cube = fits.HDUList([main_hdu, cube_hdu, err_down_hdu, err_up_hdu]) else: fit_cube = np.zeros([3, *self.data.shape], dtype=np.float32) with mp.Pool(self.n_cores) as pool: map_result = list( tqdm( pool.imap( partial( parallel_fit_samples, fit_dict_filename=fit_dict_filename, consolidate_fit_dict=self.consolidate_fit_dict, ), ij_list, chunksize=chunksize, ), total=len(ij_list), dynamic_ncols=True, ) ) for idx, ij in enumerate(ij_list): if self.wcs is not None: fit_cube["ERR_DOWN"].data[:, ij[0], ij[1]] = map_result[idx][0, :] fit_cube["NOMINAL"].data[:, ij[0], ij[1]] = map_result[idx][1, :] fit_cube["ERR_UP"].data[:, ij[0], ij[1]] = map_result[idx][2, :] else: fit_cube[:, :, ij[0], ij[1]] = map_result[idx] # Save out the cube if self.wcs is not None: fit_cube.writeto(cube_filename, overwrite=True) else: np.save(cube_filename, fit_cube) return True
[docs] def make_parameter_maps( self, fit_dict_filename=None, mcfine_output_filename=None, maps_filename=None, ): """Make maps of fitted parameters. Will preferentially get this from the mcfine_output, which is quicker and reduces I/O. Otherwise, will pull in each individual fit If WCS is defined, these will be .fits files, otherwise just numpy arrays Args: fit_dict_filename (str): Name for the filename of fitted parameter dictionary (without i/j suffix). Defaults to None, which will pull from config.toml mcfine_output_filename (str): Name for the mcfine output. Defaults to None, which will pull from config.toml maps_filename (str): Name for the filename of output maps. Defaults to None, which will pull from config.toml """ if self.data_type != "cube": self.logger.warning("Can only make parameter maps for fitted cubes") sys.exit() f_name = inspect.currentframe().f_code.co_name overwrite = check_overwrite(self.config, f_name) if fit_dict_filename is None: fit_dict_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="fit_dict_filename", logger=self.logger, ) if mcfine_output_filename is None: mcfine_output_filename = get_dict_val( self.config, self.config_defaults, table="multicomponent_fitter", key="mcfine_output_filename", logger=self.logger, ) if not os.path.exists(f"{mcfine_output_filename}.pkl"): raise FileNotFoundError(f"{mcfine_output_filename}.pkl does not exist") if maps_filename is None: maps_filename = get_dict_val( self.config, self.config_defaults, table="make_parameter_maps", key="maps_filename", logger=self.logger, ) parameter_maps = {} # Define what we'll pull out to maps that aren't # coming from the properties themselves non_prop_maps = [ "n_comp", "likelihood", "bic", "aic", "chisq", "chisq_red", ] if not os.path.exists(maps_filename) or overwrite: # Load in the mcfine output mcfine_output = load_pkl(f"{mcfine_output_filename}.pkl") # Pull 2D data shape out, since we need that later data_shape_2d = mcfine_output["data"]["shape"][1:] ij_list = [ [i, j] for i in range(self.data.shape[1]) for j in range(self.data.shape[2]) if self.mask[i, j] != 0 ] for ij in tqdm( ij_list, dynamic_ncols=True, ): i = ij[0] j = ij[1] # If we've consolidated the fit dictionary, # we can pull directly from the output if self.consolidate_fit_dict: fit_dict = mcfine_output["fit"].get(i, {}).get(j, {}) else: fit_dict = load_pkl(f"{fit_dict_filename}_{i}_{j}.pkl") # Start looping over the parameters. If we don't have it already, # set it up for p in non_prop_maps: if p not in parameter_maps: f = self.setup_minimal_array(shape=data_shape_2d) parameter_maps[p] = copy.deepcopy(f) if self.wcs_2d is not None: parameter_maps[p].data[i, j] = fit_dict[p] else: parameter_maps[p][i, j] = fit_dict[p] # Loop over properties with associated errors prop_key_names = [ "props", "props_err_down", "props_err_up", ] for k in prop_key_names: for p in fit_dict[k]: for n in range(fit_dict["n_comp"]): if f"{p}_{n}" not in parameter_maps: f = self.setup_minimal_array(shape=data_shape_2d) parameter_maps[f"{p}_{n}"] = copy.deepcopy(f) if self.wcs_2d is not None: parameter_maps[f"{p}_{n}"].data[i, j] = fit_dict[ "props" ][p][n] else: parameter_maps[f"{p}_{n}"][i, j] = fit_dict["props"][p][ n ] # Also pull out covariance, if we keep that around. # This works a little differently if self.keep_covariance: if "cov_matrix" not in parameter_maps: parameter_maps["cov_matrix"] = {} if "cov_med" not in parameter_maps: parameter_maps["cov_med"] = {} parameter_maps["cov_matrix"][f"{i}, {j}"] = copy.deepcopy( fit_dict["cov_matrix"] ) parameter_maps["cov_med"][f"{i}, {j}"] = copy.deepcopy( fit_dict["cov_med"] ) # Save maps out if maps_filename is not None: save_pkl(parameter_maps, maps_filename) else: with open(maps_filename, "rb") as f: parameter_maps = pickle.load(f) self.parameter_maps = parameter_maps
[docs] def setup_minimal_array( self, shape, ): """Set up a minimal array of NaNs If WCS is present, will create a fits file. Otherwise a numpy array Args: shape: Data shape TODO: For fits files, may be worth pulling in units """ f = np.zeros(shape) * np.nan if self.wcs_2d is not None: hdr = self.wcs_2d.to_header() f = fits.PrimaryHDU( data=f, header=hdr, ) return f