borg_examples/example0.py
2025-04-16 14:36:34 +02:00

329 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import aquila_borg as borg
import numpy as np
import numbers
import jaxlib
import jax.numpy as jnp
import jax
import configparser
# Output stream management
cons = borg.console()
def myprint(x):
if isinstance(x, str):
cons.print_std(x)
else:
cons.print_std(repr(x))
def get_cosmopar(ini_file):
"""
Extract cosmological parameters from an ini file
Args:
:ini_file (str): Path to the ini file
Returns:
:cpar (borg.cosmo.CosmologicalParameters): Cosmological parameters
"""
config = configparser.ConfigParser()
config.read(ini_file)
cpar = borg.cosmo.CosmologicalParameters()
cpar.default()
cpar.fnl = float(config['cosmology']['fnl'])
cpar.omega_k = float(config['cosmology']['omega_k'])
cpar.omega_m = float(config['cosmology']['omega_m'])
cpar.omega_b = float(config['cosmology']['omega_b'])
cpar.omega_q = float(config['cosmology']['omega_q'])
cpar.h = float(config['cosmology']['h100'])
cpar.sigma8 = float(config['cosmology']['sigma8'])
cpar.n_s = float(config['cosmology']['n_s'])
cpar.w = float(config['cosmology']['w'])
cpar.wprime = float(config['cosmology']['wprime'])
return cpar
class MyLikelihood(borg.likelihood.BaseLikelihood):
"""
HADES likelihood class
"""
def __init__(self, fwd: borg.forward.BaseForwardModel, ini_fname: str):
self.fwd = fwd
# Read the ini file
self.ini_fname = ini_fname
self.config = configparser.ConfigParser()
self.config.read(ini_fname)
self.N = [int(self.config['system'][f'N{i}']) for i in range(3)] # Number of grid points per side
self.L = [float(self.config['system'][f'L{i}']) for i in range(3)] # Box size lenght Mpc/h
self.sigma_dens = float(self.config['mock']['sigma_dens']) # Density scatter
myprint(f"Likelihood initialized with {self.N} grid points and box size {self.L} Mpc/h")
super().__init__(fwd, self.N, self.L)
# Set up cosmoligical parameters
cpar = get_cosmopar(ini_fname)
self.updateCosmology(cpar)
# Gradient of the likelihood
self.grad_like = jax.grad(self.dens2like)
def updateCosmology(self, cosmo: borg.cosmo.CosmologicalParameters) -> None:
self.fwd.setCosmoParams(cosmo)
def updateMetaParameters(self, state: borg.likelihood.MarkovState) -> None:
"""
Update the meta parameters of the sampler (not sampled) from the MarkovState.
Args:
- state (borg.likelihood.MarkovState): The state object to be used in the likelihood.
"""
cpar = state['cosmology']
self.fwd.setCosmoParams(cpar)
def initializeLikelihood(self, state: borg.likelihood.MarkovState) -> None:
"""
Initialize the likelihood function.
Args:
- state (borg.likelihood.MarkovState): The state object to be used in the likelihood.
"""
myprint("Init likelihood")
state.newArray3d("BORG_final_density", *self.fwd.getOutputBoxModel().N, True)
# Could load real data
# We'll generate mock data which has its own function
def generateMockData(self, s_hat:np.ndarray, state: borg.likelihood.MarkovState) -> None:
"""
Generates mock data by simulating the forward model with the given white noise
Args:
- s_hat (np.ndarray): The input (initial) white noise field.
- state (borg.likelihood.MarkovState): The Markov state object to be used in the likelihood.
"""
myprint('Making mock from BORG')
# Get density field from the initial conditions
# Could replace with any (better) simulation here
# This version is self-consistnet
dens = np.zeros(self.fwd.getOutputBoxModel().N)
myprint('Running forward model')
self.fwd.forwardModel_v2(s_hat)
self.fwd.getDensityFinal(dens)
state["BORG_final_density"][:] = dens
self.true_dens = dens.copy()
# Add some scatter
myprint('Adding scatter')
self.obs_dens = self.true_dens + np.random.randn(*self.true_dens.shape) * self.sigma_dens
# Compute the likelihood and print it
myprint('From mock')
self.saved_s_hat = s_hat.copy()
self.logLikelihoodComplex(s_hat, False)
def dens2like(self, output_density: np.ndarray):
"""
Compute the likelihood from the density field
Args:
- output_density (np.ndarray): The density field to be used in the likelihood.
Returns:
- float: The likelihood value.
"""
# Compute the likelihood from the density field
# This is a simple Gaussian likelihood
# Could be replaced with any other likelihood
diff = output_density - self.obs_dens
like = 0.5 * jnp.sum(diff**2) / (self.sigma_dens**2)
return like
def logLikelihoodComplex(self, s_hat:np.ndarray, gradientIsNext:bool):
myprint('Getting density field now')
# Get the density field from the forward model
dens = np.zeros(self.fwd.getOutputBoxModel().N)
self.fwd.forwardModel_v2(s_hat)
self.fwd.getDensityFinal(dens)
L = self.dens2like(dens)
if isinstance(L, numbers.Number) or isinstance(L, jaxlib.xla_extension.ArrayImpl):
myprint(f"var(s_hat): {np.var(s_hat)}, Call to logLike: {L}")
self.delta = dens.copy()
return L
def gradientLikelihoodComplex(self, s_hat:np.ndarray):
# Run BORG density field
output_density = np.zeros(self.N)
self.fwd.forwardModel_v2(s_hat)
self.fwd.getDensityFinal(output_density)
# Compute the gradient of the likelihood
# d logL / d dens
mygradient = self.grad_like(output_density)
# Now get d logL / d s_hat
mygradient = np.array(mygradient, dtype=np.float64)
self.fwd.adjointModel_v2(mygradient)
mygrad_hat = np.zeros(s_hat.shape, dtype=np.complex128)
self.fwd.getAdjointModel(mygrad_hat)
self.fwd.clearAdjointGradient()
return mygrad_hat
def commitAuxiliaryFields(self, state: borg.likelihood.MarkovState) -> None:
"""
Commits the final density field to the Markov state.
Args:
- state (borg.state.State): The state object containing the final density field.
"""
self.updateCosmology(self.fwd.getCosmoParams())
self.dens2like(self.delta)
state["BORG_final_density"][:] = self.delta
@borg.registerGravityBuilder
def build_gravity_model(state: borg.likelihood.MarkovState, box: borg.forward.BoxModel) -> borg.forward.BaseForwardModel:
"""
Builds the gravity model and returns the forward model chain.
Args:
- state (borg.likelihood.MarkovState): The Markov state object to be used in the likelihood.
- box (borg.forward.BoxModel): The input box model.
- ini_file (str, default=None): The location of the ini file. If None, use borg.getIniConfigurationFilename()
Returns:
borg.forward.BaseForwardModel: The forward model.
"""
global chain
myprint("Building gravity model")
# READ FROM INI FILE
which_model = 'cola'
ai = 0.05 # Initial scale factor
af = 1.0 # Final scale factor
supersampling = 2
forcesampling = 2
nsteps = 20 # Number of steps in the PM solver
chain = borg.forward.ChainForwardModel(box)
# Make sure that the initial conditions are real in position space
chain.addModel(borg.forward.models.HermiticEnforcer(box))
# CLASS transfer function
# chain @= borg.forward.model_lib.M_PRIMORDIAL_AS(box)
transfer_class = borg.forward.model_lib.M_TRANSFER_CLASS(box, opts=dict(a_transfer=1.0))
transfer_class.setModelParams({"extra_class_arguments":{'YHe':'0.24'}}) # helps deals with errors with primordial physics in CLASS for weird cosmologies
chain @= transfer_class
# Gravity model
if which_model == 'lpt':
mod = borg.forward.model_lib.M_LPT_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
do_rsd=False,
supersampling=supersampling,
lightcone=False,
part_factor=1.01,))
elif which_model == '2lpt':
mod = borg.forward.model_lib.M_2LPT_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
do_rsd=False,
supersampling=supersampling,
lightcone=False,
part_factor=1.01,))
elif which_model == 'pm':
mod = borg.forward.model_lib.M_PM_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
pm_start_z=1/ai - 1,
do_rsd=False,
supersampling=supersampling,
forcesampling=forcesampling,
lightcone=False,
part_factor=1.01,
pm_nsteps=nsteps, # Number of steps in the PM solver
tcola=False
))
elif which_model == 'cola':
mod = borg.forward.model_lib.M_PM_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
pm_start_z=1/ai - 1,
do_rsd=False,
supersampling=supersampling,
forcesampling=forcesampling,
lightcone=False,
part_factor=1.01,
pm_nsteps=nsteps, # Number of steps in the PM solver
tcola=True
))
mod.accumulateAdjoint(True)
chain @= mod
# Cosmological parameters
cpar = get_cosmopar(borg.getIniConfigurationFilename())
chain.setCosmoParams(cpar)
return chain
@borg.registerSamplerBuilder
def build_sampler(state: borg.likelihood.MarkovState, info: borg.likelihood.LikelihoodInfo, loop: borg.samplers.MainLoop):
"""
Builds the sampler and returns the main loop.
Args:
- state (borg.likelihood.MarkovState): The Markov state object to be used in the likelihood.
- info (borg.likelihood.LikelihoodInfo): The likelihood info object to be used in the likelihood.
- loop (borg.samplers.MainLoop): The main loop object to be used in the likelihood.
Returns:
borg.samplers.MainLoop: The main loop.
"""
# Here you can add cosmology sampling, model parameter sampling, etc.
# For now, we'll just sample the initial conditions so don't need to do anything
return []
@borg.registerLikelihoodBuilder
def build_likelihood(state: borg.likelihood.MarkovState, info: borg.likelihood.LikelihoodInfo) -> borg.likelihood.BaseLikelihood:
"""
Builds the likelihood and returns the likelihood object.
Args:
- state (borg.likelihood.MarkovState): The Markov state object to be used in the likelihood.
- info (borg.likelihood.LikelihoodInfo): The likelihood info object to be used in the likelihood.
Returns:
borg.likelihood.BaseLikelihood: The likelihood object.
"""
myprint("Building likelihood")
global likelihood
likelihood = MyLikelihood(chain, borg.getIniConfigurationFilename())
return likelihood