Source code for py21cmmc.likelihood

"""Module containing 21CMMC likelihoods."""
import logging
import numpy as np
from cached_property import cached_property
from io import IOBase
from os import path, rename
from pathlib import Path
from powerbox.tools import get_power
from py21cmfast import wrapper as lib
from scipy.interpolate import (
    InterpolatedUnivariateSpline,
    RectBivariateSpline,
    interp1d,
)
from scipy.special import erf

from . import core

loaded_cliks = {}
logger = logging.getLogger("21cmFAST")

np.seterr(invalid="ignore", divide="ignore")


def _ensure_iter(a):
    try:
        iter(a)
        return a
    except TypeError:
        return [a]


def _listify(lst):
    if isinstance(lst, list):
        return lst
    else:
        return [lst]


class LikelihoodBase(core.ModuleBase):
    """Base class for Likelihoods in 21CMMC."""

    def computeLikelihood(self, model):
        """
        Calculate the likelihood of the instance data given the model.

        Parameters
        ----------
        model : dict
            A dictionary containing all model-dependent quantities required to
            calculate the likelihood. Explicitly, matches the output of
            :meth:`~reduce_data`.

        Returns
        -------
        lnL : float
            The log-posterior of the given model.
        """
        raise NotImplementedError("The Base likelihood should never be used directly!")
    def reduce_data(self, ctx):
        """
        Perform reduction on raw data (from all cores in the chain).

        This should perform whatever reduction operations are required on the
        real/mock data to obtain the form used directly in the likelihood. That is,
        keep in mind that this method will be called on mock data produced by the
        core modules in this chain to obtain the static data which is used in the
        likelihood. In this regard it is called, if applicable, once at
        :meth:`setup()`. For efficiency then, this method should reduce the data as
        far as possible so that no unnecessary calculations are required
        per-iteration. Furthermore, it should be a deterministic calculation.

        Parameters
        ----------
        ctx : dict-like
            A context filled with model data from all cores in the chain.
            Specifically, this is the context obtained by using the __call__ method
            of each core in sequence.

        Returns
        -------
        model : dict
            A dictionary containing all reduced quantities required for obtaining
            the likelihood.
        """
        raise NotImplementedError("The Base likelihood should never be used directly!")
    def _expose_core_parameters(self):
        # Try to get the params out of the core module
        for m in self._cores:
            for k in ["user_params", "flag_options", "cosmo_params", "astro_params"]:
                if hasattr(m, k):
                    if hasattr(self, k) and getattr(self, k) != getattr(m, k):
                        raise ValueError(
                            f"Setup has detected incompatible input parameter dicts in "
                            f"specified cores: {k}"
                        )
                    else:
                        setattr(self, k, getattr(m, k))

    def setup(self):
        """Perform post-init setup."""
        super().setup()

        # Expose user, flag, cosmo, astro params to this likelihood if available.
        self._expose_core_parameters()

    def get_fiducial_model(self):
        """Compute and return a model dictionary at the fiducial set of parameters."""
        ctx = self.chain.build_model_data()
        return self.reduce_data(ctx)
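
# Usage sketch (illustrative, not part of the module): a minimal custom likelihood
# only needs to implement `reduce_data` and `computeLikelihood`. The context key
# "brightness_temp" and the attributes `self.data` / `self.sigma` below are
# assumptions made for the sake of the example; any quantity your cores place in
# the context can be used instead.
#
# class MyMeanTbLikelihood(LikelihoodBase):
#     def reduce_data(self, ctx):
#         # Reduce the raw core output to the summary used by the likelihood.
#         return {"mean_tb": np.mean(ctx.get("brightness_temp"))}
#
#     def computeLikelihood(self, model):
#         # Simple Gaussian likelihood against a stored data value and error.
#         return -0.5 * ((model["mean_tb"] - self.data) / self.sigma) ** 2
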
class LikelihoodBaseFile(LikelihoodBase):
    """Base class for likelihoods whose data is read from a file.

    Parameters
    ----------
    datafile : str, optional
        The file(s) from which to read the data. Alternatively, the file to which to
        write the data. Data will *only* be written to the file if `simulate` is set
        to True explicitly.
    noisefile : str, optional
        The file(s) from which to read the noise profile. If not given, no noise is
        used.
    simulate : bool, optional
        Whether to perform a simulation to create the (mock) data (i.e. instead of
        reading from file). Default is False, which prevents the mock data from
        potentially overwriting the `datafile`.
    emulate : bool, optional
        Whether to produce the (mock) data with an emulator core (e.g.
        ``Core21cmEMU``) instead of reading it from file. Default is False.
    use_data : bool, optional
        Sometimes you may want to construct the likelihood without any data at all.
        Set this to False if this is the case.
    """

    _ignore_attributes = LikelihoodBase._ignore_attributes + ("simulate",)
    def __init__(
        self,
        datafile=None,
        noisefile=None,
        simulate=False,
        emulate=False,
        use_data=True,
    ):
        super().__init__()

        self.datafile = datafile
        self.noisefile = noisefile
        self._use_data = use_data

        # We *always* make the datafile and noisefile a list
        if isinstance(self.datafile, str) or isinstance(self.datafile, IOBase):
            self.datafile = [self.datafile]
        if isinstance(self.noisefile, str) or isinstance(self.noisefile, IOBase):
            self.noisefile = [self.noisefile]

        self._simulate = simulate
        self._emulate = emulate

        self.data = None
        self.noise = None
[docs] def setup(self): """Perform post-init setup.""" super().setup() if self._use_data: if not self._simulate and not self.datafile and not self._emulate: raise ValueError( "Either an existing datafile has to be specified, or simulate or emulate set to True." ) if self._simulate or self._emulate: simctx = self.chain.simulate_mock() # Read in or simulate the data and noise. self.data = ( self.reduce_data(simctx) if (self._simulate or self._emulate) else self._read_data() ) # If we can't/won't simulate noise, and no noisefile is provided, assume no # noise is necessary. # If emulate is True, this part includes the avg emulator error. if (hasattr(self, "define_noise") and self._simulate) or self.noisefile: self.noise = ( self.define_noise(simctx, self.data) if (hasattr(self, "define_noise") and self._simulate) else self._read_noise() ) # Now, if data has been simulated, and a file is provided, write to the file. if self.datafile and (self._simulate or self._emulate): self._write_data() if ( self.noisefile and (self._simulate or self._emulate) and hasattr(self, "define_noise") ): self._write_noise()
def _read_data(self): data = [] for fl in self.datafile: if not path.exists(fl): raise FileNotFoundError( "Could not find datafile: {fl}. If you meant to simulate data, set simulate=True.".format( fl=fl ) ) else: data.append(dict(np.load(fl, allow_pickle=True))) return data def _read_noise(self): if self.noisefile: noise = [] for fl in self.noisefile: if not path.exists(fl): msg = "" if hasattr(self, "define_noise"): msg = "If you meant to simulate noise, set simulate=True." raise FileNotFoundError(f"Could not find noisefile: {fl}. {msg}") else: try: noise.append(dict(np.load(fl, allow_pickle=True))) except ValueError: # TODO: this one is for reading the error covariance matrix, need to better deal with it noise.append(np.load(fl, allow_pickle=True)) return noise def _write_data(self): for fl, d in zip(self.datafile, self.data): if path.exists(fl): logger.warning( "File {fl} already exists. Moving previous version to {fl}.bk".format( fl=fl ) ) rename(fl, fl + ".bk") np.savez(fl, **d) logger.info(f"Saving data file: {fl}") def _write_noise(self): for fl, d in zip(self.noisefile, self.noise): if path.exists(fl): logger.warning( "File {fl} already exists. Moving previous version to {fl}.bk".format( fl=fl ) ) rename(fl, fl + ".bk") np.savez(fl, **d) logger.info(f"Saved noise file: {fl}") def _check_data_format(self): pass def _check_noise_format(self): pass
[docs] class Likelihood1DPowerCoeval(LikelihoodBaseFile): r""" A Gaussian likelihood for the 1D power spectrum of a coeval cube. Requires the :class:`~core.CoreCoevalModule` to be loaded to work, and inherently deals with the multiple-redshift cubes which that module produces. Parameters ---------- datafile : str, optional Path to file containing data. See :class:`LikelihoodBaseFile` for details. noisefile : str, optional Path to file containing noise data. See :class:`LikelihoodBaseFile` for details. n_psbins : int, optional The number of bins for the spherically averaged power spectrum. By default automatically calculated from the number of cells. min_k : float, optional The minimum k value at which to compare model and data (units 1/Mpc). max_k : float, optional The maximum k value at which to compare model and data (units 1/Mpc). logk : bool, optional Whether the power spectrum bins should be regular in logspace or linear space. model_uncertainty : float, optional The amount of uncertainty in the modelling, per power spectral bin (as fraction of the amplitude). error_on_model : bool, optional Whether the `model_uncertainty` is applied to the model, or the data. ignore_kperp_zero : bool, optional Whether to ignore the kperp=0 when generating the power spectrum. ignore_kpar_zero : bool, optional Whether to ignore the kpar=0 when generating the power spectrum. ignore_k_zero : bool, optional Whether to ignore the ``|k| = 0`` mode when generating the power spectrum. Other Parameters ---------------- \*\*kwargs : All other parameters sent to :class:`LikelihoodBaseFile`. In particular, ``datafile``, ``noisefile``, ``simulate`` and ``use_data``. Notes ----- The ``datafile`` and ``noisefile`` have specific formatting required. Both should be `.npz` files. The datafile should have 'k' and 'delta' arrays in it (k-modes in 1/Mpc and power spectrum respectively) and the noisefile should have 'k' and 'errs' arrays in it (k-modes and their standard deviations respectively). Note that the latter is *almost* the default output of 21cmSense, except that 21cmSense has k in units of h/Mpc, whereas 21cmFAST/21CMMC use units of 1/Mpc. .. warning:: Please ensure that the data/noise is in the correct units for 21CMMC, as this class does not automatically convert units! To make this more flexible, simply subclass this class, and overwrite the :meth:`_read_data` or :meth:`_read_noise` methods, then use that likelihood instead of this in your likelihood chain. Both of these functions should return dictionaries in which the above entries exist. For example:: >>> class MyCoevalLikelihood(Likelihood1DPowerCoeval): >>> def _read_data(self): >>> data = np.genfromtxt(self.datafile) >>> return {"k": data[:, 0], "p": data[:, 1]} Also note that an extra method, ``define_noise`` may be used to define the noise properties dynamically (i.e. without reading it). This method will be called if available and simulate=True. It should have the signature ``define_noise(self, ctx, model)``, where ``ctx`` is the context with all cores having added their data, and ``model`` is the output of the :meth:`simulate` method. """ required_cores = ((core.CoreCoevalModule, core.Core21cmEMU),)
[docs] def __init__( self, n_psbins=None, min_k=0.1, max_k=1.0, logk=True, model_uncertainty=0.15, error_on_model=True, ignore_kperp_zero=True, ignore_kpar_zero=False, ignore_k_zero=False, name="", *args, **kwargs, ): super().__init__(*args, **kwargs) if ( self.noisefile and self.datafile and len(self.datafile) != len(self.noisefile) ): raise ValueError( "If noisefile or datafile are provided, they should have the same number " "of files (one for each coeval box)" ) self.n_psbins = n_psbins self.min_k = min_k self.max_k = max_k self.logk = logk self.error_on_model = error_on_model self.model_uncertainty = model_uncertainty self.ignore_k_zero = ignore_k_zero self.ignore_kperp_zero = ignore_kperp_zero self.ignore_kpar_zero = ignore_kpar_zero self.name = name
def _check_data_format(self): for i, d in enumerate(self.data): if ("k" not in d and "kwfband8" not in d) or ( "delta" not in d and "band8" not in d ): raise ValueError( f"datafile #{i+1} of {len(self.datafile)} has the wrong format." ) def _check_noise_format(self): for i, n in enumerate(self.noise): if "k" not in n or "errs" not in n: raise ValueError( f"noisefile #{i+1} of {len(self.noise)} has the wrong format" )
[docs] def setup(self): """Perform post-init setup.""" super().setup() # Ensure that there is one dataset and noiseset per redshift. if len(self.data) != len(self.redshift): raise ValueError( "There needs to be one dataset (datafile) for each redshift!" ) if self.noise and len(self.noise) != len(self.redshift): raise ValueError( "There needs to be one dataset (noisefile) for each redshift!" ) # Check if all data is formatted correctly. self._check_data_format() if self.noise: self._check_noise_format()
@cached_property def data_spline(self): """Splines of data power spectra.""" return [ InterpolatedUnivariateSpline(d["k"], d["delta"], k=1) for d in self.data ] @cached_property def noise_spline(self): """Splines of noise power spectra.""" if self.noise: return [ InterpolatedUnivariateSpline(n["k"], n["errs"], k=1) for n in self.noise ] else: return None
[docs] @staticmethod def compute_power( brightness_temp, L, n_psbins, log_bins=True, ignore_kperp_zero=True, ignore_kpar_zero=False, ignore_k_zero=False, ): """Compute power spectrum from coeval box. Parameters ---------- brightness_temp : :class:`py21cmfast.BrightnessTemp` instance The brightness temperature coeval box. L : float Size of the coeval cube along a side, in Mpc. n_psbins : int Number of power spectrum bins to return. log_bins : bool, optional Whether the bins are regular in log-space. ignore_kperp_zero : bool, optional Whether to ignore perpendicular ``k=0`` modes when performing spherical average. ignore_kpar_zero : bool, optoinal Whether to ignore parallel ``k=0`` modes when performing spherical average. ignore_k_zero : bool, optional Whether to ignore the ``|k|=0`` mode when performing spherical average. Returns ------- power : ndarray The power spectrum as a function of k k : ndarray The centres of the k-bins defining the power spectrum. """ # Determine the weighting function required from ignoring k's. k_weights = np.ones(brightness_temp.shape, dtype=int) n = k_weights.shape[0] if ignore_kperp_zero: k_weights[n // 2, n // 2, :] = 0 if ignore_kpar_zero: k_weights[:, :, n // 2] = 0 if ignore_k_zero: k_weights[n // 2, n // 2, n // 2] = 0 res = get_power( brightness_temp, boxlength=L, bins=n_psbins, bin_ave=False, get_variance=False, log_bins=log_bins, k_weights=k_weights, ) res = list(res) k = res[1] if log_bins: k = np.exp((np.log(k[1:]) + np.log(k[:-1])) / 2) else: k = (k[1:] + k[:-1]) / 2 res[1] = k return res
    @property
    def redshift(self):
        """The redshifts of coeval simulations."""
        if isinstance(self.paired_core, core.Core21cmEMU):
            return self.data[0]["z_bands"]
        else:
            return self.core_primary.redshift
[docs] def computeLikelihood(self, model): """Compute the likelihood given a model. Parameters ---------- model : list of dict A list of dictionaries, one for each redshift. Exactly the output of :meth:'reduce_data`. """ if isinstance(self.paired_core, core.Core21cmEMU): N = len(model) if N > 1: lnl = np.zeros(N) else: lnl = 0 hera_data = self.data[0] for i in range(N): for j, band in enumerate(self.redshift): band_key = "band" + str(int(np.round(band))) nfields = hera_data[band_key].shape[0] for field in range(nfields): PS_limit_ks = hera_data[band_key][field, :, 0] PS_limit_ks = PS_limit_ks[~np.isnan(PS_limit_ks)] Nkbins = len(PS_limit_ks) PS_limit_vals = hera_data[band_key][field, :Nkbins, 1] PS_limit_vars = hera_data[band_key][field, :Nkbins, 2] kwf_limit_vals = hera_data["kwf" + band_key] Nkwfbins = len(kwf_limit_vals) PS_limit_wfcs = hera_data["wf" + band_key][field, :Nkbins, :] PS_limit_wfcs = PS_limit_wfcs.reshape([Nkbins, Nkwfbins]) ModelPS_val = model[i][j]["delta"][:Nkwfbins] ModelPS_val_afterWF = np.dot(PS_limit_wfcs, ModelPS_val) # Include emulator error term if present if "delta_err" in model[i][j].keys(): ModelPS_val_1sigma_upper_afterWF = np.dot( PS_limit_wfcs, ModelPS_val + model[i][j]["delta_err"][:Nkwfbins], ) ModelPS_val_1sigma_lower_afterWF = np.dot( PS_limit_wfcs, ModelPS_val - model[i][j]["delta_err"][:Nkwfbins], ) # The upper and lower errors are very similar usually, so we can just take the mean and use that. mean_err = np.mean( [ ModelPS_val_1sigma_upper_afterWF - ModelPS_val_afterWF, ModelPS_val_afterWF - ModelPS_val_1sigma_lower_afterWF, ], axis=0, ) error_val = np.sqrt( PS_limit_vars + (0.2 * ModelPS_val_afterWF) ** 2 + (mean_err) ** 2 ) else: error_val = np.sqrt( PS_limit_vars + (0.2 * ModelPS_val_afterWF) ** 2 ) if N > 1: lnl[i] += -0.5 * np.sum( (ModelPS_val_afterWF - PS_limit_vals) ** 2 / (error_val**2) ) else: lnl += -0.5 * np.sum( (ModelPS_val_afterWF - PS_limit_vals) ** 2 / (error_val**2) ) else: lnl = 0 noise = 0 for i, (m, pd) in enumerate(zip(model, self.data_spline)): mask = np.logical_and(m["k"] <= self.max_k, m["k"] >= self.min_k) moduncert = ( self.model_uncertainty * pd(m["k"][mask]) if not self.error_on_model else self.model_uncertainty * m["delta"][mask] ) if self.noise_spline: noise = self.noise_spline[i](m["k"][mask]) # TODO: if moduncert depends on model, not data, then it should appear # as -0.5 log(sigma^2) term below. lnl += -0.5 * np.sum( (m["delta"][mask] - pd(m["k"][mask])) ** 2 / (moduncert**2 + noise**2) ) logger.debug(f"Likelihood computed: {lnl}") return lnl
[docs] def reduce_data(self, ctx): """Reduce the data in the context to a list of models (one for each redshift).""" data = [] if isinstance(self.paired_core, core.Core21cmEMU): # Interpolate the data onto the HERA bands and ks if len(ctx.get("PS").shape) > 2: N = ctx.get("PS").shape[0] else: N = 1 for j in range(N): tmp_data = [] for i in range(self.redshift.shape[0]): interp_ks = self.k[i] tmp_data.append( { "k": interp_ks, "delta": RectBivariateSpline( ctx.get("PS_redshifts"), ctx.get("k"), ctx.get("PS")[j, ...] if N > 1 else ctx.get("PS"), )(self.redshift[i], interp_ks)[0], "delta_err": RectBivariateSpline( ctx.get("PS_redshifts"), ctx.get("k"), ctx.get("PS_err"), )(self.redshift[i], interp_ks)[0], } ) data.append(tmp_data) else: brightness_temp = ctx.get("brightness_temp") for bt in brightness_temp: power, k = self.compute_power( bt, self.user_params.BOX_LEN, self.n_psbins, log_bins=self.logk, ignore_k_zero=self.ignore_k_zero, ignore_kpar_zero=self.ignore_kpar_zero, ignore_kperp_zero=self.ignore_kperp_zero, ) data.append({"k": k, "delta": power * k**3 / (2 * np.pi**2)}) return data
    def store(self, model, storage):
        """Save elements of the model to the storage dict.

        Do not use this method -- it is called by the MCMC routine to save data to
        the storage backend.
        """
        # add the power to the written data
        for i, m in enumerate(model):
            storage.update({k + "_z%s" % self.redshift[i]: v for k, v in m.items()})
    @cached_property
    def paired_core(self):
        """The PS core that is paired with this likelihood."""
        paired = []
        for c in self._cores:
            if isinstance(c, core.Core21cmEMU) and c.name == self.name:
                paired.append(c)
            elif isinstance(c, core.CoreCoevalModule):
                paired.append(c)
        if len(paired) > 1:
            raise ValueError(
                "You've got more than one CoreCoevalModule / Core21cmEMU with the "
                "same name -- they will overwrite each other!"
            )
        return paired[0]
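
# Data-format sketch (illustrative, not part of the module): the `.npz` files
# consumed by Likelihood1DPowerCoeval follow the layout described in its docstring
# -- 'k' and 'delta' for the data, 'k' and 'errs' for the noise, with k in 1/Mpc.
# The filenames and numerical values below are placeholders.
#
# >>> k = np.logspace(-1, 0, 20)                         # 1/Mpc
# >>> np.savez("mock_ps_z8.npz", k=k, delta=10.0 * k)     # power spectrum data
# >>> np.savez("mock_noise_z8.npz", k=k, errs=2.0 * np.ones_like(k))
# >>> lk = Likelihood1DPowerCoeval(
# ...     datafile=["mock_ps_z8.npz"], noisefile=["mock_noise_z8.npz"]
# ... )
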
class Likelihood1DPowerLightcone(Likelihood1DPowerCoeval):
    """
    A likelihood very similar to :class:`Likelihood1DPowerCoeval`, except for a lightcone.

    This likelihood is vectorized i.e., it accepts an array of ``astro_params``, but
    only when used with ``Core21cmEMU``.

    Since most of the functionality is the same, please see the other documentation
    for details.
    """

    required_cores = ((core.CoreLightConeModule, core.Core21cmEMU),)

    def __init__(self, *args, datafile="", nchunks=1, **kwargs):
        super().__init__(*args, **kwargs)
        self.nchunks = nchunks
        self.datafile = [datafile] if isinstance(datafile, (str, Path)) else datafile

    @classmethod
    def from_builtin_data(cls, datafile="", **kwargs):
        """Create the class loading in built-in data."""
        datafile = path.join(path.dirname(__file__), "data", datafile + ".npz")
        return cls(datafile=datafile, **kwargs)
[docs] def setup(self): """Perform post-init setup.""" LikelihoodBaseFile.setup(self) if isinstance(self.paired_core, core.Core21cmEMU): self.redshifts = self.data[0]["z_bands"] self.k = [ self.data[0]["kwfband" + str(int(np.round(z)))] for z in self.redshifts ] self.k_len = max(len(i) for i in self.k) # Ensure that there is one dataset and noiseset per redshift. if len(self.data) != self.nchunks: raise ValueError( "There needs to be one dataset (datafile) for each chunk!!" ) if self.noise and len(self.noise) != self.nchunks: raise ValueError( "There needs to be one dataset (noisefile) for each chunk!" ) # Check if all data is formatted correctly. self._check_data_format() if self.noise: self._check_noise_format()
[docs] @staticmethod def compute_power( box, length, n_psbins, log_bins=True, ignore_kperp_zero=True, ignore_kpar_zero=False, ignore_k_zero=False, ): """Compute power spectrum from coeval box. Parameters ---------- box : :class:`py21cmfast.Lightcone` instance The lightcone to take the power spectrum of. length : 3-tuple Size of the lightcone in its 3 dimensions (X,Y,Z) n_psbins : int Number of power spectrum bins to return. log_bins : bool, optional Whether the bins are regular in log-space. ignore_kperp_zero : bool, optional Whether to ignore perpendicular k=0 modes when performing spherical average. ignore_kpar_zero : bool, optional Whether to ignore parallel k=0 modes when performing spherical average. ignore_k_zero : bool, optional Whether to ignore the ``|k|=0`` mode when performing spherical average. Returns ------- power : ndarray The power spectrum as a function of k k : ndarray The centres of the k-bins defining the power spectrum. """ # Determine the weighting function required from ignoring k's. k_weights = np.ones(box.shape, dtype=int) n0 = k_weights.shape[0] n1 = k_weights.shape[-1] if ignore_kperp_zero: k_weights[n0 // 2, n0 // 2, :] = 0 if ignore_kpar_zero: k_weights[:, :, n1 // 2] = 0 if ignore_k_zero: k_weights[n0 // 2, n0 // 2, n1 // 2] = 0 res = get_power( box, boxlength=length, bins=n_psbins, bin_ave=False, get_variance=False, log_bins=log_bins, k_weights=k_weights, ) res = list(res) k = res[1] if log_bins: k = np.exp((np.log(k[1:]) + np.log(k[:-1])) / 2) else: k = (k[1:] + k[:-1]) / 2 res[1] = k return res
[docs] def reduce_data(self, ctx): """Reduce the data in the context to a list of models (one for each redshift chunk).""" data = [] if isinstance(self.paired_core, core.Core21cmEMU): # Interpolate the data onto the HERA bands and ks if len(ctx.get("PS").shape) > 2: N = ctx.get("PS").shape[0] else: N = 1 for j in range(N): tmp_data = [] for i in range(self.redshift.shape[0]): interp_ks = self.k[i] tmp_data.append( { "k": interp_ks, "delta": RectBivariateSpline( ctx.get("PS_redshifts"), ctx.get("k"), ctx.get("PS")[j, ...] if N > 1 else ctx.get("PS"), )(self.redshift[i], interp_ks)[0], "delta_err": RectBivariateSpline( ctx.get("PS_redshifts"), ctx.get("k"), ctx.get("PS_err"), )(self.redshift[i], interp_ks)[0], } ) data.append(tmp_data) else: brightness_temp = ctx.get("lightcone") data = [] chunk_indices = list( range( 0, brightness_temp.n_slices, round(brightness_temp.n_slices / self.nchunks), ) ) if len(chunk_indices) > self.nchunks: chunk_indices = chunk_indices[:-1] chunk_indices.append(brightness_temp.n_slices) for i in range(self.nchunks): start = chunk_indices[i] end = chunk_indices[i + 1] chunklen = (end - start) * brightness_temp.cell_size power, k = self.compute_power( brightness_temp.brightness_temp[:, :, start:end], (self.user_params.BOX_LEN, self.user_params.BOX_LEN, chunklen), self.n_psbins, log_bins=self.logk, ignore_kperp_zero=self.ignore_kperp_zero, ignore_kpar_zero=self.ignore_kpar_zero, ignore_k_zero=self.ignore_k_zero, ) data.append({"k": k, "delta": power * k**3 / (2 * np.pi**2)}) return data
[docs] def store(self, model, storage): """Store the model into backend storage.""" # add the power to the written data for i, m in enumerate(model): if isinstance(self.paired_core, core.Core21cmEMU): if isinstance(m, list): for j, n in enumerate(m): storage.update({k + "_%s" % j: v for k, v in n.items()}) else: storage.update({k + "_%s" % i: v for k, v in m.items()})
    @cached_property
    def paired_core(self):
        """The PS core that is paired with this likelihood."""
        paired = []
        for c in self._cores:
            if (isinstance(c, core.Core21cmEMU) and c.name == self.name) or (
                isinstance(c, core.CoreLightConeModule) and c.name == self.name
            ):
                paired.append(c)
        if len(paired) > 1:
            raise ValueError(
                "You've got more than one CoreLightConeModule / Core21cmEMU with the "
                "same name -- they will overwrite each other!"
            )
        return paired[0]
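
# Usage sketch (illustrative, not part of the module): the lightcone likelihood can
# load one of the packaged `.npz` datasets via `from_builtin_data`, and split the
# lightcone into redshift chunks via `nchunks`. The dataset name below is a
# placeholder for a file shipped in the package's `data` directory.
#
# >>> lk = Likelihood1DPowerLightcone.from_builtin_data(
# ...     datafile="<builtin_dataset_name>", nchunks=2, simulate=False
# ... )
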
class LikelihoodPlanckPowerSpectra(LikelihoodBase):
    r"""A likelihood template using the Planck CMB power spectra.

    It makes use of the clik code developed by the Planck collaboration. Go to the
    Planck Legacy Archive website to download the code and data and get information:
    https://pla.esac.esa.int
    You should download the "baseline" data package.

    This likelihood currently supports Planck_lensing, Planck_highl_TTTEEE and
    Planck_lowl_EE data. It could easily be extended if required.

    Parameters
    ----------
    name_lkl : str
        The Planck likelihood to compute. Choice of: Planck_lensing,
        Planck_highl_TTTEEE, Planck_lowl_EE.
    """

    required_cores = ((core.CoreLightConeModule, core.CoreCMB),)
[docs] def __init__( self, *args, name_lkl=None, A_planck_prior_center=1, A_planck_prior_variance=0.1, **kwargs, ): super().__init__(*args, **kwargs) self.name = name_lkl self.A_planck_prior_center = A_planck_prior_center self.A_planck_prior_variance = A_planck_prior_variance if "Planck_lensing" in self.name: self.lensing = True else: self.lensing = False if "Planck_highl_TTTEEE" in self.name: self.TTTEEE = True else: self.TTTEEE = False if "Planck_lowl_EE" in self.name: self.EE = True else: self.EE = False if not self.TTTEEE and not self.EE and not self.lensing: raise AttributeError( "I did not understand name %s" % (self.name) + " please choose between " + "Planck_lensing, Planck_highl_TTTEEE, Planck_lowl_EE" ) self.initialize = True if self.initialize: self.initialize_clik_and_class(self.name)
[docs] def reduce_data(self, ctx): """Get the CMB power spectra and the nuisance parameter A_planck from the coreCMB.""" cl = ctx.get("cl_cmb") A_planck = ctx.get("A_planck", 1) data = {"cl_cmb": cl, "A_planck_cmb": A_planck} return data
[docs] def computeLikelihood(self, model): """ Compute the likelihood. This is the likelihood arising from Planck Lite (2018). Parameters ---------- cosmo : contains cosmological observables computed with CLASS Exactly the output of CLASS Returns ------- lnl : float The log-likelihood for the given model. """ cl = model["cl_cmb"] if self.lensing: # these my_clik * are global variables defined in initialize_clik_and_class. my_clik = my_clik_lensing my_l_max = my_l_max_lensing if self.TTTEEE: my_clik = my_clik_TTTEEE my_l_max = my_l_max_TTTEEE if self.EE: my_clik = my_clik_EE my_l_max = my_l_max_EE if self.lensing: try: length = len(my_clik.get_lmax()) tot = np.zeros( np.sum(my_clik.get_lmax()) + length + len(my_clik.get_extra_parameter_names()) ) # following 3 lines for compatibility with lensing likelihoods of 2013 and before # (then, clik.get_lmax() just returns an integer for lensing likelihoods, # and the length is always 2 for cl['pp'], cl['tt']) except: length = 2 tot = np.zeros( 2 * my_l_max + length + len(my_clik.get_extra_parameter_names()) ) else: length = len(my_clik.get_has_cl()) tot = np.zeros( np.sum(my_clik.get_lmax()) + length + len(my_clik.get_extra_parameter_names()) ) # # fill with Cl's index = 0 if not self.lensing: for i in range(length): if my_clik.get_lmax()[i] > -1: for j in range(my_clik.get_lmax()[i] + 1): if i == 0: tot[index + j] = cl["tt"][j] elif i == 1: tot[index + j] = cl["ee"][j] elif i == 2: tot[index + j] = cl["bb"][j] elif i == 3: tot[index + j] = cl["te"][j] elif i == 4: tot[index + j] = 0 # cl['tb'][j] class does not compute tb elif i == 5: tot[index + j] = 0 # cl['eb'][j] class does not compute eb index += my_clik.get_lmax()[i] + 1 else: try: for i in range(length): if my_clik.get_lmax()[i] > -1: for j in range(my_clik.get_lmax()[i] + 1): if i == 0: tot[index + j] = cl["pp"][j] elif i == 1: tot[index + j] = cl["tt"][j] elif i == 2: tot[index + j] = cl["ee"][j] elif i == 3: tot[index + j] = cl["bb"][j] elif i == 4: tot[index + j] = cl["te"][j] elif i == 5: tot[ index + j ] = 0 # cl['tb'][j] class does not compute tb elif i == 6: tot[ index + j ] = 0 # cl['eb'][j] class does not compute eb index += my_clik.get_lmax()[i] + 1 # following 8 lines for compatibility with lensing likelihoods of 2013 and before # (then, clik.get_lmax() just returns an integer for lensing likelihoods, # and the length is always 2 for cl['pp'], cl['tt']) except: for i in range(length): for j in range(my_l_max): if i == 0: tot[index + j] = cl["pp"][j] if i == 1: tot[index + j] = cl["tt"][j] index += my_l_max + 1 # fill with nuisance parameters A_planck = model["A_planck_cmb"] tot[index] = A_planck index += 1 lkl = my_clik(tot)[0] # -loglike # add nuisance parameter A_planck lkl += ( -0.5 * ((A_planck - self.A_planck_prior_center) / self.A_planck_prior_variance) ** 2 ) return lkl
[docs] def initialize_clik_and_class(self, name=None): """Initialize clik and class.""" global my_clik_TTTEEE, my_clik_lensing, my_clik_EE, my_l_max_lensing, my_l_max_EE, my_l_max_TTTEEE self.initialize = False try: import clik except ModuleNotFoundError: raise ImportError( "You must first activate the binaries from the Clik " + "distribution. Please run : \n " + "]$ source /path/to/clik/bin/clik_profile.sh \n " + "and try again." ) my_path = path.join( "%s/.ccode/baseline/plc_3.0/low_l/simall/simall_100x143_offlike5_EE_Aplanck_B.clik" % path.expanduser("~") ) if not path.isdir(my_path): import tarfile from astropy.utils.data import download_file tarfile.open( download_file( "http://pla.esac.esa.int/pla/aio/product-action?COSMOLOGY.FILE_ID=COM_Likelihood_Data-baseline_R3.00.tar.gz", ) ).extractall(path.expanduser("~/.ccode")) try: if self.lensing: my_clik_lensing = clik.clik_lensing(my_path) try: my_l_max_lensing = max(my_clik_lensing.get_lmax()) # following 2 lines for compatibility with lensing likelihoods of 2013 and before # (then, clik.get_lmax() just returns an integer for lensing likelihoods; # this behavior was for clik versions < 10) except: my_l_max_lensing = my_clik_lensing.get_lmax() elif self.TTTEEE: my_clik_TTTEEE = clik.clik(my_path) my_l_max_TTTEEE = max(my_clik_TTTEEE.get_lmax()) elif self.EE: my_clik_EE = clik.clik(my_path) my_l_max_EE = max(my_clik_EE.get_lmax()) else: raise AttributeError( f"I did not understand name {name}" + "please choose between" + "Planck_lensing, Planck_highl_TTTEEE, Planck_lowl_EE" ) except AttributeError: raise AttributeError( "The path to the .clik file for the likelihood " "%s was not found where indicated:\n%s\n" % (name, my_path) + " Note that the default path to search for it is" " one directory above the path['clik'] field. You" " can change this behaviour in all the " "Planck_something.data, to reflect your local configuration, " "or alternatively, move your .clik files to this place." )
[docs] class LikelihoodPlanck(LikelihoodBase): """ A likelihood which utilises Planck optical depth data. In practice, any optical depth measurement (or mock measurement) may be used, by defining the class variables ``tau_mean`` and ``tau_sigma``. This likelihood is vectorized i.e., it accepts an array of ``astro_params``. Parameters ---------- tau_mean : float Mean for the optical depth constraint. By default, it is 0.0569 from Planck 2018 (2006.16828) tau_sigma_u : float One sigma errors for the optical depth constraint. By default, it is 0.0081 from Planck 2018 (2006.16828) tau_sigma_l : float One sigma errors for the optical depth constraint. By default, it is 0.0066 from Planck 2018 (2006.16828) """ required_cores = ( (core.CoreCoevalModule, core.CoreLightConeModule, core.Core21cmEMU), )
[docs] def __init__( self, *args, tau_mean=0.0569, tau_sigma_u=0.0073, tau_sigma_l=0.0066, **kwargs, ): super().__init__(*args, **kwargs) # Mean and one sigma errors for the Planck constraints # Cosmology from Planck 2018(https://arxiv.org/abs/2006.16828) self.tau_mean = tau_mean self.tau_sigma_u = tau_sigma_u self.tau_sigma_l = tau_sigma_l # Simple linear extrapolation of the redshift range provided by the user, to be # able to estimate the optical depth self.n_z_interp = 25 self.z_extrap_min = 5.0 self.z_extrap_max = 30.0
[docs] def computeLikelihood(self, model): """ Compute the likelihood. This is the likelihood arising from Planck (2016) (https://arxiv.org/abs/1605.03507). Parameters ---------- model : list of dicts Exactly the output of :meth:`simulate`. Returns ------- lnl : float The log-likelihood for the given model. """ tau_sigma_u = np.sqrt(self.tau_sigma_u**2 + 0.5 * model["tau_err"] ** 2) tau_sigma_l = np.sqrt(self.tau_sigma_l**2 + 0.5 * model["tau_err"] ** 2) lnl = ( -0.5 * np.square(self.tau_mean - model["tau"]) / ( tau_sigma_u * tau_sigma_l + (tau_sigma_u - tau_sigma_l) * (model["tau"] - self.tau_mean) ) ) logger.debug(f"Planck Likelihood computed: {lnl}") return lnl
    @property
    def _is_lightcone(self):
        return isinstance(self.core_primary, core.CoreLightConeModule)

    @property
    def _is_emu(self):
        return isinstance(self.core_primary, core.Core21cmEMU)
[docs] def reduce_data(self, ctx): """Reduce the data in the context to a model. Returns ------- dict : Only key is "tau", the optical depth to reionization. If the emulator is used, 'tau_err' is the emulator error on tau calculated using compute_tau. """ # Extract relevant info from the context. if self._is_emu: tau_err = ctx.get("tau_err") tau_value = ctx.get("tau") elif self._is_lightcone: lc = ctx.get("lightcone") redshifts = lc.node_redshifts xHI = lc.global_xHI tau_err = 0.0 else: redshifts = self.core_primary.redshift xHI = [np.mean(x.xH_box) for x in ctx.get("xHI")] tau_err = 0.0 if not self._is_emu: if len(redshifts) < 3: raise ValueError( "You cannot use the Planck prior likelihood with less than 3 redshifts" ) # Order the redshifts in increasing order redshifts, xHI = np.sort(np.array([redshifts, xHI])) # The linear interpolation/extrapolation function, taking as input the redshift # supplied by the user and the corresponding neutral fractions recovered for # the specific EoR parameter set neutral_frac_func = InterpolatedUnivariateSpline(redshifts, xHI, k=1) # Perform extrapolation z_extrap = np.linspace( self.z_extrap_min, self.z_extrap_max, self.n_z_interp ) xHI = neutral_frac_func(z_extrap) # Ensure that the neutral fraction does not exceed unity, or go negative np.clip(xHI, 0, 1, xHI) # Set up the arguments for calculating the estimate of the optical depth. # Once again, performed using command line code. # TODO: not sure if this works. tau_value = lib.compute_tau( user_params=self.core_primary.user_params, cosmo_params=self.core_primary.cosmo_params, redshifts=z_extrap, global_xHI=xHI, ) return {"tau": tau_value, "tau_err": tau_err}
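
# Worked sketch (illustrative, not part of the module): LikelihoodPlanck uses an
# asymmetric-Gaussian approximation in which the effective variance varies linearly
# with (tau - tau_mean), as in its computeLikelihood above. The numbers below are
# just the class defaults plugged in by hand (ignoring any emulator error on tau).
#
# >>> tau_mean, sig_u, sig_l = 0.0569, 0.0073, 0.0066
# >>> tau = 0.065  # a hypothetical model value
# >>> lnl = -0.5 * (tau_mean - tau) ** 2 / (
# ...     sig_u * sig_l + (sig_u - sig_l) * (tau - tau_mean)
# ... )
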
[docs] class LikelihoodNeutralFraction(LikelihoodBase): """ A likelihood based on the measured neutral fraction at a range of redshifts. This likelihood is vectorized i.e., it accepts an array of ``astro_params``. The log-likelihood statistic is a simple chi^2 if the model has xHI > threshold, and 0 otherwise. """ required_cores = ( ( core.CoreLightConeModule, core.CoreCoevalModule, core.CoreCMB, core.Core21cmEMU, ), ) threshold = 0.06
[docs] def __init__(self, redshift=5.9, xHI=0.06, xHI_sigma=0.05): """ Neutral fraction likelihood/prior. Note that the default parameters are based on McGreer et al. constraints Modelled as a flat, unity prior at x_HI <= 0.06, and a one sided Gaussian at x_HI > 0.06 (Gaussian of mean 0.06 and one sigma of 0.05). Limit on the IGM neutral fraction at z = 5.9, from dark pixels by I. McGreer et al. (2015) (http://adsabs.harvard.edu/abs/2015MNRAS.447..499M) Parameters ---------- redshift : float or list of floats Redshift(s) at which the neutral fraction has been measured. xHI : float or list of floats Measured values of the neutral fraction, corresponding to `redshift`. xHI_sigma : float or list of floats One-sided uncertainty of measurements. """ self.redshift = _ensure_iter(redshift) self.xHI = _ensure_iter(xHI) self.xHI_sigma = _ensure_iter(xHI_sigma) # By default, setup as if using coeval boxes. self.redshifts = ( [] ) # these will become the redshifts of all coeval boxes, if that exists. self._use_coeval = True self._require_spline = False self._use_tanh = False
@property def lightcone_modules(self): """All lightcone core modules that are loaded.""" return [m for m in self._cores if isinstance(m, core.CoreLightConeModule)] @property def coeval_modules(self): """All coeval core modules that are loaded.""" return [ m for m in self._cores if isinstance(m, core.CoreCoevalModule) and not isinstance(m, core.CoreLightConeModule) ] @property def emu_modules(self): """All emulator core modules that are loaded.""" return [m for m in self._cores if isinstance(m, core.Core21cmEMU)] @property def cmb_modules(self): """All CMB core modules that are loaded.""" return [m for m in self._cores if (isinstance(m, core.CoreCMB))]
[docs] def setup(self): """Perform post-init setup.""" if ( not self.lightcone_modules + self.coeval_modules + self.cmb_modules + self.emu_modules ): raise ValueError( "LikelihoodNeutralFraction needs the CoreLightConeModule *or* " "CoreCoevalModule *or* CoreCMB to be loaded." ) if not self.lightcone_modules: if self.cmb_modules: self._use_tanh = True self._use_coeval = False self._require_spline = True elif self.emu_modules: self._use_tanh = True self._use_coeval = False self._require_spline = True else: # Get all unique redshifts from all coeval boxes in cores. self.redshifts = list( set(sum((x.redshift for x in self.coeval_modules), [])) ) for z in self.redshift: if z not in self.redshifts and len(self.redshifts) < 3: raise ValueError( "To use LikelihoodNeutralFraction, the core must be a lightcone, " "or coeval with >=3 redshifts, or containing the desired redshift" ) elif z not in self.redshifts: self._require_spline = True self._use_coeval = True else: self._use_coeval = False self._require_spline = True
[docs] def reduce_data(self, ctx): """Return a dictionary of model quantities from the context.""" err = 0 if self._use_coeval: xHI = np.array([np.mean(x) for x in ctx.get("xHI")]) redshifts = self.redshifts else: if self._use_tanh: # TODO just a temperory fix to get xHI from x_e # 1.0818709330934035 = 1+0.25*YHe/(1-YHe) with YHe coming from BBN # I think there is a way to fix YHe using user defined value xHI = ctx.get("xHI") redshifts = ctx.get("zs") if redshifts is None: redshifts = ctx.get("redshifts") err = ctx.get("xHI_err") else: xHI = ctx.get("lightcone").global_xHI redshifts = ctx.get("lightcone").node_redshifts if xHI.ndim == 1: redshifts, xHI = np.sort([redshifts, xHI]) return {"xHI": xHI, "redshifts": redshifts, "err": err}
[docs] def computeLikelihood(self, model): """Compute the likelihood.""" n = model["xHI"].shape[0] xHI = np.atleast_2d(model["xHI"]) lnprob = np.zeros(n) for i in range(n): if self._require_spline: model_spline = InterpolatedUnivariateSpline( model["redshifts"], xHI[i, :], k=1 ) if np.sum(model["err"]) > 0: err_spline = InterpolatedUnivariateSpline( model["redshifts"], model["err"], k=1 ) for z, data, sigma in zip(self.redshift, self.xHI, self.xHI_sigma): if np.sum(model["err"]) > 0: sigma_t = np.sqrt(sigma**2 + err_spline(z) ** 2) else: sigma_t = sigma if z in model["redshifts"]: lnprob[i] += self.lnprob( xHI[model["redshifts"].index(z)], data, sigma_t ) else: lnprob[i] += self.lnprob(model_spline(z), data, sigma_t) logger.debug(f"Neutral fraction Likelihood computed: {lnprob}") return lnprob
    def lnprob(self, model, data, sigma):
        """Compute the log prob given a model, data and error."""
        model = np.clip(model, 0, 1)

        if model > self.threshold:
            return -0.5 * ((data - model) / sigma) ** 2
        else:
            return 0
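
# Worked sketch (illustrative, not part of the module): with the default McGreer
# et al. values, LikelihoodNeutralFraction acts as a flat prior below the threshold
# and a one-sided Gaussian above it, as in `lnprob` above.
#
# >>> data, sigma, threshold = 0.06, 0.05, 0.06  # class defaults
# >>> xhi_model = 0.3                            # a hypothetical model value
# >>> lnp = -0.5 * ((data - xhi_model) / sigma) ** 2 if xhi_model > threshold else 0
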
[docs] class LikelihoodNeutralFractionTwoSided(LikelihoodNeutralFraction): """ A likelihood based on the measured neutral fraction at a range of redshifts. This likelihood is vectorized i.e., it accepts an array of ``astro_params``. See ``LikelihoodNeutralFraction`` for more information. The log-likelihood statistic is a simple chi^2. """ required_cores = ( ( core.CoreLightConeModule, core.CoreCoevalModule, core.CoreCMB, core.Core21cmEMU, ), ) threshold = 0.06
[docs] def __init__(self, redshift=5.9, xHI=0.06, xHI_sigma=0.05): """ Neutral fraction likelihood/prior. Parameters ---------- redshift : float or list of floats Redshift(s) at which the neutral fraction has been measured. xHI : float or list of floats Measured values of the neutral fraction, corresponding to `redshift`. xHI_sigma : float or list of floats Two-sided uncertainty of measurements. """ super().__init__(redshift=redshift, xHI=xHI, xHI_sigma=xHI_sigma)
    def lnprob(self, model, data, sigma):
        """Compute the log prob given a model, data and error."""
        model = np.clip(model, 0, 1)
        return -0.5 * ((data - model) / sigma) ** 2


class LikelihoodGreig(LikelihoodNeutralFraction, LikelihoodBaseFile):
    """Likelihood using QSOs.

    See :class:`LikelihoodNeutralFraction` and :class:`LikelihoodBaseFile` for
    parameter descriptions.
    """

    qso_redshift = 7.0842  # The redshift of the QSO
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Read in data files. nf = np.load( path.join(path.dirname(__file__), "data", "NeutralFractionsForPDF.npy") ) pdf = np.load( path.join(path.dirname(__file__), "data", "NeutralFractionPDF_SmallHII.npy") ) # Normalising the PDF to have a peak probability of unity (consistent with # how other priors are treated). Ultimately, this step does not matter pdf /= np.amax(pdf) # Interpolate the QSO damping wing PDF # Interpolate in log space because the pdf must be greater than zero. self.spline_qso_damping_pdf = InterpolatedUnivariateSpline( nf[pdf > 0], np.log(pdf[pdf > 0]) )
[docs] def computeLikelihood(self, model): """ Compute the likelihood. Constraints on the IGM neutral fraction at z = 7.1 from the IGM damping wing of ULASJ1120+0641. Greig et al (2016) (http://arxiv.org/abs/1606.00441). """ redshifts = model["redshifts"] ave_nf = model["xHI"] if self.qso_redshift in redshifts: nf_qso = redshifts.index(self.qso_redshift) elif len(redshifts) > 2: # Check the redshift range input by the user to determine whether to # interpolate or extrapolate the IGM neutral fraction to the QSO redshift if self.qso_redshift < np.min(redshifts): # The QSO redshift is outside the range set by the user. Need to # extrapolate the reionisation history to obtain the neutral fraction at # the QSO redshift # The linear interpolation/extrapolation function, taking as input the # redshift supplied by the user and the corresponding neutral fractions # recovered for the specific EoR parameter set global_nf_spl = InterpolatedUnivariateSpline(redshifts, ave_nf, k=1) else: # The QSO redshift is within the range set by the user. Can interpolate # the reionisation history to obtain the neutral fraction at the QSO redshift global_nf_spl = InterpolatedUnivariateSpline( redshifts, ave_nf, k=2 if len(redshifts) == 3 else 3 ) nf_qso = global_nf_spl(self.qso_redshift) else: raise ValueError( "You cannot use the Greig prior likelihood with either less than 3 " "redshifts or the redshift being directly evaluated." ) # Ensure that the neutral fraction does not exceed unity, or go negative nf_qso = np.clip(nf_qso, 0, 1) # un-log the spline. qso_prob = np.exp(self.spline_qso_damping_pdf(nf_qso)) # We work with the log-likelihood, therefore convert the IGM Damping wing PDF to # log space return np.log(qso_prob)
class LikelihoodGlobalSignal(LikelihoodBaseFile):
    """Chi^2 likelihood of Global Signal, where global signal is in mK as a function of MHz."""

    required_cores = ((core.CoreLightConeModule, core.Core21cmEMU),)

    @property
    def _is_emu(self):
        return isinstance(self.core_primary, core.Core21cmEMU)

    def reduce_data(self, ctx):
        """Reduce data to model."""
        if self._is_emu:
            return {
                "frequencies": 1420.4 / (self._cores[0].redshift + 1.0),
                "global_signal": ctx.get("brightness_temp"),
            }
        else:
            return {
                "frequencies": 1420.4
                / (np.array(ctx.get("lightcone").node_redshifts) + 1.0),
                "global_signal": ctx.get("lightcone").global_brightness_temp,
            }

    def computeLikelihood(self, model):
        """Compute the likelihood, given the lightcone output from 21cmFAST."""
        model_spline = InterpolatedUnivariateSpline(
            model["frequencies"], model["global_signal"]
        )

        lnl = -0.5 * np.sum(
            (self.data["global_signal"] - model_spline(self.data["frequencies"])) ** 2
            / self.noise["sigma"] ** 2
        )
        return lnl
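
# Data-format sketch (illustrative, not part of the module): LikelihoodGlobalSignal
# compares the model to `self.data["global_signal"]` at `self.data["frequencies"]`
# (mK vs MHz), with errors taken from `self.noise["sigma"]`, so any datafile and
# noisefile should provide exactly those keys. Filenames and values below are
# placeholders.
#
# >>> freqs = np.linspace(50.0, 200.0, 100)
# >>> np.savez("mock_global_signal.npz", frequencies=freqs,
# ...          global_signal=-50.0 * np.ones_like(freqs))
# >>> np.savez("mock_global_signal_noise.npz", sigma=20.0 * np.ones_like(freqs))
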
[docs] class LikelihoodLuminosityFunction(LikelihoodBaseFile): r""" Likelihood based on Chi^2 comparison to luminosity function data. This likelihood is vectorized i.e., it accepts an array of ``astro_params``. Parameters ---------- datafile : str, optional Input data should be in a `.npz` file, and contain the arrays: * ``Muv``: the brightness magnitude array * ``lfunc``: the number density of galaxies at each ``Muv`` bin. Each of these arrays can be either 1D or 2D. If 1D, they will be interpreted to be arrays over ``Muv``. If 2D, first axis will be interpreted to be redshift. If you require each luminosity function at different redshifts to have different numbers of Muv bins, you should create multiple files, and create a separate core/likelihood instance pair for each, pairing them by ``name``. A set of default LFs (z=6,7,8,10; Bouwens+15,16; Oesch+17) are provided in the folder ``data`` where datafile and noisefile (see below) are named LF_lfuncs_z*.npz and LF_sigmas_z*.npz. To use these files, a separate core/likelihood instance pair for each redshift is required. noisefile : str, optional Noise should be a `.npz` file with a single array 'sigma` which gives the error at each of the `Muv` bins in the `datafile`. If 1D, it must have the same length as ``Muv``. If 2D, must have the same length as the number of redshifts as the first dimension. mag_brightest : float, optional Brightest magnitude when calculating the likelihood. Default is -20. name : str, optional A name for the instance. This is used to pair it with a particular core instance. """ required_cores = ((core.CoreLuminosityFunction, core.Core21cmEMU),)
[docs] def __init__(self, *args, name="", mag_brightest=-20.0, z=None, **kwargs): super().__init__(*args, **kwargs) if self.datafile is not None and len(self.datafile) != 1: raise ValueError( "can only pass a single datafile to LikelihoodLuminosityFunction!" ) if self.noisefile is not None and len(self.noisefile) != 1: raise ValueError( "can only pass a single noisefile to LikelihoodLuminosityFunction!" ) # This argument is for the emulator to know which z bin this likelihood is for. self.z = z self.name = str(name) self.mag_brightest = mag_brightest
[docs] def setup(self): """Setup instance.""" if isinstance(self.paired_core, core.Core21cmEMU): if self.z is None: raise ValueError( "Must specify which z bin out of [6, 7, 8, 10] the likelihood is comparing to the data." ) self.zidx = np.argmin(abs(np.array([6, 7, 8, 10]) - int(self.z))) if not self._simulate: if self.datafile is None: if len(self.redshifts) != 1: raise ValueError( "to use the provided LFs, a separate core/likelihood instance pair for each redshift is required!" ) if self.redshifts[0] not in [6, 7, 8, 10]: raise ValueError( "only LFs at z=6,7,8 and 10 are provided! use your own LF :)" ) self.datafile = [ path.join( path.dirname(__file__), "data", "LF_lfuncs_z%d.npz" % self.redshifts[0], ) ] if self.noisefile is None: self.noisefile = [ path.join( path.dirname(__file__), "data", "LF_sigmas_z%d.npz" % self.redshifts[0], ) ] super().setup() # We only allow one datafile, so get the data out of it # to make it easier to work with. self.data = self.data[0] if self.noise is not None: self.noise = self.noise[0]
def _read_data(self): data = super()._read_data() # data only has one entry (only one file allowed) # if that entry has values with one dimension, add a first # dimension as a redshift dimension of size 1. for key, val in data[0].items(): if not hasattr(val[0], "__len__"): data[0][key] = np.atleast_2d(val) return data def _read_noise(self): noise = super()._read_noise() # data only has one entry (only one file allowed) # if that entry has values with one dimension, add a first # dimension as a redshift dimension of size 1. for key, val in noise[0].items(): if not hasattr(val[0], "__len__"): noise[0][key] = np.atleast_2d(val) return noise @cached_property def paired_core(self): """The luminosity function core that is paired with this likelihood.""" paired = [] for c in self._cores: if (isinstance(c, core.CoreLuminosityFunction) and c.name == self.name) or ( isinstance(c, core.Core21cmEMU) and c.name == self.name ): paired.append(c) if len(paired) > 1: raise ValueError( "You've got more than one CoreLuminosityFunction / Core21cmEMU with the same name -- they will overwrite each other!" ) return paired[0] @property def redshifts(self): """Redshifts at which luminosity function is defined.""" if isinstance(self.paired_core, core.Core21cmEMU): if hasattr(self.z, "__len__"): return self.z else: return np.array([self.z]) else: return self.paired_core.redshift
[docs] def reduce_data(self, ctx): """Reduce simulated model data.""" if isinstance(self.paired_core, core.Core21cmEMU): final_data = {} shape = ctx.get("UVLFs").shape if len(shape) == 3: final_data["lfunc"] = ctx.get("UVLFs")[:, self.zidx, :].reshape( (shape[0], 1, shape[-1]) ) else: final_data["lfunc"] = ctx.get("UVLFs")[self.zidx, :].reshape([1, -1])[ np.newaxis, ... ] # Add two dimensions for nparams N = final_data["lfunc"].shape[0] final_data["Muv"] = np.array( list(ctx.get("Muv").reshape([1, -1])[np.newaxis, ...]) * N ) # TODO check if error is Gaussian and incorporate it properly return final_data else: lfunc = ctx.get("luminosity_function" + self.name) if not self._is_setup: # During setup, return a list, so that it can be matched with the # list of length one of datafile to be written. return [lfunc] else: return lfunc
[docs] def computeLikelihood(self, model): """Compute the likelihood.""" N = model["lfunc"].shape[0] lnl = np.zeros(N) if len(self.data["lfunc"].shape) == 3: data = {"lfunc": self.data["lfunc"][0], "Muv": self.data["Muv"][0]} else: data = self.data for n in range(N): for i, z in enumerate(self.redshifts): if len(model["Muv"].shape) == 3: if model["Muv"][n][i][0] > model["Muv"][n][i][1]: muv = model["Muv"][n][i][::-1] lfunc = model["lfunc"][n, i][::-1] else: muv = model["Muv"][n][i] lfunc = model["lfunc"][n, i] else: muv = model["Muv"][n][i] lfunc = model["lfunc"][n, i] mask = ~np.isnan(lfunc) model_spline = InterpolatedUnivariateSpline(muv[mask], lfunc[mask]) total_err = self.noise["sigma"][i] ** 2 lnl[n] += -0.5 * np.sum( ( (data["lfunc"][i] - 10 ** model_spline(data["Muv"][i])) ** 2 / total_err )[data["Muv"][i] > self.mag_brightest] ) logger.debug(f"UV LF Likelihood computed: {lnl}") return lnl
    def define_noise(self, ctx, model):
        """Define noise properties."""
        sig = self.paired_core.sigma

        if callable(sig[0]):
            return [{"sigma": [s(m["Muv"]) for s, m in zip(sig, model)]}]
        else:
            return [{"sigma": sig}]
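
# Data-format sketch (illustrative, not part of the module): the luminosity-function
# datafile holds 'Muv' and 'lfunc' arrays and the noisefile a 'sigma' array, as
# described in the LikelihoodLuminosityFunction docstring. Values are placeholders.
#
# >>> muv = np.linspace(-22.0, -15.0, 15)
# >>> np.savez("mock_lf_z6.npz", Muv=muv, lfunc=10.0 ** (-4 + 0.3 * (muv + 22)))
# >>> np.savez("mock_lf_noise_z6.npz", sigma=1e-4 * np.ones_like(muv))
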
class LikelihoodEDGES(LikelihoodBaseFile):
    """
    A likelihood based on a chi^2 comparison to the EDGES absorption-feature timing and FWHM.

    This is the likelihood arising from Bowman et al. (2018), which reports an
    absorption feature in the 21-cm brightness temperature spectra.

    Parameters
    ----------
    use_width : bool
        Whether to use the FWHM in the likelihood; by default it's False.
    """

    freq_edges = 78.0
    freq_err_edges = 1.0
    fwhm_edges = 19.0
    fwhm_err_upp_edges = 4.0
    fwhm_err_low_edges = 2.0

    required_cores = (core.CoreLightConeModule,)
    def __init__(self, use_width=False, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.use_width = use_width
[docs] def reduce_data(self, ctx): """Reduce data to model.""" frequencies = 1420.4 / (np.array(ctx.get("lightcone").node_redshifts) + 1) global_signal = ctx.get("lightcone").global_brightness_temp global_signal_interp = InterpolatedUnivariateSpline( frequencies, global_signal, k=4 ) cr_pts = global_signal_interp.derivative().roots() cr_vals = global_signal_interp(cr_pts) results = {} if len(cr_vals) == 0: # there is no solution -- global signal never reaches a minimum results["freq_tb_min"] = None results["fwhm"] = None else: freq_tb_min = cr_pts[np.argmin(cr_vals)] results["freq_tb_min"] = freq_tb_min if not self.use_width: results["fwhm"] = None # calculating the frequencies when global signal = the fwhm freqs_hm = InterpolatedUnivariateSpline( frequencies, global_signal - global_signal_interp(freq_tb_min) * 0.5, k=3, ).roots() if len(freqs_hm) == 2: # there are two of them, one is the lower bound of the fwhm and the # other one is the upper freq_r = freqs_hm[1] freq_l = freqs_hm[0] elif len(freqs_hm) == 1: # therea are only one of them if freqs_hm[0] > freq_tb_min: # it's larger than the frequency of the minimum, so it's the upper # bound of the fwhm then use the boundary to be the lower one freq_r = freqs_hm[0] freq_l = frequencies[0] else: # it's smaller than the frequency of the minimum, so it's the lower # bound of the fwhm then use the boundary to be the upper one freq_l = freqs_hm[0] freq_r = frequencies[-1] elif len(freqs_hm) > 2: # therea are more two of them, need to find the closest two freq_1 = freqs_hm[np.argmin(np.abs(freqs_hm - freq_tb_min))] if freq_1 < freq_tb_min: # the closest one is smaller than the frequency of the minimum, so # it's the lower bound of the fwhm freq_l = freq_1 freq_rs = freqs_hm[freqs_hm > freq_tb_min] # find the rest which are larger than the frequency of the minimum if len(freq_rs) > 0: # the smallest should be the upper bound of the fwhm freq_r = freq_rs[0] else: # if none, use the boundary freq_r = frequencies[-1] else: # the closest one is larger than the frequency of the minimum, so # it's the upper bound of the fwhm freq_r = freq_1 freq_ls = freqs_hm[freqs_hm < freq_tb_min] # find the rest which are smaller than the frequency of the minimum if len(freq_ls) > 0: # the largest should be the lower bound of the fwhm freq_l = freq_ls[-1] else: # if none, use the boundary freq_l = frequencies[0] if len(freqs_hm) == 0: results["fwhm"] = None else: results["fwhm"] = freq_r - freq_l return results
[docs] def computeLikelihood(self, model): """ Compute the likelihood, given the lightcone output from 21cmFAST. Parameters ---------- model : list of dicts Exactly the output of :meth:`simulate`. Returns ------- lnl : float The log-likelihood for the given model. """ if model["freq_tb_min"] is None: return -np.inf if self.use_width: if model["fwhm"] is None: return -np.inf else: # asymmetric uncertainty follows approximation in Barlow04, Sec 3.6 denominator = self.fwhm_err_upp_edges * self.fwhm_err_low_edges + ( self.fwhm_err_upp_edges - self.fwhm_err_low_edges ) * (model["fwhm"] - self.fwhm_edges) if denominator <= 0: return -np.inf else: return ( -0.5 * np.square( (model["freq_tb_min"] - self.freq_edges) / self.freq_err_edges ) + -0.5 * np.square(model["fwhm"] - self.fwhm_edges) / denominator ) else: return -0.5 * np.square( (model["freq_tb_min"] - self.freq_edges) / self.freq_err_edges )
class LikelihoodForest(LikelihoodBaseFile):
    """
    A likelihood based on a chi^2 comparison to the measured CDF of the Lyman-alpha forest effective optical depth.

    This is the likelihood arising from Bosman et al. (2018), which reports new
    constraints on Lyman-alpha opacity with a sample of 62 quasars at z>5.7, or
    D'Odorico et al. in prep., which includes new samples from the XQR-30 survey.

    Parameters
    ----------
    name : str
        The name used to match the core.
    observation : str
        The observation that is used to construct the tau_eff statistic.
    """

    required_cores = (core.CoreForest,)
    def __init__(self, name="", observation="bosman_optimistic", *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.name = str(name)
        self.observation = str(observation)
        self.n_realization = 150
[docs] def setup(self): """Setup instance.""" if len(self.redshifts) != 1: raise ValueError( "to use the provided forests, a separate core/likelihood instance pair for each redshift is required!" ) if "bosman" in self.observation: if self.redshifts[0] not in [5.0, 5.2, 5.4, 5.6, 5.8, 6.0]: raise ValueError( "only forests at z=5.0, 5.2, 5.4, 5.6, 5.8, 6.0 are provided for bosman!" ) self.tau_range = [0, 8] self.hist_bin_width = 0.1 self.hist_bin_size = int( (self.tau_range[1] - self.tau_range[0]) / self.hist_bin_width ) self.datafile = [ path.join(path.dirname(__file__), "data/Forests/Bosman18/data.npz") ] self.noisefile = [ path.join( path.dirname(__file__), "data/Forests/Bosman18/PDF_ErrorCovarianceMatrix_GP/z%s.npy" % str(self.redshifts[0]).replace(".", "pt"), ) ] else: raise NotImplementedError("Use bosman_optimistic or bosman_pessimistic!") super().setup()
def _read_data(self): data = super()._read_data()[0] targets = np.where( (data["zs"] > (self.redshifts[0] - 0.1)) * (data["zs"] <= (self.redshifts[0] + 0.1)) )[0] pdfs = np.zeros([2, self.hist_bin_size]) pdfs[0] = np.histogram( data["tau_lower"][targets], range=self.tau_range, bins=self.hist_bin_size )[0] pdfs[1] = np.histogram( data["tau_upper"][targets], range=self.tau_range, bins=self.hist_bin_size )[0] return pdfs def _read_noise(self): # read the ECM due to the GP approximation ErrorCovarianceMatrix_GP = super()._read_noise()[0] return ErrorCovarianceMatrix_GP @cached_property def paired_core(self): """The forest core that is paired with this likelihood.""" paired = [] for c in self._cores: if isinstance(c, core.CoreForest) and c.name == self.name: paired.append(c) if len(paired) > 1: raise ValueError( "You've got more than one CoreForest with the same name -- they will overwrite each other!" ) return paired[0] @property def redshifts(self): """Redshifts at which forest is defined.""" return self.paired_core.redshift @property def _is_lightcone(self): return isinstance(self.core_primary, core.CoreLightConeModule)
[docs]    def reduce_data(self, ctx):
        """Reduce the raw context data to the model used by the likelihood."""
        if not self._is_lightcone:
            raise NotImplementedError(
                "The Forest can only work with lightcones at the moment."
            )

        tau_eff = ctx.get("tau_eff_%s" % self.name)

        # Use the same binning as the observation.
        n_realization = tau_eff.shape[0]
        pdfs = np.zeros([n_realization, self.hist_bin_size])
        for jj in range(n_realization):
            pdfs[jj] = np.histogram(
                tau_eff[jj], range=self.tau_range, bins=self.hist_bin_size
            )[0]

        # Add the cosmic-variance covariance estimated from the mock realizations,
        # plus a small diagonal regulariser, to the noise covariance.
        ecm_cosmic = np.cov(pdfs.T)
        self.noise = (
            self.noise + ecm_cosmic + np.diag(np.ones(self.hist_bin_size) * 1e-5)
        )

        return np.mean(pdfs, axis=0)
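    # A standalone sketch (assumed shapes, not part of the class) of how the
    # cosmic-variance covariance above is estimated from mock realizations:
    #
    #   >>> import numpy as np
    #   >>> rng = np.random.default_rng(0)
    #   >>> pdfs = rng.poisson(5.0, size=(150, 80)).astype(float)  # (n_realization, n_bins)
    #   >>> ecm_cosmic = np.cov(pdfs.T)                            # (n_bins, n_bins) covariance
    #   >>> ecm_cosmic += np.diag(np.full(80, 1e-5))               # small diagonal regulariser
    #   >>> mean_pdf = pdfs.mean(axis=0)                           # the "model" used in the likelihood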
[docs]    def computeLikelihood(self, model):
        """
        Compute the likelihood, given the lightcone output from 21cmFAST.

        Parameters
        ----------
        model : np.ndarray
            The mean binned PDF of tau_eff over the mock realizations; exactly the
            output of :meth:`reduce_data`.

        Returns
        -------
        lnl : float
            The log-likelihood for the given model.
        """
        det = np.linalg.det(self.noise)
        if det == 0:
            logger.warning(
                "Determinant is zero for this error covariance matrix; "
                "returning -inf for lnl."
            )
            return -np.inf

        diff = model - self.data[0]
        # Where the two data histograms (built from tau_lower and tau_upper) differ,
        # apply a one-sided penalty: if the model falls below the first histogram,
        # only its deficit with respect to the second one (capped at zero) contributes.
        for ii in np.where(self.data[0] != self.data[1])[0]:
            if model[ii] < self.data[0][ii]:
                diff[ii] = min(0, model[ii] - self.data[1][ii])

        diff = diff.reshape([1, -1])
        lnl = (
            -0.5 * np.linalg.multi_dot([diff, np.linalg.inv(self.noise), diff.T])[0, 0]
        )

        if det < 0:
            logger.warning(
                "Determinant (%f) is negative for this error covariance matrix, "
                "lnl=%f; returning -inf for lnl." % (det, lnl)
            )
            return -np.inf
        return lnl
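    # A minimal sketch (not the class method) of the Gaussian log-likelihood evaluated
    # above, using np.linalg.solve in place of the explicit matrix inverse; shapes are
    # assumed to be a 1D model/data vector and a matching covariance matrix:
    #
    #   >>> import numpy as np
    #   >>> def gaussian_lnl(model, data, cov):
    #   ...     diff = np.asarray(model) - np.asarray(data)
    #   ...     return -0.5 * diff @ np.linalg.solve(cov, diff)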
[docs] class Likelihood1DPowerLightconeUpper(Likelihood1DPowerLightcone):
    r"""
    Likelihood based on a chi^2 comparison of a 21 cm power-spectrum model to HERA H1C
    upper-limit data.

    Parameters
    ----------
    datafile : str, optional
        Input data should be in a ``.npz`` file, and contain the following fields:

        * ``z_bands``: the redshifts of the observation bands
        * ``bandx``: the power spectrum upper limits at redshift x
        * ``wfbandx``: the window function for band x
        * ``kwfbandx``: the k values [1/Mpc] of the window function for band x
    """
[docs] def __init__( self, datafile="", data=None, name="", *args, **kwargs, ): super().__init__(*args, **kwargs) self.name = name self.datafile = [datafile] if isinstance(datafile, (str, Path)) else datafile
[docs]    @classmethod
    def from_builtin_data(cls, datafile="", **kwargs):
        """Create an instance of the class, loading one of the built-in data files."""
        datafile = path.join(path.dirname(__file__), "data", datafile + ".npz")
        return cls(datafile=datafile, **kwargs)
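    # Hypothetical usage: the built-in file name below is illustrative only, not a
    # guaranteed data product shipped with py21cmmc; ``name`` is used to pair this
    # likelihood with a core of the same name.
    #
    #   >>> lk = Likelihood1DPowerLightconeUpper.from_builtin_data(
    #   ...     datafile="HERA_H1C_upper_limits", name="emu"
    #   ... )  # doctest: +SKIP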
[docs]    def setup(self):
        """Set up the object."""
        super().setup()
        self.redshifts = self.data[0]["z_bands"]
        self.k = [
            self.data[0]["kwfband" + str(int(np.round(z)))] for z in self.redshifts
        ]
        self.k_len = max(len(i) for i in self.k)
[docs]    def reduce_data(self, ctx):
        """Get the computed core data in a convenient form."""
        # Interpolate the model power spectra onto the HERA bands and k values.
        if len(ctx.get("PS").shape) > 2:
            final_PS = np.zeros(
                (ctx.get("PS").shape[0], len(self.redshifts), self.k_len)
            )
            for j in range(ctx.get("PS").shape[0]):
                for i in range(self.redshifts.shape[0]):
                    interp_ks = self.k[i]
                    final_PS[j, i, : len(interp_ks)] = RectBivariateSpline(
                        ctx.get("PS_redshifts"), ctx.get("k"), ctx.get("PS")[j, ...]
                    )(self.redshifts[i], interp_ks)
        else:
            final_PS = np.zeros((1, len(self.redshifts), self.k_len))
            for i in range(self.redshifts.shape[0]):
                interp_ks = self.k[i]
                final_PS[0, i, : len(interp_ks)] = RectBivariateSpline(
                    ctx.get("PS_redshifts"), ctx.get("k"), ctx.get("PS")
                )(self.redshifts[i], interp_ks)

        final_data = {
            "k": self.k,
            "delta": final_PS,
        }

        # If we are using the emulator, include its error on the power spectrum.
        try:
            final_PS_err = np.zeros((len(self.redshifts), self.k_len))
            for i in range(self.redshifts.shape[0]):
                interp_ks = self.k[i]
                final_PS_err[i, : len(interp_ks)] = RectBivariateSpline(
                    ctx.get("PS_redshifts"), ctx.get("k"), ctx.get("PS_err")
                )(self.redshifts[i], interp_ks)
            final_data["delta_err"] = final_PS_err
        except Exception:
            # No emulator error available in the context.
            pass

        return [final_data]
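    # A standalone sketch (with assumed array shapes) of the 2D spline interpolation
    # used above to evaluate a model power spectrum defined on a (redshift, k) grid at
    # a single band redshift and that band's k values:
    #
    #   >>> import numpy as np
    #   >>> from scipy.interpolate import RectBivariateSpline
    #   >>> zs, ks = np.linspace(6, 12, 20), np.logspace(-1.5, 0, 30)
    #   >>> ps_grid = np.random.rand(20, 30)        # model PS on the (z, k) grid
    #   >>> band_ks = np.logspace(-1, -0.2, 10)     # k values of one observation band
    #   >>> band_ps = RectBivariateSpline(zs, ks, ps_grid)(8.0, band_ks)[0]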
[docs]    def computeLikelihood(self, model):
        """
        Compute the likelihood given 1D power spectrum values at the same k-bins as the data.

        Parameters
        ----------
        model : list of dict
            Exactly the output of :meth:`reduce_data`: the 1D power spectrum in
            log10(mK^2) and its k-bin values in 1/Mpc. Where a window function is
            available, it is applied to the model power spectrum before comparison.

        Returns
        -------
        lnl : float
            Log likelihood for the provided model.

        Notes
        -----
        For H1C, the data per band have shape ``(5 fields, 37 k-bins, 4)``, where the
        last axis begins with ``[k value, power, variance]``, and there are 2 bands
        (band1 at z ~ 10, band2 at z ~ 8).
        """
        N = model[0]["delta"].shape[0]
        lnl = np.zeros(N)
        hera_data = self.data[0]

        for i in range(N):
            for band in self.redshifts:
                for field in range(hera_data["band" + str(round(band))].shape[0]):
                    PS_limit_ks = hera_data["band" + str(round(band))][field, :, 0]
                    PS_limit_ks = PS_limit_ks[~np.isnan(PS_limit_ks)]
                    Nkbins = len(PS_limit_ks)
                    PS_limit_vals = hera_data["band" + str(round(band))][
                        field, :Nkbins, 1
                    ]
                    PS_limit_vars = hera_data["band" + str(round(band))][
                        field, :Nkbins, 2
                    ]
                    kwf_limit_vals = hera_data["kwfband" + str(round(band))]
                    Nkwfbins = len(kwf_limit_vals)
                    PS_limit_wfcs = hera_data["wfband" + str(round(band))][
                        field, :Nkbins, :
                    ]
                    PS_limit_wfcs = PS_limit_wfcs.reshape([Nkbins, Nkwfbins])

                    model_zs = self.redshifts
                    zbin = np.argmin(abs(band - model_zs))
                    ModelPS_val = model[0]["delta"][i, zbin, :Nkwfbins]
                    # Apply the window function to the model power spectrum.
                    ModelPS_val_afterWF = np.dot(PS_limit_wfcs, ModelPS_val)

                    # Include the emulator error term, if present.
                    if "delta_err" in model[0].keys():
                        ModelPS_val_1sigma_upper_afterWF = np.dot(
                            PS_limit_wfcs,
                            ModelPS_val + model[0]["delta_err"][zbin, :Nkwfbins],
                        )
                        ModelPS_val_1sigma_lower_afterWF = np.dot(
                            PS_limit_wfcs,
                            ModelPS_val - model[0]["delta_err"][zbin, :Nkwfbins],
                        )
                        # The upper and lower errors are usually very similar, so take
                        # their mean and use that.
                        mean_err = np.mean(
                            [
                                ModelPS_val_1sigma_upper_afterWF - ModelPS_val_afterWF,
                                ModelPS_val_afterWF - ModelPS_val_1sigma_lower_afterWF,
                            ],
                            axis=0,
                        )
                        error_val = np.sqrt(
                            PS_limit_vars
                            + (0.2 * ModelPS_val_afterWF) ** 2
                            + mean_err**2
                        )
                    else:
                        error_val = np.sqrt(
                            PS_limit_vars + (0.2 * ModelPS_val_afterWF) ** 2
                        )

                    # Equivalent way of writing the one-sided Gaussian likelihood.
                    likelihood = 0.5 + 0.5 * erf(
                        (PS_limit_vals - ModelPS_val_afterWF) / (np.sqrt(2) * error_val)
                    )
                    likelihood[likelihood <= 0.0] = 1e-50
                    lnl[i] += np.nansum(np.log(likelihood))
                    logger.debug(
                        "HERA PS upper Likelihood computed: {lnl}".format(
                            lnl=np.nansum(np.log(likelihood))
                        )
                    )

        logger.debug(f"Total HERA PS upper Likelihood computed: {lnl}")
        return lnl
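    # A minimal sketch (not the class method) of the one-sided Gaussian likelihood used
    # above for an upper limit: the probability that the true power lies below the
    # measured limit, given the model value and the total error; scalar inputs are
    # assumed for illustration:
    #
    #   >>> import numpy as np
    #   >>> from scipy.special import erf
    #   >>> def upper_limit_lnl(limit, model, err):
    #   ...     like = 0.5 + 0.5 * erf((limit - model) / (np.sqrt(2) * err))
    #   ...     return np.log(max(like, 1e-50))
    #   >>> upper_limit_lnl(limit=1e4, model=5e3, err=2e3)  # ~-0.006: weak penalty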
    @cached_property
    def paired_core(self):
        """The 21cmEMU core that is paired with this likelihood."""
        paired = []
        for c in self._cores:
            if isinstance(c, core.Core21cmEMU) and c.name == self.name:
                paired.append(c)
        if len(paired) > 1:
            raise ValueError(
                "You've got more than one 21cmEMU with the same name -- "
                "they will overwrite each other!"
            )
        return paired[0]