borg_velocity/tests/velocity_bug.py
2025-02-05 15:53:57 +01:00

144 lines
4.7 KiB
Python

import aquila_borg as borg
import numpy as np
# Output stream management
cons = borg.console()
myprint = lambda x: cons.print_std(x) if type(x) == str else cons.print_std(repr(x))
def build_gravity_model(box: borg.forward.BoxModel, gravity='lpt', velmodel_name='LinearModel') -> borg.forward.BaseForwardModel:
"""
Builds the gravity model and returns the forward model chain.
Args:
- box (borg.forward.BoxModel): The input box model.
- gravity (str, default='lpt'): The gravity model to use.
- velmodel_name (str, default='LinearModel'): The velocity model to use.
Returns:
- chain (borg.forward.BaseForwardModel): The forward model.
- fwd_vel (borg.forward.BaseForwardModel): The forward model for velocities.
"""
myprint(f"Building gravity model {gravity}")
ai = 0.05
af = 1.0
supersampling = 2
forcesampling = 4
rsmooth = 4. # Mpc/h
nsteps = 20
# Setup forward model
chain = borg.forward.ChainForwardModel(box)
chain.addModel(borg.forward.models.HermiticEnforcer(box))
# CLASS transfer function
chain @= borg.forward.model_lib.M_PRIMORDIAL_AS(box)
transfer_class = borg.forward.model_lib.M_TRANSFER_CLASS(box, opts=dict(a_transfer=1.0))
transfer_class.setModelParams({"extra_class_arguments":{'YHe':'0.24'}})
chain @= transfer_class
if gravity == 'linear':
raise NotImplementedError(gravity)
elif gravity == 'lpt':
mod = borg.forward.model_lib.M_LPT_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
do_rsd=False,
supersampling=supersampling,
lightcone=False,
part_factor=1.01,))
elif gravity == '2lpt':
mod = borg.forward.model_lib.M_2LPT_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
do_rsd=False,
supersampling=supersampling,
lightcone=False,
part_factor=1.01,))
elif gravity == 'pm':
mod = borg.forward.model_lib.M_PM_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
do_rsd=False,
supersampling=supersampling,
part_factor=1.01,
forcesampling=forcesampling,
pm_start_z=1/ai - 1,
pm_nsteps=nsteps,
tcola=False))
elif gravity == 'cola':
mod = borg.forward.model_lib.M_PM_CIC(
box,
opts=dict(a_initial=af,
a_final=af,
do_rsd=False,
supersampling=supersampling,
part_factor=1.01,
forcesampling=forcesampling,
pm_start_z=1/ai - 1,
pm_nsteps=nsteps,
tcola=True))
else:
raise NotImplementedError(gravity)
mod.accumulateAdjoint(True)
chain @= mod
# Cosmological parameters
cpar = borg.cosmo.CosmologicalParameters()
cpar.default()
chain.setCosmoParams(cpar)
# This is the forward model for velocity
velmodel = getattr(borg.forward.velocity, velmodel_name)
if velmodel_name == 'LinearModel':
fwd_vel = velmodel(box, mod, af)
elif velmodel_name == 'CICModel':
fwd_vel = velmodel(box, mod, rsmooth)
else:
fwd_vel = velmodel(box, mod)
return chain, fwd_vel
def get_velocity(fwd: borg.forward.BaseForwardModel, fwd_vel: borg.forward.BaseForwardModel, s_hat: np.ndarray):
N = s_hat.shape[0]
output_density = np.zeros((N,N,N))
fwd.forwardModel_v2(s_hat)
myprint("getting density")
fwd.getDensityFinal(output_density)
myprint("getting velocity")
output_velocity = fwd_vel.getVelocityField()
return output_velocity
# Input box
box_in = borg.forward.BoxModel()
box_in.L = (500.0, 500.0, 500.0)
box_in.N = (64, 64, 64)
box_in.xmin = (-250.0, -250.0, -250.0)
# Make some initial conditions
s_hat = np.fft.rfftn(np.random.randn(*box_in.N)) / box_in.Ntot ** (0.5)
s_real = np.fft.irfftn(s_hat, norm="ortho")
for gravity in ['lpt', '2lpt', 'pm', 'cola']:
print(f'\nGravity: {gravity}')
fwd, fwd_vel = build_gravity_model(box_in, gravity=gravity, velmodel_name='LinearModel')
v = get_velocity(fwd, fwd_vel, s_hat)
print('Linear', v.mean(), v.std(), v.shape)
np.save(f'vel_linear_{gravity}.npy', v)
fwd, fwd_vel = build_gravity_model(box_in, gravity=gravity, velmodel_name='CICModel')
v = get_velocity(fwd, fwd_vel, s_hat)
print('CIC', v.mean(), v.std(), v.shape)
np.save(f'vel_cic_{gravity}.npy', v)