"""Module providing Core Modules for cosmoHammer.
This is the basis of the plugin system for :mod:`py21cmmc`.
TODO: Add description of the API of cores (and how to define new ones).
"""
import copy
import inspect
import logging
import numpy as np
import py21cmfast as p21
import warnings
from os import path
from scipy.interpolate import interp1d
from py21cmmc.cosmoHammer import Params
from . import _utils as ut
logger = logging.getLogger("21cmFAST")
class NotSetupError(AttributeError):
"""Exception for when a Core has not yet been setup."""
def __init__(self):
default_message = (
"setup() must have been called on the chain to use this method/attribute!"
)
super().__init__(default_message)
class NotAChain(AttributeError):
"""Exception when method is called outside a :class:`LikelihoodComputationChain`."""
def __init__(self):
default_message = (
"this Core or Likelihood must be part of a LikelihoodComputationChain "
"to enable this method/attribute!"
)
super().__init__(default_message)
class AlreadySetupError(Exception):
"""Exception to be raised if trying to setup a core twice."""
pass
[docs]
class ModuleBase:
"""Base module for both Cores and Likelihoods."""
# extra attributes (in addition to those passed to init) that define equality
_extra_defining_attributes = ()
# attributes to ignore (from those passed to init) for determining equality
_ignore_attributes = ()
# Cores that need to be loaded if this core is loaded. Sub-tuples in the list indicate
# "or" relationship.
required_cores = ()
[docs]
def __init__(self):
self._is_setup = False
def _check_required_cores(self):
for rc in self.required_cores:
# Ensure the required_core is a tuple -- we check that at least *one*
# of the cores in the tuple is in the _cores.
if not hasattr(rc, "__len__"):
rc = (rc,)
if not any(any(isinstance(m, r) for r in rc) for m in self._cores):
raise ValueError(
"%s needs the %s to be loaded."
% (self.__class__.__name__, rc.__class__.__name__)
)
@property
def chain(self):
"""Reference to the :class:`~LikelihoodComputationChain` containing this core."""
try:
return self._LikelihoodComputationChain
except AttributeError:
raise NotAChain
@property
def parameter_names(self):
"""Names of the parameters of the full chain."""
return getattr(self.chain.params, "keys", [])
def __eq__(self, other):
"""Compare this to another object for equality."""
if self.__class__.__name__ != other.__class__.__name__:
return False
args = []
for cls in self.__class__.mro():
args += inspect.getfullargspec(cls.__init__).args
args = tuple(set(args))
for arg in args + self._extra_defining_attributes:
if arg == "self" or arg in self._ignore_attributes:
continue
try:
if hasattr(self, "_" + arg):
if getattr(self, "_" + arg) != getattr(other, "_" + arg):
return False
elif hasattr(self, arg):
if getattr(self, arg) != getattr(other, arg):
return False
else:
logger.warning(f"parameter {arg} not found in instance")
except ValueError:
logger.warning(
f"parameter {arg} has type which does not allow for comparison"
)
return True
@property
def _cores(self):
"""List of all loaded cores."""
return self.chain.getCoreModules()
@property
def _rq_cores(self):
"""List of all loaded cores that are in the requirements, in order of the requirements."""
req = ut.flatten(self.required_cores)
return tuple(core for core in self._cores for r in req if isinstance(core, r))
@property
def core_primary(self):
"""The first core that appears in the requirements."""
return self._rq_cores[0] if self._rq_cores else self._cores[0]
[docs]
def setup(self):
"""Perform any post-init setup of the object."""
self._check_required_cores()
[docs]
class CoreBase(ModuleBase):
"""Base module for all cores."""
[docs]
def __init__(self, store=None):
super().__init__()
self.store = store or {}
def _check_required_cores(self):
for core in self._cores:
for rc in self.required_cores:
if core.__class__.__name__ == rc.__class__.__name__:
break
if core.__class__.__name__ == self.__class__.__name__:
raise ValueError(
"{this} requires {that} to be loaded.".format(
this=self.__class__.__name__, that=rc.__class__.__name__
)
)
[docs]
def prepare_storage(self, ctx, storage):
"""Add variables to dict which cosmoHammer will automatically store with the chain."""
for name, storage_function in self.store.items():
try:
storage[name] = storage_function(ctx)
except Exception:
logger.error(
"Exception while trying to evaluate storage function %s" % name
)
raise
[docs]
def build_model_data(self, ctx):
"""
Construct model data and place it in the context.
The data generated by this method should ideally be *deterministic*, so that
input parameters (which are inherently contained in the `ctx` object) map
uniquely to output data. The addition of stochasticity in order to produce mock
data is done in the :meth:`~convert_model_to_mock` method. All data necessary to
fully evaluate probabilities of mock data from the model data should be
determined in this method (including model uncertainties, if applicable).
Parameters
----------
ctx : dict-like
The context, from which parameters are accessed.
Returns
-------
dct : dict
A dictionary of data which was simulated.
"""
pass
[docs]
def convert_model_to_mock(self, ctx):
"""
Generate random mock data.
Given a context object containing data from :meth:`~build_model_data`, generate
random mock data, which should represent an exact forward-model of the process
under investigation.
Parameters
----------
ctx : dict-like
The context, from which parameters and other simulated model data can be accessed.
"""
pass
[docs]
def simulate_mock(self, ctx):
"""Generate all mock data and add it to the context."""
self.build_model_data(ctx)
self.convert_model_to_mock(ctx)
def __call__(self, ctx):
"""Call the class.
By default, it will just build model data, with no stochasticity.
"""
self.build_model_data(ctx)
[docs]
class CoreCoevalModule(CoreBase):
"""A Core Module which evaluates coeval cubes at given redshift.
On each iteration, this module will add to the context:
* ``init``: an :class:`~py21cmmc._21cmfast.wrapper.InitialConditions` instance
* ``perturb``: a :class:`~py21cmmc._21cmfast.wrapper.PerturbedField` instance
* ``xHI``: an :class:`~py21cmmc._21cmfast.wrapper.IonizedBox` instance
* ``brightness_temp``: a :class:`~py21cmmc._21cmfast.wrapper.BrightnessTemp` instance
Parameters
----------
redshift : float or array_like
The redshift(s) at which to evaluate the coeval cubes.
user_params : dict or :class:`~py21cmfast.UserParams`
Parameters affecting the overall dimensions of the cubes.
flag_options : dict or :class:`~py21cmfast.FlagOptions`
Options affecting choices for how the reionization is calculated.
astro_params : dict or :class:`~py21cmfast.AstroParams`
Astrophysical parameters of reionization.
.. note:: None of the parameters provided here affect the *MCMC* as such; they
merely provide a background model on which the MCMC will be performed.
Thus for example, passing ``HII_EFF_FACTOR=30`` in ``astro_params`` here
will be over-written per-iteration if ``HII_EFF_FACTOR`` is also passed as
a ``parameter`` to an MCMC routine using this core module.
cosmo_params : dict or :class:`~py21cmfast.CosmoParams`
Cosmological parameters of the simulations. Like ``astro_params``, these
are the *fiducial* parameters, but may be updated during an MCMC.
regenerate : bool, optional
Whether to force regeneration of simulations, even if matching cached data is found.
do_spin_temp: bool, optional
Whether to use spin temperature in the calculation, or assume the saturated limit.
z_step_factor: float, optional
How large the logarithmic steps between redshift are (if required).
z_heat_max: float, optional
Controls the global `Z_HEAT_MAX` parameter, which specifies the maximum redshift
up to which heating sources are required to specify the ionization field. Beyond
this, the ionization field is specified directly from the perturbed density field.
ctx_variables : list of str, optional
A list of strings. The strings must correspond to any (pickleable) member of
:class:`py21cmfast.Coeval`. These will be stored in the context on every iteration. Omitting as many as
possible is useful in that it reduces the memory that needs to be transmitted to each process. Furthermore,
in-built pickling has a restriction that arrays cannot be larger than 4GiB, which can be
easily over-run. Some typical options are:
* "brightness_temp"
* "xH_box"
* "density"
* "velocity"
* "Gamma12_box"
initial_conditions_seed : int, optional
If not `change_seeds_every_iter`, then this will define the random seed on which
the initial conditions for _all_ iterations is based. By default, a seed will be
chosen at random, _unless_ initial conditions exist in cache that match the
parameters of this instance (and ``regenerate`` is False). In this case, the
seed of the existing box will be adopted.
Other Parameters
----------------
store : dict, optional
The (derived) quantities/blobs to store in the MCMC chain, default empty. See
Notes below for details.
cache_dir : str, optional
The directory in which to search for the boxes and write them. By default, this
is the directory given by ``direc`` in the configuration file,
``~/.21CMMC/config.yml``. Note that for *reading* data, while the specified
``direc`` is searched first, the default directory will *also* be searched if no
appropriate data is found in ``direc``.
cache_ionize : bool, optional
Whether to cache ionization data sets (done before parameter retention step).
Default False.
Notes
-----
The ``store`` keyword is a dictionary, where each key specifies the name of the
resulting data entry in the samples object, and the value is a callable which
receives the ``context``, and returns a value from it.
.. note:: the ``store`` callable is saved to the core instance, which must be
pickleable in order to use multiprocessing. Thus it is generally unwise
to use a ``lambda`` function as the callable.
This means that the context can be inspected and arbitrarily summarised before
storage. In particular, this allows for taking slices of arrays and saving them. One
thing to note is that the context is dictionary-like, but is not a dictionary. The
elements of the context are only available by using the ``get`` method, rather than
directly subscripting the object like a normal dictionary.
.. warning:: Only scalars and arrays are supported for storage in the chain itself.
"""
_ignore_attributes = ["keep_data_in_memory"]
[docs]
def __init__(
self,
redshift,
user_params=None,
flag_options=None,
astro_params=None,
cosmo_params=None,
regenerate=True,
change_seed_every_iter=False,
ctx_variables=("brightness_temp", "xH_box"),
initial_conditions_seed=None,
global_params=None,
**io_options,
):
super().__init__(io_options.get("store", None))
self.redshift = redshift
if not hasattr(self.redshift, "__len__"):
self.redshift = [self.redshift]
self.user_params = p21.UserParams(user_params)
self.flag_options = p21.FlagOptions(flag_options)
self.astro_params = p21.AstroParams(astro_params)
self.cosmo_params = p21.CosmoParams(cosmo_params)
self.change_seed_every_iter = change_seed_every_iter
self.initial_conditions_seed = initial_conditions_seed
self.regenerate = regenerate
self.ctx_variables = ctx_variables
self.global_params = global_params or {}
self.io_options = {
"store": {}, # (derived) quantities to store in the MCMC chain.
"cache_dir": None, # where full data sets will be written/read from.
"cache_mcmc": False, # whether to cache ionization data sets
# (done before parameter retention step)
}
self.io_options.update(io_options)
if self.initial_conditions_seed and self.change_seed_every_iter:
logger.warning(
"Attempting to set initial conditions seed while desiring to change seeds every "
"iteration. Unsetting initial conditions seed."
)
self.initial_conditions_seed = None
[docs]
def setup(self):
"""
Perform setup of the core.
Notes
-----
This method is called automatically by its parent
:class:`~LikelihoodComputationChain`, and should not be invoked directly.
"""
super().setup()
# If the chain has different parameter truths, we want to use those for our defaults.
self.astro_params, self.cosmo_params = self._update_params(
self.chain.createChainContext().getParams()
)
# Here we save to disk the full default realization.
# The init and perturb boxes here can usually be re-used, and the box serves
# as a nice thing to compare to after MCMC.
# If modifying cosmo, we don't want to do this, because we'll create them
# on the fly on every iteration.
if (
all(p not in self.cosmo_params.self.keys() for p in self.parameter_names)
and not self.change_seed_every_iter
):
logger.info("Initializing default boxes for the entire chain.")
coeval = p21.run_coeval(
redshift=self.redshift,
user_params=self.user_params,
cosmo_params=self.cosmo_params,
astro_params=self.astro_params,
flag_options=self.flag_options,
write=True,
regenerate=self.regenerate,
direc=self.io_options["cache_dir"],
random_seed=self.initial_conditions_seed,
**self.global_params,
)
# update the seed
self.initial_conditions_seed = coeval[0].random_seed
logger.info("Initialization done.")
[docs]
def build_model_data(self, ctx):
"""Compute all data defined by this core and add it to the context."""
# Update parameters
logger.debug(f"Updating parameters: {ctx.getParams()}")
astro_params, cosmo_params = self._update_params(ctx.getParams())
logger.debug(f"AstroParams: {astro_params}")
logger.debug(f"CosmoParams: {cosmo_params}")
# Call C-code
coeval = p21.run_coeval(
redshift=self.redshift,
astro_params=astro_params,
cosmo_params=cosmo_params,
flag_options=self.flag_options,
user_params=self.user_params,
regenerate=False,
random_seed=self.initial_conditions_seed,
write=self.io_options["cache_mcmc"],
direc=self.io_options["cache_dir"],
**self.global_params,
)
logger.debug(f"Adding {self.ctx_variables} to context data")
for key in self.ctx_variables:
try:
ctx.add(key, [getattr(c, key) for c in coeval])
except AttributeError:
raise ValueError(f"ctx_variable {key} not an attribute of Coeval")
def _update_params(self, params):
"""
Update all the parameter structures which get passed to the driver, for one iteration.
Parameters
----------
params :
Parameter object from cosmoHammer
"""
ap_dict = copy.copy(self.astro_params.self)
cp_dict = copy.copy(self.cosmo_params.self)
ap_dict.update(
**{
k: getattr(params, k)
for k, v in params.items()
if k in self.astro_params.defining_dict
}
)
cp_dict.update(
**{
k: getattr(params, k)
for k, v in params.items()
if k in self.cosmo_params.defining_dict
}
)
return p21.AstroParams(**ap_dict), p21.CosmoParams(**cp_dict)
[docs]
class CoreLightConeModule(CoreCoevalModule):
"""
Core module for evaluating lightcone simulations.
See :class:`~CoreCoevalModule` for info on all parameters, which are identical to
this class, with the exception of `redshift`, which in this case must be a scalar.
This module will add the following quantities to the context:
* ``lightcone``: a :class:`~py21cmfast.LightCone` instance.
"""
[docs]
def __init__(self, *, name="", max_redshift=None, **kwargs):
if "ctx_variables" in kwargs:
warnings.warn(
"ctx_variables does not apply to the lightcone module (at least not yet). It will "
"be ignored."
)
super().__init__(**kwargs)
self.max_redshift = max_redshift
self.name = name
[docs]
def setup(self):
"""Setup the chain."""
# If the chain has different parameter truths, we want to use those for our defaults.
self.astro_params, self.cosmo_params = self._update_params(
self.chain.createChainContext().getParams()
)
# Here we save to disk the full default realization.
# The init and perturb boxes here can usually be re-used, and the box serves
# as a nice thing to compare to after MCMC.
# If modifying cosmo, we don't want to do this, because we'll create them
# on the fly on every iteration.
if (
all(p not in self.cosmo_params.self.keys() for p in self.parameter_names)
and not self.change_seed_every_iter
):
logger.info("Initializing default boxes for the entire chain.")
lightcone = p21.run_lightcone(
redshift=self.redshift[0],
max_redshift=self.max_redshift,
user_params=self.user_params,
cosmo_params=self.cosmo_params,
astro_params=self.astro_params,
flag_options=self.flag_options,
write=True,
regenerate=self.regenerate,
direc=self.io_options["cache_dir"],
random_seed=self.initial_conditions_seed,
**self.global_params,
)
# update the seed
self.initial_conditions_seed = lightcone.random_seed
logger.info("Initialization done.")
[docs]
def build_model_data(self, ctx):
"""Compute all data defined by this core and add it to the context."""
# Update parameters
astro_params, cosmo_params = self._update_params(ctx.getParams())
# TODO: make it a option that users can decide
lightcone_quantities = (
"brightness_temp",
"xH_box",
"temp_kinetic_all_gas",
"Gamma12_box",
"density",
)
lightcone = p21.run_lightcone(
redshift=self.redshift[0],
max_redshift=self.max_redshift,
astro_params=astro_params,
flag_options=self.flag_options,
cosmo_params=cosmo_params,
user_params=self.user_params,
regenerate=False,
random_seed=self.initial_conditions_seed,
write=self.io_options["cache_mcmc"],
direc=self.io_options["cache_dir"],
lightcone_quantities=lightcone_quantities,
global_quantities=lightcone_quantities,
**self.global_params,
)
ctx.add("lightcone", lightcone)
[docs]
class CoreLuminosityFunction(CoreCoevalModule):
r"""A Core Module that produces model luminosity functions at a range of redshifts.
Notes
-----
This core is vectorized i.e., it accepts an array of ``astro_params``.
Parameters
----------
sigma : float, callable, list of callables, or array_like
The standard deviation on the luminosity function measurement. If a float,
it is considered to be the standard deviation for all redshifts and luminosity
bins. If a 1D array, it is assumed to be a function of luminosity, and must
have the same length as the output luminosity from
:func:`py21cmfast.wrapper.compute_luminosity_function`. If a callable,
assumed to take a single argument -- a UV magitude array -- and return the
standard deviation (the same for all redshifts). If a list of callables, must
be the same length as redshift, with each callable having the same signature
as already described. If a 2D array, must have shape ``(n_redshifts, n_luminosity_bins)``.
Other Parameters
----------------
\*\*kwargs :
All other parameters are the same as :class:`CoreCoevalModule`.
"""
[docs]
def __init__(
self, sigma=None, name="", cosmo_params=None, n_muv_bins=100, **kwargs
):
self._sigma = sigma
self.name = str(name)
self.n_muv_bins = n_muv_bins
super().__init__(**kwargs)
if cosmo_params is not None:
self.cosmo_params = p21.CosmoParams(**cosmo_params)
[docs]
def setup(self):
"""Run post-init setup."""
CoreBase.setup(self)
# If the chain has different parameter truths, we want to use those for our defaults.
self.astro_params, self.cosmo_params = self._update_params(
self.chain.createChainContext().getParams()
)
[docs]
def run(self, astro_params, cosmo_params, ctx):
"""Return the luminosity function for given parameters."""
if self.flag_options.USE_MINI_HALOS:
lc = ctx.get("lightcone")
z_all = np.array(lc.node_redshifts)[::-1]
mturnovers = 10 ** interp1d(z_all, np.array(lc.log10_mturnovers)[::-1])(
self.redshift
)
mturnovers_mini = 10 ** interp1d(
z_all, np.array(lc.log10_mturnovers_mini)[::-1]
)(self.redshift)
if isinstance(astro_params, np.ndarray):
N = len(astro_params)
else:
N = 1
Muv = []
Mhalo = []
lfunc = []
for i in range(N):
muv, mhalo, lf = p21.compute_luminosity_function(
mturnovers=mturnovers if self.flag_options.USE_MINI_HALOS else None,
mturnovers_mini=mturnovers_mini
if self.flag_options.USE_MINI_HALOS
else None,
redshifts=self.redshift,
astro_params=astro_params[i]
if not isinstance(astro_params, p21.AstroParams)
else astro_params,
flag_options=self.flag_options,
cosmo_params=cosmo_params,
user_params=self.user_params,
nbins=self.n_muv_bins,
)
Muv.append(muv)
Mhalo.append(mhalo)
lfunc.append(lf)
return (
np.array(Muv),
np.array(Mhalo),
np.array(lfunc),
)
[docs]
def build_model_data(self, ctx):
"""Compute all data defined by this core and add it to the context."""
# Update parameters
astro_params = ctx.getParams()
if isinstance(astro_params, dict):
values = astro_params.values()
keys = astro_params.keys()
else:
values = astro_params.values
keys = astro_params.keys
if all(isinstance(v, (int, float)) for v in values):
astro_params, cosmo_params = self._update_params(astro_params)
elif all(isinstance(v, (np.ndarray, list)) for v in values):
lengths = [len(v) for v in values]
if lengths.count(lengths[0]) != len(lengths):
raise ValueError(
"For vectorized case, all parameters should have the same length."
)
ap = []
for t in zip(*values):
apars, cosmo_params = self._update_params(
Params(*[(k, v) for k, v in zip(keys, t)])
)
ap.append(apars)
astro_params = ap
astro_params = np.array(astro_params, dtype=object)
logger.debug(f"AstroParams: {astro_params}")
# Call C-code
Muv, mhalo, lfunc = self.run(astro_params, cosmo_params, ctx)
ctx.add(
"luminosity_function" + self.name,
{"Muv": Muv, "mhalo": mhalo, "lfunc": lfunc},
)
@property
def sigma(self):
"""Either a list of callables, or list/array of arrays. Length n_redshifts."""
if self._sigma is None:
return None
if not hasattr(self._sigma, "__len__") or len(self._sigma) != len(
self.redshift
):
return [self._sigma] * len(self.redshift)
else:
return self._sigma
[docs]
def convert_model_to_mock(self, ctx):
"""Update context entries for luminosity function to have randomness."""
if self.sigma is None:
raise ValueError("Cannot create a mock with sigma=None!")
lfunc = ctx.get("luminosity_function" + self.name)["lfunc"][0]
muv = ctx.get("luminosity_function" + self.name)["Muv"][0]
for i, s in enumerate(self.sigma): # each redshift
try:
lfunc[i] += np.random.normal(loc=0, scale=s(muv), size=len(lfunc[i]))
except TypeError:
lfunc[i] += np.random.normal(loc=0, scale=s, size=len(lfunc[i]))
[docs]
class CoreForest(CoreLightConeModule):
r"""A Core Module that produces model effective optical depth at a range of redshifts.
name : str
The name used to match the likelihood
observation : str
The observation that is used to construct the tau_eff statisctic.
Currently, only bosman_optimistic and bosman_pessimistic are provided.
n_realization : int
The number of realizations to evaluate the error covariance matrix, default is 150.
mean_flux : float
The mean flux (usually from observation) used to rescale the modelling results.
If not provided, the modelled mean flux will be rescaled according to input parameters
log10_f_rescale and f_rescale_slope.
Other Parameters
----------------
\*\*kwargs :
All other parameters are the same as :class:`CoreCoevalModule`.
"""
[docs]
def __init__(
self,
name="",
observation="bosman_optimistic",
n_realization=150,
mean_flux=None,
**kwargs,
):
self.observation = str(observation)
self.n_realization = n_realization
self.mean_flux = mean_flux
super().__init__(**kwargs)
self.name = str(name)
if (
self.observation == "bosman_optimistic"
or self.observation == "bosman_pessimistic"
):
data = np.load(
path.join(path.dirname(__file__), "data/Forests/Bosman18/data.npz"),
allow_pickle=True,
)
targets = (data["zs"] > self.redshift[0] - 0.1) * (
data["zs"] <= self.redshift[0] + 0.1
)
self.nlos = sum(targets)
self.bin_size = 50 / self.cosmo_params.hlittle
else:
raise NotImplementedError("Use bosman_optimistic or bosman_pessimistic!")
if self.nlos * self.n_realization > self.user_params.HII_DIM**2:
raise ValueError(
"You asked for %d realizations, larger than what the box has (Total los / needed los = %d / %d)! Increase HII_DIM!"
% (self.n_realization, self.user_params.HII_DIM**2, self.nlos)
)
[docs]
def setup(self):
"""Run post-init setup."""
CoreBase.setup(self)
[docs]
def tau_GP(self, gamma_bg, delta, temp, redshifts):
r"""Calculating the lyman-alpha optical depth in each pixel using the fluctuating GP approximation.
Parameters
----------
gamma_bg : float or array_like
The background photonionization rate in units of 1e-12 s**-1
delta : float or array_like
The underlying overdensity
temp : float or array_like
The kinectic temperature of the gas in 1e4 K
redshifts : float or array_like
Correspoding redshifts along the los
"""
gamma_local = np.zeros_like(gamma_bg)
residual_xHI = np.zeros_like(gamma_bg, dtype=np.float64)
flag_neutral = gamma_bg == 0
flag_zerodelta = delta == 0
if gamma_bg.shape != redshifts.shape:
redshifts = np.tile(redshifts, (*gamma_bg.shape[:-1], 1))
delta_ss = (
2.67e4 * temp**0.17 * (1.0 + redshifts) ** -3 * gamma_bg ** (2.0 / 3.0)
)
gamma_local[~flag_neutral] = gamma_bg[~flag_neutral] * (
0.98
* (
(1.0 + (delta[~flag_neutral] / delta_ss[~flag_neutral]) ** 1.64)
** -2.28
)
+ 0.02 * (1.0 + (delta[~flag_neutral] / delta_ss[~flag_neutral])) ** -0.84
)
Y_He = 0.245
# TODO: use global_params
residual_xHI[~flag_zerodelta] = 1 + gamma_local[~flag_zerodelta] * 1.0155e7 / (
1.0 + 1.0 / (4.0 / Y_He - 3)
) * temp[~flag_zerodelta] ** 0.75 / (
delta[~flag_zerodelta] * (1.0 + redshifts[~flag_zerodelta]) ** 3
)
residual_xHI[~flag_zerodelta] = residual_xHI[~flag_zerodelta] - np.sqrt(
residual_xHI[~flag_zerodelta] ** 2 - 1.0
)
return (
7875.053145028655
/ (
self.cosmo_params.hlittle
* np.sqrt(
self.cosmo_params.OMm * (1.0 + redshifts) ** 3
+ self.cosmo_params.OMl
)
)
* delta
* (1.0 + redshifts) ** 3
* residual_xHI
)
[docs]
def find_n_rescale(self, tau, mean_fluxave_target):
"""Find the rescaling factor so that the mean transmission equal to observations."""
# Newton-Raphson method
x = 1
Ntry = 0
while np.abs(np.mean(np.exp(-tau * x)) / mean_fluxave_target - 1) > 1e-2:
f_x = np.mean(np.exp(-tau * x)) - mean_fluxave_target
f_prime_x = np.min([-1e-10, np.mean(-tau * np.exp(-tau * x))])
x -= f_x / f_prime_x
if x < 0:
x = 0
Ntry += 1
if Ntry > 1e3:
break
raise RuntimeError("I've tried too many times...", x, f_x, f_prime_x)
return x
[docs]
def build_model_data(self, ctx):
"""Compute all data defined by this core and add it to the context."""
astro_params, cosmo_params = self._update_params(ctx.getParams())
lc = ctx.get("lightcone")
if not lc:
raise NotImplementedError("A lightcone core is required!")
lightcone_redshifts = lc.lightcone_redshifts
lightcone_distances = lc.lightcone_distances
total_los = lc.user_params.HII_DIM**2
index_right = np.where(
lightcone_distances
> (
lightcone_distances[
np.where(lightcone_redshifts > self.redshift[0])[0][0]
]
+ self.bin_size / 2
)
)[0][0]
index_left = np.where(
lightcone_distances
> (
lightcone_distances[
np.where(lightcone_redshifts > self.redshift[0])[0][0]
]
- self.bin_size / 2
)
)[0][0]
if index_left == 0:
# TODO here should give a warning!
index_right = np.where(
lightcone_distances > (lightcone_distances[0] + self.bin_size)
)[0][0]
# select a few number of the los according to the observation
tau_eff = np.zeros([self.n_realization, self.nlos])
if not self.mean_flux:
if not hasattr(ctx.getParams(), "log10_f_rescale"):
logger.warning(
"missing input hyper parameter, log10_f_rescale, assigning 0!"
)
f_rescale = 1
else:
f_rescale = 10 ** ctx.getParams().log10_f_rescale
if not hasattr(ctx.getParams(), "f_rescale_slope"):
logger.warning(
"missing input hyper parameter, f_rescale_slope, assigning 0!"
)
else:
f_rescale += (self.redshift[0] - 5.7) * ctx.getParams().f_rescale_slope
for jj in range(self.n_realization):
gamma_bg = lc.Gamma12_box[:, :, index_left:index_right].reshape(
[total_los, index_right - index_left]
)[jj :: int(total_los / self.nlos)][: self.nlos]
delta = (
lc.density[:, :, index_left:index_right].reshape(
[total_los, index_right - index_left]
)[jj :: int(total_los / self.nlos)][: self.nlos]
+ 1.0
)
temp = (
lc.temp_kinetic_all_gas[:, :, index_left:index_right].reshape(
[total_los, index_right - index_left]
)[jj :: int(total_los / self.nlos)][: self.nlos]
/ 1e4
)
tau_lyman_alpha = self.tau_GP(
gamma_bg, delta, temp, lightcone_redshifts[index_left:index_right]
)
if self.mean_flux:
f_rescale = self.find_n_rescale(tau_lyman_alpha, self.mean_flux)
tau_eff[jj] = -np.log(np.mean(np.exp(-tau_lyman_alpha * f_rescale), axis=1))
ctx.add("tau_eff_%s" % self.name, tau_eff)
[docs]
class CoreCMB(CoreBase):
r"""A Core Module that computes Cl^TT,TE,EE and phiphi (the lensing potentials).
Notes
-----
This core calls the CLASS CMB code and takes as an input the reionization history from 21cmFAST and a few cosmological parameters.
Parameters
----------
z_extrap_min : float
Minimal z for reionization in CLASS. It should basically always be set to 0.
z_extrap_max : float
Maximal z for reionization in CLASS. It depends on the reionization model.
z_HeI : float
Redshift of the first helium reionization. CLASS models helium reionzation with a tanh centered around zHeI.
z_HeII : float
Redshift of the second helium reionization. CLASS models helium reionzation with a tanh centered around zHeII.
use_21cmfast : float
Whether or not using EoR history from 21cmfast.
"""
[docs]
def __init__(
self,
verbose=0,
z_extrap_min=0,
z_extrap_max=20,
z_HeI=4,
z_HeII=3,
use_21cmfast=True,
user_params=None,
flag_options=None,
astro_params=None,
cosmo_params=None,
regenerate=True,
change_seed_every_iter=False,
ctx_variables=("brightness_temp", "xH_box"),
initial_conditions_seed=None,
global_params=None,
**io_options,
):
super().__init__(io_options.get("store", None))
if not use_21cmfast:
self.user_params = p21.UserParams(user_params)
self.flag_options = p21.FlagOptions(flag_options)
self.astro_params = p21.AstroParams(astro_params)
self.cosmo_params = p21.CosmoParams(cosmo_params)
self.change_seed_every_iter = change_seed_every_iter
self.initial_conditions_seed = initial_conditions_seed
self.regenerate = regenerate
self.ctx_variables = ctx_variables
self.global_params = global_params or {}
self.io_options = {
"store": {}, # (derived) quantities to store in the MCMC chain.
"cache_dir": None, # where full data sets will be written/read from.
"cache_mcmc": False, # whether to cache ionization data sets
# (done before parameter retention step)
}
self.io_options.update(io_options)
try:
from classy import Class
if verbose > 0:
print("import CLASS")
global cosmo
cosmo = Class()
self.verbose = verbose
self.z_extrap_min = z_extrap_min
self.z_extrap_max = z_extrap_max
self.z_HeI = z_HeI
self.z_HeII = z_HeII
self.use_21cmfast = use_21cmfast
except ImportError:
raise ImportError(
"You must have compiled the classy.pyx file. Please go to "
+ "/path/to/class/python and run the command\n "
+ "python setup.py build"
)
[docs]
def setup(self):
"""Perform any post-init setup of the object."""
super().setup()
def _update_params(self, params):
"""
Update all the parameter structures which get passed to the driver, for one iteration.
Parameters
----------
params :
Parameter object from cosmoHammer
"""
ap_dict = copy.copy(self.astro_params.self)
cp_dict = copy.copy(self.cosmo_params.self)
ap_dict.update(
**{
k: getattr(params, k)
for k, v in params.items()
if k in self.astro_params.defining_dict
}
)
cp_dict.update(
**{
k: getattr(params, k)
for k, v in params.items()
if k in self.cosmo_params.defining_dict
}
)
return p21.AstroParams(**ap_dict), p21.CosmoParams(**cp_dict)
[docs]
def build_model_data(self, ctx):
"""Compute the CMB power spectra from a ionization history."""
# option for class z_class_min = self.z_extrap_min
z_HeI = self.z_HeI # 4
z_HeII = self.z_HeII # 3
z_class_max = self.z_extrap_max
z_xe_0 = (
z_class_max + 1
) # xe is set to 0 at z=z_xe_0. placeholder: will be overwritten by recombination table in class.
# Extract relevant info from the context.
if self.use_21cmfast:
lightcone = ctx.get("lightcone")
h = lightcone.cosmo_params.hlittle
omega_b = lightcone.cosmo_params.OMb * h * h
omega_cdm = lightcone.cosmo_params.OMm * h * h - omega_b
sigma8 = lightcone.cosmo_params.SIGMA_8
n_s = lightcone.cosmo_params.POWER_INDEX
xHI = lightcone.global_xH
redshifts = lightcone.node_redshifts
if len(redshifts) < 3:
raise ValueError(
"You cannot use the Planck prior likelihood with less than 3 redshifts"
)
# Order the redshifts in increasing order
redshifts, xHI = np.sort(np.array([redshifts, xHI]))
# Translate xHI into xe for CLASS.
# The option -1, -2 ensure helium first and second reionization respectively at z_HeI and z_HeII.
xe = 1 - xHI
redshift_class = np.concatenate(
([0, z_HeII, z_HeI], redshifts[xe > 0], [z_xe_0])
)
xe = np.concatenate(([-2, -2, -1], xe[xe > 0], [0]))
common_settings = {
"output": "tCl, pCl, lCl",
"lensing": "yes",
"l_max_scalars": 3000,
# LambdaCDM parameters
"h": h,
"omega_b": omega_b,
"omega_cdm": omega_cdm,
"sigma8": sigma8,
"n_s": n_s,
# Take fixed value for primordial Helium (instead of automatic BBN adjustment)
"reio_parametrization": "reio_inter",
"reio_inter_num": len(xe),
"reio_inter_z": ",".join(
["%.5f" % x for x in redshift_class]
), # str(redshift_class),
"reio_inter_xe": ",".join(["%.5e" % x for x in xe]),
"input_verbose": self.verbose,
"background_verbose": self.verbose,
"thermodynamics_verbose": self.verbose,
"perturbations_verbose": self.verbose,
"transfer_verbose": self.verbose,
"primordial_verbose": self.verbose,
"spectra_verbose": self.verbose,
"nonlinear_verbose": self.verbose,
"lensing_verbose": self.verbose,
}
else:
# Update parameters
astro_params, cosmo_params = self._update_params(ctx.getParams())
h = self.cosmo_params.hlittle
omega_b = self.cosmo_params.OMb * h * h
omega_cdm = self.cosmo_params.OMm * h * h - omega_b
sigma8 = self.cosmo_params.SIGMA_8
n_s = self.cosmo_params.POWER_INDEX
common_settings = {
"output": "tCl, pCl, lCl",
"lensing": "yes",
"l_max_scalars": 3000,
# LambdaCDM parameters
"h": h,
"omega_b": omega_b,
"omega_cdm": omega_cdm,
"sigma8": sigma8,
"n_s": n_s,
"reionization_z_start_max": 70,
"z_reio": astro_params.F_STAR10,
"reionization_width": astro_params.ALPHA_STAR,
"helium_fullreio_redshift": z_HeII,
"input_verbose": self.verbose,
"background_verbose": self.verbose,
"thermodynamics_verbose": self.verbose,
"perturbations_verbose": self.verbose,
"transfer_verbose": self.verbose,
"primordial_verbose": self.verbose,
"spectra_verbose": self.verbose,
"nonlinear_verbose": self.verbose,
"lensing_verbose": self.verbose,
}
##############
#
# call CLASS
#
###############
cosmo.set(common_settings)
cosmo.compute()
if not self.use_21cmfast:
thermo = cosmo.get_thermodynamics()
# TODO: for some reason, truncating the output range is important for late use in the LH, e.g.LikelihoodNeutralFraction
flag = (thermo["z"] > 4) & (thermo["z"] < 50)
ctx.add("zs", thermo["z"][flag])
ctx.add("xHI", 1.0 - thermo["x_e"][flag] / 1.0818709330934035)
cl = self.get_cl(cosmo)
cosmo.struct_cleanup()
cosmo.empty()
ctx.add("cl_cmb", cl)
[docs]
def get_cl(self, cosmo, l_max=-1):
r"""Return the :math:`C_{\\ell}` from the cosmological code in :math:`\\mu {\\rm K}^2`."""
# get C_l^XX from the cosmological code
cl = cosmo.lensed_cl(int(l_max))
# convert dimensionless C_l's to C_l in muK**2
T = cosmo.T_cmb() # checked
for key in cl.keys():
# All quantities need to be multiplied by this factor, except the
# phi-phi term, that is already dimensionless
# phi cross-terms should only be multiplied with this factor once
if key not in ["pp", "ell", "tp", "ep"]:
cl[key] *= (T * 1.0e6) ** 2
elif key in ["tp", "ep"]:
cl[key] *= T * 1.0e6
return cl
[docs]
class Core21cmEMU(CoreBase):
r"""A Core Module that loads 21cmEMU and uses it to obtain 21cmFAST summaries.
Notes
-----
This core calls 21cmEMU and uses it to evaluate 21cmFAST summaries (power spectrum, global signal, neutral fraction, spin temperature)
given a set of astro_params. This core is vectorized i.e., it accepts an array of ``astro_params``.
Parameters
----------
redshift : float or array_like
The redshift(s) at which to evaluate the summary statistics.
astro_params : dict or :class:`~py21cmfast.AstroParams`
Astrophysical parameters of reionization model according to Park+19 parametrization.
version : str, optional
Emulator version to use, defaults to 'latest'.
"""
[docs]
def __init__(
self,
astro_params=None,
redshift=None,
k=None,
name="",
global_params=None,
ctx_variables=(
"Tb",
"Tb_err",
"Ts",
"Ts_err",
"xHI",
"xHI_err",
"redshifts",
"PS_redshifts",
"PS",
"PS_err",
"Muv",
"UVLFs",
"UVLFs_err",
"UVLF_redshifts",
"k",
"tau",
"tau_err",
),
cache_dir=None,
version="latest",
store=[],
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.name = str(name)
self.ctx_variables = ctx_variables
try:
from py21cmemu import Emulator, properties
except:
print("Could not load py21cmemu. Make sure it is installed properly.")
self.astro_param_keys = (
"F_STAR10",
"ALPHA_STAR",
"F_ESC10",
"ALPHA_ESC",
"M_TURN",
"t_STAR",
"L_X",
"NU_X_THRESH",
"X_RAY_SPEC_INDEX",
)
if astro_params is not None:
if isinstance(astro_params, p21.AstroParams):
self.astro_params = astro_params
else:
self.astro_params = p21.AstroParams(astro_params)
else:
self.astro_params = p21.AstroParams()
self.cosmo_params = p21.CosmoParams(properties.COSMO_PARAMS)
self.flag_options = p21.FlagOptions(properties.FLAG_OPTIONS)
self.user_params = p21.UserParams(properties.USER_PARAMS)
self.global_params = global_params or {}
self.io_options = {
"store": store, # which summaries to store
"cache_dir": cache_dir, # where the stored data will be written
}
self.emulator = Emulator(version=version)
def _update_params(self, params):
"""
Update all the parameter structures which get passed to the driver.
Parameters
----------
params :
Parameter object from cosmoHammer
"""
ap_dict = copy.copy(self.astro_params.self)
ap_dict.update(
**{
k: getattr(params, k)
for k, v in params.items()
if k in self.astro_params.defining_dict
}
)
return p21.AstroParams(**ap_dict)
[docs]
def build_model_data(self, ctx):
"""Compute all data defined by this core and add it to the context."""
# Update parameters
logger.debug(f"Updating parameters: {ctx.getParams()}")
astro_params = ctx.getParams()
if isinstance(astro_params, dict):
values = astro_params.values()
keys = astro_params.keys()
elif isinstance(astro_params, p21.AstroParams):
values = astro_params.defining_dict.values
keys = self.astro_param_keys
else:
values = astro_params.values
keys = astro_params.keys
# For build_computation_chain when params passed are an empty dict
if len(values) == 0:
astro_params = self._update_params(astro_params).defining_dict
astro_params = {k: astro_params[k] for k in self.astro_param_keys}
if (
all(isinstance(v, (np.ndarray, list, int, float)) for v in values)
and len(values) > 0
):
lengths = [len(v) for v in values]
if lengths.count(lengths[0]) != len(lengths):
raise ValueError(
"For vectorized case, all parameters should have the same length."
)
ap = []
for t in zip(*values):
ap.append(dict(zip(keys, t)))
astro_params = np.array(ap, dtype=object)
logger.debug(f"AstroParams: {astro_params}")
theta, outputs, errors = self.emulator.predict(astro_params=astro_params)
if self.io_options["cache_dir"] is not None:
if len(astro_params.shape) == 2:
pars = astro_params[0]
else:
pars = astro_params
par_vals = [f"{i:0.3e}" for i in list(pars)]
name = "_".join(par_vals)
outputs.write(
fname=self.io_options["cache_dir"] + name,
theta=theta,
store=self.io_options["store"],
)
logger.debug(f"Adding {self.ctx_variables} to context data")
for key in self.ctx_variables:
try:
ctx.add(key + self.name, getattr(outputs, key))
except AttributeError:
try:
ctx.add(key + self.name, errors[key])
except:
raise ValueError(
f"ctx_variable {key} not an attribute of EmulatorOutput or errors dict."
)