import aquila_borg as borg import numpy as np # Output stream management cons = borg.console() myprint = lambda x: cons.print_std(x) if type(x) == str else cons.print_std(repr(x)) def build_gravity_model(box: borg.forward.BoxModel, gravity='lpt', velmodel_name='LinearModel') -> borg.forward.BaseForwardModel: """ Builds the gravity model and returns the forward model chain. Args: - box (borg.forward.BoxModel): The input box model. - gravity (str, default='lpt'): The gravity model to use. - velmodel_name (str, default='LinearModel'): The velocity model to use. Returns: - chain (borg.forward.BaseForwardModel): The forward model. - fwd_vel (borg.forward.BaseForwardModel): The forward model for velocities. """ myprint(f"Building gravity model {gravity}") ai = 0.05 af = 1.0 supersampling = 2 forcesampling = 4 rsmooth = 4. # Mpc/h nsteps = 20 # Setup forward model chain = borg.forward.ChainForwardModel(box) chain.addModel(borg.forward.models.HermiticEnforcer(box)) # CLASS transfer function chain @= borg.forward.model_lib.M_PRIMORDIAL_AS(box) transfer_class = borg.forward.model_lib.M_TRANSFER_CLASS(box, opts=dict(a_transfer=1.0)) transfer_class.setModelParams({"extra_class_arguments":{'YHe':'0.24'}}) chain @= transfer_class if gravity == 'linear': raise NotImplementedError(gravity) elif gravity == 'lpt': mod = borg.forward.model_lib.M_LPT_CIC( box, opts=dict(a_initial=af, a_final=af, do_rsd=False, supersampling=supersampling, lightcone=False, part_factor=1.01,)) elif gravity == '2lpt': mod = borg.forward.model_lib.M_2LPT_CIC( box, opts=dict(a_initial=af, a_final=af, do_rsd=False, supersampling=supersampling, lightcone=False, part_factor=1.01,)) elif gravity == 'pm': mod = borg.forward.model_lib.M_PM_CIC( box, opts=dict(a_initial=af, a_final=af, do_rsd=False, supersampling=supersampling, part_factor=1.01, forcesampling=forcesampling, pm_start_z=1/ai - 1, pm_nsteps=nsteps, tcola=False)) elif gravity == 'cola': mod = borg.forward.model_lib.M_PM_CIC( box, opts=dict(a_initial=af, a_final=af, do_rsd=False, supersampling=supersampling, part_factor=1.01, forcesampling=forcesampling, pm_start_z=1/ai - 1, pm_nsteps=nsteps, tcola=True)) else: raise NotImplementedError(gravity) mod.accumulateAdjoint(True) chain @= mod # Cosmological parameters cpar = borg.cosmo.CosmologicalParameters() cpar.default() chain.setCosmoParams(cpar) # This is the forward model for velocity velmodel = getattr(borg.forward.velocity, velmodel_name) if velmodel_name == 'LinearModel': fwd_vel = velmodel(box, mod, af) elif velmodel_name == 'CICModel': fwd_vel = velmodel(box, mod, rsmooth) else: fwd_vel = velmodel(box, mod) return chain, fwd_vel def get_velocity(fwd: borg.forward.BaseForwardModel, fwd_vel: borg.forward.BaseForwardModel, s_hat: np.ndarray): N = s_hat.shape[0] output_density = np.zeros((N,N,N)) fwd.forwardModel_v2(s_hat) myprint("getting density") fwd.getDensityFinal(output_density) myprint("getting velocity") output_velocity = fwd_vel.getVelocityField() return output_velocity # Input box box_in = borg.forward.BoxModel() box_in.L = (500.0, 500.0, 500.0) box_in.N = (64, 64, 64) box_in.xmin = (-250.0, -250.0, -250.0) # Make some initial conditions s_hat = np.fft.rfftn(np.random.randn(*box_in.N)) / box_in.Ntot ** (0.5) s_real = np.fft.irfftn(s_hat, norm="ortho") for gravity in ['lpt', '2lpt', 'pm', 'cola']: print(f'\nGravity: {gravity}') fwd, fwd_vel = build_gravity_model(box_in, gravity=gravity, velmodel_name='LinearModel') v = get_velocity(fwd, fwd_vel, s_hat) print('Linear', v.mean(), v.std(), v.shape) np.save(f'vel_linear_{gravity}.npy', v) fwd, fwd_vel = build_gravity_model(box_in, gravity=gravity, velmodel_name='CICModel') v = get_velocity(fwd, fwd_vel, s_hat) print('CIC', v.mean(), v.std(), v.shape) np.save(f'vel_cic_{gravity}.npy', v)