borg_examples/example3.py
2025-04-17 17:39:15 +02:00

516 lines
18 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aquila_borg as borg
import numpy as np
import numbers
import jaxlib
import jax.numpy as jnp
import jax
import configparser
import ast
import warnings
from functools import partial
# Output stream management
cons = borg.console()
def myprint(x):
if isinstance(x, str):
cons.print_std(x)
else:
cons.print_std(repr(x))
def get_cosmopar(ini_file):
"""
Extract cosmological parameters from an ini file
Args:
:ini_file (str): Path to the ini file
Returns:
:cpar (borg.cosmo.CosmologicalParameters): Cosmological parameters
"""
config = configparser.ConfigParser()
config.read(ini_file)
cpar = borg.cosmo.CosmologicalParameters()
cpar.default()
cpar.fnl = float(config['cosmology']['fnl'])
cpar.omega_k = float(config['cosmology']['omega_k'])
cpar.omega_m = float(config['cosmology']['omega_m'])
cpar.omega_b = float(config['cosmology']['omega_b'])
cpar.omega_q = float(config['cosmology']['omega_q'])
cpar.h = float(config['cosmology']['h100'])
cpar.sigma8 = float(config['cosmology']['sigma8'])
cpar.n_s = float(config['cosmology']['n_s'])
cpar.w = float(config['cosmology']['w'])
cpar.wprime = float(config['cosmology']['wprime'])
cpar = compute_As(cpar)
return cpar
def compute_As(cpar):
"""
Compute As given values of sigma8
Args:
:cpar (borg.cosmo.CosmologicalParameters): Cosmological parameters with wrong As
Returns:
:cpar (borg.cosmo.CosmologicalParameters): Cosmological parameters with updated As
"""
# requires BORG-CLASS
if not hasattr(borg.cosmo, 'ClassCosmo'):
raise ImportError(
"BORG-CLASS is required to compute As, but is not installed.")
sigma8_true = jnp.copy(cpar.sigma8)
cpar.sigma8 = 0
cpar.A_s = 2.3e-9
k_max, k_per_decade = 10, 100
extra_class = {}
extra_class['YHe'] = '0.24'
cosmo = borg.cosmo.ClassCosmo(cpar, k_per_decade, k_max, extra=extra_class)
cosmo.computeSigma8()
cos = cosmo.getCosmology()
cpar.A_s = (sigma8_true/cos['sigma_8'])**2*cpar.A_s
cpar.sigma8 = sigma8_true
print('Updated cosmology:', cpar)
return cpar
class NullForward(borg.forward.BaseForwardModel):
"""
BORG forward model which does nothing but stores
the values of parameters to be used by the likelihood
"""
def __init__(self, box: borg.forward.BoxModel) -> None:
"""
Initialise the NullForward class
Args:
box (borg.forward.BoxModel): The input box model.
"""
super().__init__(box, box)
self.setName("nullforward")
self.params = {}
cpar = get_cosmopar(borg.getIniConfigurationFilename())
self.setCosmoParams(cpar)
def setModelParams(self, params: dict) -> None:
"""
Change the values of the model parameters to those given by params
Args:
params (dict): Dictionary of updated model parameters.
"""
for k, v in params.items():
self.params[k] = v
myprint(f'Updated model parameters: {self.params}')
def getModelParam(self, model, keyname: str):
"""
This queries the current state of the parameters keyname in model model.
Args:
model: The model
keyname (str): The name of the parameter of interest
"""
return self.params[keyname]
class MyLikelihood(borg.likelihood.BaseLikelihood):
"""
HADES likelihood class
"""
def __init__(self, fwd: borg.forward.BaseForwardModel, fwd_param: NullForward, ini_fname: str):
self.fwd = fwd
self.fwd_param = fwd_param
# Read the ini file
self.ini_fname = ini_fname
self.config = configparser.ConfigParser()
self.config.read(ini_fname)
self.N = [int(self.config['system'][f'N{i}']) for i in range(3)] # Number of grid points per side
self.L = [float(self.config['system'][f'L{i}']) for i in range(3)] # Box size lenght Mpc/h
self.sigma_dens = float(self.config['mock']['sigma_dens']) # Density scatter
myprint(f"Likelihood initialized with {self.N} grid points and box size {self.L} Mpc/h")
super().__init__(fwd, self.N, self.L)
# Set up cosmoligical parameters
cpar = get_cosmopar(ini_fname)
self.updateCosmology(cpar)
# Set up the forward model parameters
print('Setting up forward model parameters')
self.mu_true = float(self.config['model']['mu'])
model_params = {'mu':self.mu_true}
self.sigma_mu = float(self.config['model']['sigma_mu'])
self.fwd_param.setModelParams(model_params)
# Gradient of the likelihood
self.grad_like = jax.grad(self.dens2like)
def updateCosmology(self, cosmo: borg.cosmo.CosmologicalParameters) -> None:
cpar = compute_As(cosmo)
self.fwd.setCosmoParams(cpar)
self.fwd_param.setCosmoParams(cpar)
def updateMetaParameters(self, state: borg.likelihood.MarkovState) -> None:
"""
Update the meta parameters of the sampler (not sampled) from the MarkovState.
Args:
- state (borg.likelihood.MarkovState): The state object to be used in the likelihood.
"""
cosmo = state['cosmology']
cpar = compute_As(cosmo)
self.fwd.setCosmoParams(cpar)
self.fwd_param.setCosmoParams(cpar)
def initializeLikelihood(self, state: borg.likelihood.MarkovState) -> None:
"""
Initialize the likelihood function.
Args:
- state (borg.likelihood.MarkovState): The state object to be used in the likelihood.
"""
myprint("Init likelihood")
state.newArray3d("BORG_final_density", *self.fwd.getOutputBoxModel().N, True)
# Could load real data
# We'll generate mock data which has its own function
def generateMockData(self, s_hat:np.ndarray, state: borg.likelihood.MarkovState) -> None:
"""
Generates mock data by simulating the forward model with the given white noise
Args:
- s_hat (np.ndarray): The input (initial) white noise field.
- state (borg.likelihood.MarkovState): The Markov state object to be used in the likelihood.
"""
myprint('Making mock from BORG')
# Get density field from the initial conditions
# Could replace with any (better) simulation here
# This version is self-consistnet
dens = np.zeros(self.fwd.getOutputBoxModel().N)
myprint('Running forward model')
myprint(self.fwd.getCosmoParams())
self.fwd.forwardModel_v2(s_hat)
self.fwd.getDensityFinal(dens)
state["BORG_final_density"][:] = dens
self.true_dens = dens.copy()
# Add some scatter
myprint('Adding scatter')
self.obs_dens = self.true_dens + np.random.randn(*self.true_dens.shape) * self.sigma_dens
# Get observed mu
self.obs_mu = self.mu_true + np.random.randn() * self.sigma_mu
# Compute the likelihood and print it
myprint('From mock')
self.saved_s_hat = s_hat.copy()
self.logLikelihoodComplex(s_hat, False)
self.commitAuxiliaryFields(state)
myprint('Done')
def dens2like(self, output_density: np.ndarray):
"""
Compute the likelihood from the density field
Args:
- output_density (np.ndarray): The density field to be used in the likelihood.
Returns:
- float: The likelihood value.
"""
# Compute the likelihood from the density field
# This is a simple Gaussian likelihood
# Could be replaced with any other likelihood
diff = output_density - self.obs_dens
like = 0.5 * jnp.sum(diff**2) / (self.sigma_dens**2)
mu = self.fwd_param.getModelParam('nullforward', 'mu')
like += 0.5 * (mu - self.obs_mu)**2 / (self.sigma_mu**2)
return like
def logLikelihoodComplex(self, s_hat:np.ndarray, gradientIsNext:bool):
# myprint('Getting density field now')
# Get the density field from the forward model
dens = np.zeros(self.fwd.getOutputBoxModel().N)
self.fwd.forwardModel_v2(s_hat)
self.fwd.getDensityFinal(dens)
L = self.dens2like(dens)
if isinstance(L, numbers.Number) or isinstance(L, jaxlib.xla_extension.ArrayImpl):
myprint(f"var(s_hat): {np.var(s_hat)}, Call to logLike: {L}")
self.delta = dens.copy()
return L
def gradientLikelihoodComplex(self, s_hat:np.ndarray):
# Run BORG density field
output_density = np.zeros(self.N)
self.fwd.forwardModel_v2(s_hat)
self.fwd.getDensityFinal(output_density)
# Compute the gradient of the likelihood
# d logL / d dens
mygradient = self.grad_like(output_density)
# Now get d logL / d s_hat
mygradient = np.array(mygradient, dtype=np.float64)
self.fwd.adjointModel_v2(mygradient)
mygrad_hat = np.zeros(s_hat.shape, dtype=np.complex128)
self.fwd.getAdjointModel(mygrad_hat)
self.fwd.clearAdjointGradient()
return mygrad_hat
def commitAuxiliaryFields(self, state: borg.likelihood.MarkovState) -> None:
"""
Commits the final density field to the Markov state.
Args:
- state (borg.state.State): The state object containing the final density field.
"""
self.updateCosmology(self.fwd.getCosmoParams())
self.dens2like(self.delta)
state["BORG_final_density"][:] = self.delta.copy()
@borg.registerGravityBuilder
def build_gravity_model(state: borg.likelihood.MarkovState, box: borg.forward.BoxModel, ini_fname=None) -> borg.forward.BaseForwardModel:
"""
Builds the gravity model and returns the forward model chain.
Args:
- state (borg.likelihood.MarkovState): The Markov state object to be used in the likelihood.
- box (borg.forward.BoxModel): The input box model.
- ini_file (str, default=None): The location of the ini file. If None, use borg.getIniConfigurationFilename()
Returns:
borg.forward.BaseForwardModel: The forward model.
"""
global chain, fwd_param
myprint("Building gravity model")
if ini_fname is None:
ini_fname=borg.getIniConfigurationFilename()
config = configparser.ConfigParser()
config.read(ini_fname)
# READ FROM INI FILE
which_model = config['gravity']['which_model']
ai = float(config['gravity']['ai']) # Initial scale factor
af = float(config['gravity']['af']) # Final scale factor
supersampling = int(config['gravity']['supersampling'])
forcesampling = int(config['gravity']['forcesampling'])
nsteps = int(config['gravity']['nsteps']) # Number of steps in the PM solver
chain = borg.forward.ChainForwardModel(box)
# Make sure that the initial conditions are real in position space
chain.addModel(borg.forward.models.HermiticEnforcer(box))
# CLASS transfer function
chain @= borg.forward.model_lib.M_PRIMORDIAL_AS(box)
transfer_class = borg.forward.model_lib.M_TRANSFER_CLASS(box, opts=dict(a_transfer=1.0))
transfer_class.setModelParams({"extra_class_arguments":{'YHe':'0.24'}}) # helps deals with errors with primordial physics in CLASS for weird cosmologies
chain @= transfer_class
# This one works
# chain @= borg.forward.models.Primordial(box, ai)
# chain @= borg.forward.models.EisensteinHu(box)
# Gravity model
if which_model == 'lpt':
mod = borg.forward.model_lib.M_LPT_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
do_rsd=False,
supersampling=supersampling,
lightcone=False,
part_factor=1.01,))
elif which_model == '2lpt':
mod = borg.forward.model_lib.M_2LPT_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
do_rsd=False,
supersampling=supersampling,
lightcone=False,
part_factor=1.01,))
elif which_model == 'pm':
mod = borg.forward.model_lib.M_PM_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
pm_start_z=1/ai - 1,
do_rsd=False,
supersampling=supersampling,
forcesampling=forcesampling,
lightcone=False,
part_factor=1.01,
pm_nsteps=nsteps, # Number of steps in the PM solver
tcola=False
))
elif which_model == 'cola':
mod = borg.forward.model_lib.M_PM_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
pm_start_z=1/ai - 1,
do_rsd=False,
supersampling=supersampling,
forcesampling=forcesampling,
lightcone=False,
part_factor=1.01,
pm_nsteps=nsteps, # Number of steps in the PM solver
tcola=True
))
else:
raise ValueError(f"Unknown model {which_model}")
mod.accumulateAdjoint(True)
chain @= mod
# Cosmological parameters
cpar = get_cosmopar(borg.getIniConfigurationFilename())
print('Setting cosmo params', cpar)
chain.setCosmoParams(cpar)
# Model parameters
fwd_param = borg.forward.ChainForwardModel(box)
fwd_param @= NullForward(box)
fwd_param.setCosmoParams(cpar)
return chain
def check_cosmo_sampling(loop):
return loop.getStepID() > begin_cosmo
def check_model_sampling(loop):
return loop.getStepID() > begin_model
@borg.registerSamplerBuilder
def build_sampler(state: borg.likelihood.MarkovState, info: borg.likelihood.LikelihoodInfo, loop: borg.samplers.MainLoop):
"""
Builds the sampler and returns the main loop.
Args:
- state (borg.likelihood.MarkovState): The Markov state object to be used in the likelihood.
- info (borg.likelihood.LikelihoodInfo): The likelihood info object to be used in the likelihood.
- loop (borg.samplers.MainLoop): The main loop object to be used in the likelihood.
Returns:
borg.samplers.MainLoop: The main loop.
"""
global begin_cosmo, begin_model
myprint("Building sampler")
# Read from config file what to sample
config = configparser.ConfigParser()
config.read(borg.getIniConfigurationFilename())
end = '_sampler_blocked'
to_sample = [k[:-len(end)] for (k, v) in config['block_loop'].items() if k.endswith(end) and v.lower() == 'false']
myprint(f"Sampling {to_sample}")
all_sampler = []
# Add cosmology samplers
params = []
initial_values = {}
prior = {}
for p in ['omega_m', 'sigma8']:
if p not in to_sample:
continue
if p in config['prior'].keys() and p in config['cosmology'].keys():
myprint(f'Adding {p} sampler')
params.append(f"cosmology.{p}")
initial_values[f"cosmology.{p}"] = float(config['cosmology'][p])
prior[f"cosmology.{p}"] = np.array(ast.literal_eval(config['prior'][p]))
else:
s = f'Could not find {p} prior and/or default, so will not sample'
warnings.warn(s, stacklevel=2)
# Remove for later to prevent duplication
to_sample.remove(p)
begin_cosmo = int(config['mcmc']['warmup_cosmo'])
begin_model = int(config['mcmc']['warmup_model'])
if len(params) > 0:
myprint(f"Sampling cosmology parameters {params}")
cosmo_sampler = borg.samplers.ModelParamsSampler("", params, likelihood, chain, initial_values, prior)
cosmo_sampler.setName("cosmo_sampler")
all_sampler.append(cosmo_sampler)
loop.push(cosmo_sampler)
loop.addToConditionGroup("warmup_cosmo", "cosmo_sampler")
loop.addConditionToConditionGroup("warmup_cosmo", partial(check_cosmo_sampling, loop))
# Add model samplers
params = []
initial_values = {}
prior = {}
for p in to_sample:
if p in config['prior'].keys():
myprint(f'Adding {p} sampler')
params.append(p)
initial_values[p] = float(config['model'][p])
prior[p] = np.array(ast.literal_eval(config['prior'][p]))
else:
s = f'Could not find {p} prior and/or default, so will not sample'
warnings.warn(s, stacklevel=2)
if len(params) > 0:
myprint('Adding model parameter sampler')
model_sampler = borg.samplers.ModelParamsSampler("", params, likelihood, fwd_param, initial_values, prior)
model_sampler.setName("model_sampler")
all_sampler.append(model_sampler)
loop.push(model_sampler)
loop.addToConditionGroup("warmup_model", "model_sampler")
loop.addConditionToConditionGroup("warmup_model", partial(check_model_sampling, loop))
return []
@borg.registerLikelihoodBuilder
def build_likelihood(state: borg.likelihood.MarkovState, info: borg.likelihood.LikelihoodInfo) -> borg.likelihood.BaseLikelihood:
"""
Builds the likelihood and returns the likelihood object.
Args:
- state (borg.likelihood.MarkovState): The Markov state object to be used in the likelihood.
- info (borg.likelihood.LikelihoodInfo): The likelihood info object to be used in the likelihood.
Returns:
borg.likelihood.BaseLikelihood: The likelihood object.
"""
myprint("Building likelihood")
global likelihood
likelihood = MyLikelihood(chain, fwd_param, borg.getIniConfigurationFilename())
return likelihood