* Switch to CB2

* Update for extrapolation

* Add 'nan' extrapolation

* Update nb

* Update submits

* Add Rmax to the models

* Update nb

* Add print statement

* Update script settings

* Update flow model to new method

* Update printing

* Update path

* Update so that it works

* Update nb

* Update submit

* Add Rmin for hollow bulk flows

* Update script

* Update script

* Update scripts back

* Update scripts back

* Fix normalization bug

* Update script

* pep8
This commit is contained in:
Richard Stiskalek 2024-07-25 11:48:37 +01:00 committed by GitHub
parent 73ffffb826
commit 8d49aa071b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 379 additions and 198 deletions

View file

@ -26,10 +26,11 @@ import csiborgtools
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from h5py import File
from mpi4py import MPI
from numba import jit
from taskmaster import work_delegation # noqa
from astropy.io import fits
from utils import get_nsims
@ -79,7 +80,7 @@ def get_los(catalogue_name, simname, comm):
RA = f["RA"][:]
dec = f["DEC"][:]
elif catalogue_name == "UPGLADE":
fname = "/mnt/users/rstiskalek/csiborgtools/data/upglade_z_0p05_all_PROCESSED.h5" # noqa
fname = "/mnt/users/rstiskalek/csiborgtools/data/upglade_all_z0p05_new_PROCESSED.h5" # noqa
with File(fname, 'r') as f:
RA = f["RA"][:]
dec = f["DEC"][:]
@ -242,6 +243,7 @@ def combine_from_simulations(catalogue_name, simname, nsims, outfolder,
f_out.create_dataset(f"rdist_{nsim}", data=f["rdist"][:])
f_out.create_dataset(f"density_{nsim}", data=f["density"][:])
f_out.create_dataset(f"velocity_{nsim}", data=f["velocity"][:])
f_out.create_dataset(f"rmax_{nsim}", data=f["rmax"][:])
# Remove the temporary file.
remove(fname)
@ -256,6 +258,30 @@ def combine_from_simulations(catalogue_name, simname, nsims, outfolder,
# Main interpolating function #
###############################################################################
@jit(nopython=True)
def find_index_of_first_nan(y):
for n in range(1, len(y)):
if np.isnan(y[n]):
return n
return None
def replace_nan_with_last_finite(x, y, apply_decay):
n = find_index_of_first_nan(y)
if n is None:
return y, x[-1]
y[n:] = y[n-1]
rmax = x[n-1]
if apply_decay:
# Optionally aply 1 / r decay
y[n:] *= rmax / x[n:]
return y, rmax
def interpolate_field(pos, simname, nsim, MAS, grid, dump_folder, rmax,
dr, smooth_scales, verbose=False):
@ -300,6 +326,14 @@ def interpolate_field(pos, simname, nsim, MAS, grid, dump_folder, rmax,
smooth_scales=smooth_scales, verbose=verbose,
interpolation_method="linear")
rmax_density = np.full((len(pos), len(smooth_scales)), np.nan)
for i in range(len(pos)):
for j in range(len(smooth_scales)):
y, current_rmax = replace_nan_with_last_finite(rdist, finterp[i, :, j], False) # noqa
finterp[i, :, j] = y
if current_rmax is not None:
rmax_density[i, j] = current_rmax
print(f"Writing temporary file `{fname_out}`.")
with File(fname_out, 'w') as f:
f.create_dataset("rdist", data=rdist)
@ -318,8 +352,20 @@ def interpolate_field(pos, simname, nsim, MAS, grid, dump_folder, rmax,
smooth_scales=smooth_scales, verbose=verbose,
interpolation_method="linear")
rmax_velocity = np.full((3, len(pos), len(smooth_scales)), np.nan)
for k in range(3):
for i in range(len(pos)):
for j in range(len(smooth_scales)):
y, current_rmax = replace_nan_with_last_finite(rdist, finterp[k][i, :, j], True) # noqa
finterp[k][i, :, j] = y
if current_rmax is not None:
rmax_velocity[k, i, j] = current_rmax
rmax_velocity = np.min(rmax_velocity, axis=0)
rmax = np.minimum(rmax_density, rmax_velocity)
with File(fname_out, 'a') as f:
f.create_dataset("velocity", data=finterp)
f.create_dataset("rmax", data=rmax)
###############################################################################
@ -339,8 +385,8 @@ if __name__ == "__main__":
parser.add_argument("--grid", type=int, help="Grid resolution.")
args = parser.parse_args()
rmax = 200
dr = 0.25
rmax = 300
dr = 0.5
smooth_scales = [0]
comm = MPI.COMM_WORLD

View file

@ -11,8 +11,7 @@ MAS="SPH"
grid=1024
# for catalogue in "LOSS" "Foundation" "Pantheon+" "2MTF" "SFI_gals"; do
for catalogue in "LOSS"; do
for catalogue in "UPGLADE"; do
# for catalogue in "Foundation"; do
pythoncm="$env $file --catalogue $catalogue --nsims $nsims --simname $simname --MAS $MAS --grid $grid"
if [ $on_login -eq 1 ]; then

View file

@ -100,6 +100,8 @@ def get_model(paths, get_model_kwargs, verbose=True):
ARGS.catalogue, fpath, paths,
ksmooth=ARGS.ksmooth)
print(f"\n{'Num. radial steps':<20} {len(loader.rdist)}\n", flush=True)
return csiborgtools.flow.get_model(loader, **get_model_kwargs)
@ -227,8 +229,6 @@ if __name__ == "__main__":
nsteps = 5000
nburn = 1000
zcmb_max = 0.06
sample_alpha = True
sample_beta = True
calculate_evidence = False
nchains_harmonic = 10
num_epochs = 30
@ -237,7 +237,6 @@ if __name__ == "__main__":
raise ValueError("The number of steps must be divisible by the number of chains.") # noqa
main_params = {"nsteps": nsteps, "nburn": nburn, "zcmb_max": zcmb_max,
"sample_alpha": sample_alpha, "sample_beta": sample_beta,
"calculate_evidence": calculate_evidence,
"nchains_harmonic": nchains_harmonic,
"num_epochs": num_epochs}
@ -247,10 +246,11 @@ if __name__ == "__main__":
"Vmono_min": -1000, "Vmono_max": 1000,
"alpha_min": -1.0, "alpha_max": 3.0,
"beta_min": -1.0, "beta_max": 3.0,
"sigma_v_min": 5.0, "sigma_v_max": 750.,
"sample_Vmono": True,
"sample_alpha": sample_alpha,
"sample_beta": sample_beta,
"sigma_v_min": 1.0, "sigma_v_max": 750.,
"sample_Vmono": False,
"sample_alpha": False,
"sample_beta": True,
"sample_sigma_v_ext": False,
}
print_variables(
calibration_hyperparams.keys(), calibration_hyperparams.values())
@ -280,5 +280,6 @@ if __name__ == "__main__":
get_model_kwargs = {"zcmb_max": zcmb_max}
model = get_model(paths, get_model_kwargs, )
run_model(model, nsteps, nburn, model_kwargs, out_folder, sample_beta,
calculate_evidence, nchains_harmonic, num_epochs, kwargs_print)
run_model(model, nsteps, nburn, model_kwargs, out_folder,
calibration_hyperparams["sample_beta"], calculate_evidence,
nchains_harmonic, num_epochs, kwargs_print)

View file

@ -19,9 +19,9 @@ fi
# Submit a job for each combination of simname, catalogue, ksim
# for simname in "Lilow2024" "CF4" "CF4gp" "csiborg1" "csiborg2_main" "csiborg2X"; do
for simname in "csiborg2X"; do
for simname in "Carrick2015"; do
# for simname in "csiborg1" "csiborg2_main" "csiborg2X"; do
for catalogue in "Pantheon+"; do
for catalogue in "Pantheon+_zSN"; do
# for catalogue in "2MTF"; do
# for ksim in 0 1 2; do
# for ksim in 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do

View file

@ -21,19 +21,21 @@ from tqdm import tqdm
if __name__ == "__main__":
paths = csiborgtools.read.Paths(**csiborgtools.paths_glamdring)
simname = "csiborg1"
simname = "csiborg2_random"
nsims = paths.get_ics(simname)
print(f"Number of simulations: {nsims}.")
fname_out = f"/mnt/users/rstiskalek/csiborgtools/data/halos_{simname}.hdf5"
fname_out = f"/mnt/users/rstiskalek/csiborgtools/data/random_halos_{simname}.hdf5" # noqa
print(f"Writing to `{fname_out}`.")
with File(fname_out, 'w') as f:
for nsim in tqdm(nsims, desc="Simulations"):
grp = f.create_group(f"sim_{nsim}")
cat = csiborgtools.read.CSiBORG1Catalogue(nsim, paths)
# cat = csiborgtools.read.CSiBORG1Catalogue(nsim, paths)
cat = csiborgtools.read.CSiBORG2Catalogue(
nsim, 99, "random", paths, )
grp["pos"] = cat["cartesian_pos"]
grp["totmass"] = cat["totmass"]

View file

@ -23,6 +23,7 @@ from os.path import join
import csiborgtools
import numpy as np
from csiborgtools import fprint
from h5py import File
from mpi4py import MPI
from taskmaster import work_delegation # noqa
@ -35,27 +36,35 @@ def t():
return datetime.now().strftime("%H:%M:%S")
def load_calibration(catalogue, simname, nsim, ksmooth, verbose=False):
def load_calibration(catalogue, simname, ksmooth, sample_beta,
verbose=False):
"""Load the pre-computed calibration samples."""
fname = f"/mnt/extraspace/rstiskalek/csiborg_postprocessing/peculiar_velocity/flow_samples_{catalogue}_{simname}_smooth_{ksmooth}.hdf5" # noqa
keys = ["Vext_x", "Vext_y", "Vext_z", "alpha", "beta", "sigma_v"]
fname = f"/mnt/extraspace/rstiskalek/csiborg_postprocessing/peculiar_velocity/samples_{simname}_{catalogue}_ksmooth{ksmooth}.hdf5" # noqa
if sample_beta:
fname = fname.replace(".hdf5", "_sample_beta.hdf5")
keys = ["Vext", "sigma_v", "alpha", "beta"]
calibration_samples = {}
with File(fname, 'r') as f:
for key in keys:
for n, key in enumerate(keys):
# In case alpha wasn't sampled just set to 1
if key == "alpha" and "alpha" not in f["samples"].keys():
calibration_samples[key] = np.full_like(
calibration_samples["sigma_v"], 1.0)
continue
# NOTE: here the posterior samples are down-sampled
calibration_samples[key] = f[f"sim_{nsim}/{key}"][:][::10]
calibration_samples[key] = f[f"samples/{key}"][:][::10]
if verbose:
k = list(calibration_samples.keys())[0]
nsamples = len(calibration_samples[k])
print(f"{t()}: found {nsamples} calibration posterior samples.",
flush=True)
if n == 0:
num_samples_original = len(f[f"samples/{key}"])
num_samples_final = len(calibration_samples[key])
fprint(f"downsampling calibration samples from {num_samples_original} to {num_samples_final}.", verbose=verbose) # noqa
return calibration_samples
def main(loader, model, indxs, fdir, fname, num_split, verbose):
def main(loader, nsim, model, indxs, fdir, fname, num_split, verbose):
out = np.full(
len(indxs), np.nan,
dtype=[("mean_zcosmo", float), ("std_zcosmo", float)])
@ -65,7 +74,7 @@ def main(loader, model, indxs, fdir, fname, num_split, verbose):
disable=not verbose)):
x, y = model.posterior_zcosmo(
loader.cat["zcmb"][n], loader.cat["RA"][n], loader.cat["DEC"][n],
loader.los_density[n], loader.los_radial_velocity[n],
loader.los_density[nsim, n], loader.los_radial_velocity[nsim, n],
extra_sigma_v=loader.cat["e_zcmb"][n] * SPEED_OF_LIGHT,
verbose=False)
@ -98,7 +107,7 @@ if __name__ == "__main__":
# Galaxy sample parameters
catalogue = "UPGLADE"
fpath_data = "/mnt/users/rstiskalek/csiborgtools/data/upglade_z_0p05_all_PROCESSED.h5" # noqa
fpath_data = "/mnt/users/rstiskalek/csiborgtools/data/upglade_all_z0p05_new_PROCESSED.h5" # noqa
# Number of splits for MPI
nsplits = 1000
@ -112,12 +121,14 @@ if __name__ == "__main__":
simname, nsim, catalogue, fpath_data, paths, ksmooth=ksmooth,
verbose=rank == 0)
calibration_samples = load_calibration(
catalogue_calibration, simname, nsim, ksmooth, verbose=rank == 0)
catalogue_calibration, simname, ksmooth, sample_beta=True,
verbose=rank == 0)
model = csiborgtools.flow.Observed2CosmologicalRedshift(
calibration_samples, loader.rdist, loader._Omega_m)
if rank == 0:
print(f"{t()}: the catalogue size is {loader.cat['zcmb'].size}.")
print(f"{t()}: loaded calibration samples and model.", flush=True)
fprint(f"catalogue size is {loader.cat['zcmb'].size}.", verbose=rank == 0)
fprint("loaded calibration samples and model.", verbose=rank == 0)
# Decide how to split up the job
if rank == 0:
@ -131,7 +142,8 @@ if __name__ == "__main__":
# Process all splits with MPI, the rank 0 delegates the jobs.
def main_wrapper(n):
main(loader, model, split_indxs[n], fdir, fname, n, verbose=size == 1)
main(loader, nsim, model, split_indxs[n], fdir, fname, n,
verbose=size == 1)
comm.Barrier()
work_delegation(

View file

@ -1,6 +1,6 @@
nthreads=${1}
on_login=${2}
memory=4
memory=12
queue="redwood"
env="/mnt/zfsusers/rstiskalek/csiborgtools/venv_csiborg/bin/python"
file="post_upglade.py"

View file

@ -15,6 +15,9 @@
"""
A script to calculate the bulk flow in Quijote simulations from either
particles or FoF haloes and to also save the resulting smaller halo catalogues.
If `Rmin > 0` the bulk flows computed from projected radial velocities are
wrong, but the 3D volume average bulk flows are still correct.
"""
from datetime import datetime
from os.path import join
@ -70,7 +73,7 @@ def volume_bulk_flow(rdist, mass, vel, distances):
###############################################################################
def main(nsim, folder, fname_basis, Rmax, subtract_observer_velocity,
def main(nsim, folder, fname_basis, Rmin, Rmax, subtract_observer_velocity,
verbose=True):
boxsize = csiborgtools.simname2boxsize("quijote")
observers = csiborgtools.read.fiducial_observers(boxsize, Rmax)
@ -100,6 +103,11 @@ def main(nsim, folder, fname_basis, Rmax, subtract_observer_velocity,
return_distance=True, sort_results=True)
rdist_part, indxs = rdist_part[0], indxs[0]
# And only the ones that are above Rmin
mask = rdist_part > Rmin
rdist_part = rdist_part[mask]
indxs = indxs[mask]
part_pos_current = part_pos[indxs] - observers[i]
part_vel_current = part_vel[indxs]
# Quijote particle masses are all equal
@ -110,13 +118,16 @@ def main(nsim, folder, fname_basis, Rmax, subtract_observer_velocity,
np.asarray(observers[i]).reshape(1, -1), Rmax,
return_distance=True, sort_results=True)
rdist_halo, indxs = rdist_halo[0], indxs[0]
mask = rdist_halo > Rmin
rdist_halo = rdist_halo[mask]
indxs = indxs[mask]
halo_pos_current = halo_pos[indxs] - observers[i]
halo_vel_current = halo_vel[indxs]
halo_mass_current = halo_mass[indxs]
# Subtract the observer velocity
rscale = 0.5 # Mpc / h
rscale = 2.0 # Mpc / h
weights = np.exp(-0.5 * (rdist_part / rscale)**2)
obs_vel_x = np.average(part_vel_current[:, 0], weights=weights)
obs_vel_y = np.average(part_vel_current[:, 1], weights=weights)
@ -183,6 +194,7 @@ def main(nsim, folder, fname_basis, Rmax, subtract_observer_velocity,
if __name__ == "__main__":
Rmin = 0
Rmax = 150
subtract_observer_velocity = True
folder = "/mnt/extraspace/rstiskalek/quijote/BulkFlow_fiducial"
@ -195,7 +207,7 @@ if __name__ == "__main__":
nsims = list(paths.get_ics("quijote"))
def main_wrapper(nsim):
main(nsim, folder, fname_basis, Rmax, subtract_observer_velocity,
main(nsim, folder, fname_basis, Rmin, Rmax, subtract_observer_velocity,
verbose=rank == 0)
if rank == 0:

View file

@ -1,4 +1,4 @@
nthreads=20
nthreads=12
memory=24
on_login=0
queue="berg"