Add simple distance flow model (#114)

* Add imports

* Add field LOS paths

* Add basic flow model

* Edit script

* Add nb

* Add nb

* Update nb

* Add some docs

* Add RA reading

* Add imports

* Updates to the flow model

* Update script

* Bring back A2

* Update imports

* Update imports

* Add Carrick to ICs

* Add Carrick boxsize

* Add Carrick and fix minor bugs

* Add Carrick box

* Update script

* Edit imports

* Add fixed flow!

* Update omega_m and add it

* Update nb

* Update nb

* Update nb

* Remove old print statements

* Update params

* Add thinning of chains

* Add import

* Add flow validation script

* Add submit script

* Add ksmooth

* Update nb

* Update params

* Update script

* Update string

* Move where distributions are defined

* Add density bias parameter

* Add lognorm mean

* Update scripts

* Update script
This commit is contained in:
Richard Stiskalek 2024-03-08 10:44:19 +00:00 committed by GitHub
parent fb93f85543
commit a65e3cb15b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 1762 additions and 60 deletions

View file

@ -20,22 +20,24 @@ from datetime import datetime
from gc import collect
from os import makedirs, remove, rmdir
from os.path import exists, join
from warnings import warn
import csiborgtools
import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from h5py import File
from mpi4py import MPI
from taskmaster import work_delegation
from utils import get_nsims
###############################################################################
# I/O functions #
###############################################################################
def get_los(catalogue_name, comm):
def get_los(catalogue_name, simname, comm):
"""
Get the line of sight RA/dec coordinates for the given catalogue.
@ -43,6 +45,10 @@ def get_los(catalogue_name, comm):
----------
catalogue_name : str
Catalogue name.
simname : str
Simulation name.
comm : mpi4py.MPI.Comm
MPI communicator.
Returns
-------
@ -50,16 +56,42 @@ def get_los(catalogue_name, comm):
RA/dec coordinates of the line of sight.
"""
if comm.Get_rank() == 0:
pv_supranta_folder = "/mnt/extraspace/rstiskalek/catalogs/PV_Supranta"
folder = "/mnt/extraspace/rstiskalek/catalogs"
if catalogue_name == "A2":
with File(join(pv_supranta_folder, "A2.h5"), 'r') as f:
if catalogue_name == "LOSS" or catalogue_name == "Foundation":
fpath = join(folder, "PV_compilation_Supranta2019.hdf5")
with File(fpath, 'r') as f:
grp = f[catalogue_name]
RA = grp["RA"][:]
dec = grp["DEC"][:]
elif catalogue_name == "A2":
fpath = join(folder, "A2.h5")
with File(fpath, 'r') as f:
RA = f["RA"][:]
dec = f["DEC"][:]
elif "csiborg1" in catalogue_name:
nsim = int(catalogue_name.split("_")[-1])
cat = csiborgtools.read.CSiBORG1Catalogue(
nsim, bounds={"totmass": (1e13, None)})
seed = 42
gen = np.random.default_rng(seed)
mask = gen.choice(len(cat), size=100, replace=False)
sph_pos = cat["spherical_pos"]
RA = sph_pos[mask, 1]
dec = sph_pos[mask, 2]
else:
raise ValueError(f"Unknown field name: `{catalogue_name}`.")
pos = np.vstack((RA, dec)).T
# The Carrick+2015 is in galactic coordinates, so we need to convert
# the RA/dec to galactic coordinates.
if simname == "Carrick2015":
c = SkyCoord(ra=RA*u.degree, dec=dec*u.degree, frame='icrs')
pos = np.vstack((c.galactic.l, c.galactic.b)).T
else:
pos = np.vstack((RA, dec)).T
else:
pos = None
@ -90,6 +122,17 @@ def get_field(simname, nsim, kind, MAS, grid):
# Open the field reader.
if simname == "csiborg1":
field_reader = csiborgtools.read.CSiBORG1Field(nsim)
elif simname == "Carrick2015":
folder = "/mnt/extraspace/rstiskalek/catalogs"
warn(f"Using local paths from `{folder}`.", RuntimeWarning)
if kind == "density":
fpath = join(folder, "twompp_density_carrick2015.npy")
return np.load(fpath).astype(np.float32)
elif kind == "velocity":
fpath = join(folder, "twompp_velocity_carrick2015.npy")
return np.load(fpath).astype(np.float32)
else:
raise ValueError(f"Unknown field kind: `{kind}`.")
else:
raise ValueError(f"Unknown simulation name: `{simname}`.")
@ -230,8 +273,8 @@ if __name__ == "__main__":
args = parser.parse_args()
rmax = 200
dr = 0.1
smooth_scales = None
dr = 0.5
smooth_scales = [0, 2, 4, 6]
comm = MPI.COMM_WORLD
paths = csiborgtools.read.Paths(**csiborgtools.paths_glamdring)
@ -248,8 +291,8 @@ if __name__ == "__main__":
dump_folder = None
dump_folder = comm.bcast(dump_folder, root=0)
# Get the line of sight RA/dec coordinates.
pos = get_los(args.catalogue, comm)
# Get the line of sight sky coordinates.
pos = get_los(args.catalogue, args.simname, comm)
def main(nsim):
interpolate_field(pos, args.simname, nsim, args.MAS, args.grid,

View file

@ -1,4 +1,4 @@
nthreads=1
nthreads=11
memory=64
on_login=${1}
queue="berg"
@ -6,7 +6,8 @@ env="/mnt/users/rstiskalek/csiborgtools/venv_csiborg/bin/python"
file="field_los.py"
catalogue="A2"
nsims="7444"
# catalogue="csiborg1_9844"
nsims="-1"
simname="csiborg1"
MAS="SPH"
grid=1024

View file

@ -134,8 +134,6 @@ def open_galaxy_positions(survey_name, comm, scatter=None):
if scatter < 0:
raise ValueError("Scatter must be positive.")
if scatter > 0:
print(f"Adding scatter of {scatter} Mpc / h.",
flush=True)
pos = scatter_along_radial_direction(pos, scatter,
boxsize)
@ -186,7 +184,6 @@ def evaluate_field(field, pos, boxsize, smooth_scales, verbose=True):
field, scale * mpc2box, boxsize=1, make_copy=True)
else:
field_smoothed = numpy.copy(field)
print("Going to evaluate the field....")
val[:, i] = csiborgtools.field.evaluate_sky(
field_smoothed, pos=pos, mpc2box=mpc2box)

View file

@ -1,5 +1,5 @@
nthreads=1
memory=32
nthreads=11
memory=64
on_login=${1}
queue="berg"
env="/mnt/zfsusers/rstiskalek/csiborgtools/venv_csiborg/bin/python"
@ -7,11 +7,11 @@ file="field_sample.py"
nsims="-1"
simname="TNG300-1"
survey="TNG300-1"
simname="csiborg1"
survey="SDSS"
smooth_scales="0 2 4 8 16"
kind="density"
MAS="PCS"
MAS="SPH"
grid=1024
scatter=0

211
scripts/flow_validation.py Normal file
View file

@ -0,0 +1,211 @@
# Copyright (C) 2024 Richard Stiskalek
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Script to run the PV validation model on various catalogues and simulations.
The script is MPI parallelized over the IC realizations.
"""
from argparse import ArgumentParser
from datetime import datetime
from os import makedirs, remove, rmdir
from os.path import exists, join
import csiborgtools
import jax
import numpy as np
from h5py import File
from mpi4py import MPI
from numpyro.infer import MCMC, NUTS
from taskmaster import work_delegation # noqa
def get_model(args, nsim):
    """
    Assemble the NumPyro PV validation model for one IC realisation.

    The function relies on the module-level `paths` object that is created in
    the command line section of this script.

    Parameters
    ----------
    args : argparse.Namespace
        Command line arguments. The attributes `catalogue`, `simname` and
        `ksmooth` are read.
    nsim : int
        Simulation index, used to select along the second axis of the line of
        sight density and radial velocity arrays of the loader.

    Returns
    -------
    numpyro.Primitive
        Whatever `csiborgtools.flow.SD_PV_validation_model` returns.

    Raises
    ------
    NotImplementedError
        If the catalogue is "LOSS" or "Foundation".
    ValueError
        If the catalogue name is not recognised.
    """
    catalogue = args.catalogue

    # Only the A2 catalogue is wired up so far, reject everything else early.
    if catalogue in ("LOSS", "Foundation"):
        raise NotImplementedError("To be implemented..")
    if catalogue != "A2":
        raise ValueError(f"Unknown catalogue: `{catalogue}`.")

    fpath = join("/mnt/extraspace/rstiskalek/catalogs/", "A2.h5")

    loader = csiborgtools.flow.DataLoader(
        args.simname, catalogue, fpath, paths, ksmooth=args.ksmooth)
    Omega_m = csiborgtools.simname2Omega_m(args.simname)

    # Line of sight fields of the requested realisation.
    los_overdensity = loader.los_density[:, nsim, :]
    los_velocity = loader.los_radial_velocity[:, nsim, :]

    # Per-object quantities of the catalogue.
    cat = loader.cat
    RA, dec = cat["RA"], cat["DEC"]
    z_obs = cat["z_obs"]
    r_hMpc, e_r_hMpc = cat["r_hMpc"], cat["e_rhMpc"]

    return csiborgtools.flow.SD_PV_validation_model(
        los_overdensity, los_velocity, RA, dec, z_obs, r_hMpc, e_r_hMpc,
        loader.rdist, Omega_m)
def run_model(model, nsteps, nchains, nsim, dump_folder, show_progress=True):
    """
    Sample the NumPyro model with NUTS and dump the thinned samples to a
    temporary `.npz` file in `dump_folder`.

    Parameters
    ----------
    model : jax.numpyro.Primitive
        Model to be sampled.
    nsteps : int
        Total number of steps. Half of them (integer division) are used for
        warm-up and the other half for sampling.
    nchains : int
        Number of chains, which are run sequentially.
    nsim : int
        Simulation index, used in the printed summary and the file name.
    dump_folder : str
        Folder where the temporary file `samples_{nsim}.npz` is written.
    show_progress : bool
        Whether to show the progress bar and print the MCMC summary.

    Returns
    -------
    None
    """
    half = nsteps // 2
    sampler = MCMC(NUTS(model), num_warmup=half, num_samples=half,
                   chain_method="sequential", num_chains=nchains,
                   progress_bar=show_progress)
    # The random key is fixed, so every call starts from the same seed.
    sampler.run(jax.random.PRNGKey(42))

    if show_progress:
        print(f"Summary of the MCMC run of simulation indexed {nsim}:")
        sampler.print_summary()

    thinned = csiborgtools.thin_samples_by_acl(sampler.get_samples())

    # Store the samples in the temporary folder, one file per simulation.
    np.savez(join(dump_folder, f"samples_{nsim}.npz"), **thinned)
def combine_from_simulations(catalogue_name, simname, nsims, outfolder,
                             dumpfolder, ksmooth):
    """
    Combine the results from individual simulations into a single HDF5 file.

    Each temporary file `samples_{nsim}.npz` in `dumpfolder` is copied into a
    group `sim_{nsim}` of the output file, with one dataset per array stored in
    the `.npz` archive. The temporary files and the (then empty) dumping folder
    are removed afterwards. Any previous output file of the same name is
    replaced.

    Parameters
    ----------
    catalogue_name : str
        Catalogue name.
    simname : str
        Simulation name.
    nsims : list
        List of IC realisations.
    outfolder : str
        Output folder.
    dumpfolder : str
        Dumping folder where the temporary files are stored.
    ksmooth : int
        Smoothing index.

    Returns
    -------
    None
    """
    fname_out = join(
        outfolder,
        f"flow_samples_{catalogue_name}_{simname}_smooth_{ksmooth}.hdf5")
    print(f"Combining results from invidivual simulations to `{fname_out}`.")

    # Start from a clean slate so that stale groups never leak into the output.
    if exists(fname_out):
        remove(fname_out)

    # Open the output once instead of re-opening it for every simulation.
    with File(fname_out, 'w') as fout:
        for nsim in nsims:
            fname = join(dumpfolder, f"samples_{nsim}.npz")

            # `np.load` on an archive keeps the file open; close it before the
            # temporary file is deleted.
            with np.load(fname) as data:
                grp = fout.create_group(f"sim_{nsim}")
                for key in data.files:
                    grp.create_dataset(key, data=data[key])

            # Remove the temporary file.
            remove(fname)

    # Remove the dumping folder, which must be empty at this point.
    rmdir(dumpfolder)
    print("Finished combining results.")
###############################################################################
# Command line interface #
###############################################################################
if __name__ == "__main__":
    # All three command line options are mandatory.
    parser = ArgumentParser()
    for flag, flag_type, flag_help in (
            ("--simname", str, "Simulation name."),
            ("--catalogue", str, "PV catalogue."),
            ("--ksmooth", int, "Smoothing index."),
            ):
        parser.add_argument(flag, type=flag_type, required=True,
                            help=flag_help)
    args = parser.parse_args()

    comm = MPI.COMM_WORLD
    rank, size = comm.Get_rank(), comm.Get_size()

    out_folder = "/mnt/extraspace/rstiskalek/csiborg_postprocessing/peculiar_velocity"  # noqa
    # `paths` is also read as a module-level name inside `get_model`.
    paths = csiborgtools.read.Paths(**csiborgtools.paths_glamdring)
    nsims = paths.get_ics(args.simname)

    nsteps = 5000
    nchains = 1

    # Only the root rank creates the time-stamped dumping folder, its name is
    # then shared with all other ranks.
    dump_folder = None
    if rank == 0:
        timestamp = str(datetime.now()).replace(" ", "_")
        dump_folder = join(out_folder, f"temp_{timestamp}")
        print(f"Creating folder `{dump_folder}`.")
        makedirs(dump_folder)
    dump_folder = comm.bcast(dump_folder, root=0)

    def main(nsim):
        # Progress output is only shown when running on a single process.
        run_model(get_model(args, nsim), nsteps, nchains, nsim, dump_folder,
                  show_progress=(size == 1))

    work_delegation(main, nsims, comm, master_verbose=True)
    comm.Barrier()

    # Once every rank is done, the root rank merges the temporary files.
    if rank == 0:
        combine_from_simulations(args.catalogue, args.simname, nsims,
                                 out_folder, dump_folder, args.ksmooth)

23
scripts/flow_validation.sh Executable file
View file

@ -0,0 +1,23 @@
# Launcher for `flow_validation.py`.
#
# Usage: flow_validation.sh <on_login> [nthreads]
#   on_login : 1 runs the Python command directly, any other integer submits
#              it with `addqueue`.
#   nthreads : value passed to `addqueue -n`; required when submitting.
memory=4
on_login=${1}
nthreads=${2}
queue="berg"
env="/mnt/users/rstiskalek/csiborgtools/venv_csiborg/bin/python"
file="flow_validation.py"

catalogue="A2"
simname="Carrick2015"
ksmooth=2

pythoncm="$env $file --catalogue $catalogue --simname $simname --ksmooth $ksmooth"

# Without the first argument the numeric test below would error out and fall
# through to the submission branch, so stop early with a usage message.
if [ -z "$on_login" ]; then
    echo "Usage: $0 <on_login: 0 or 1> [nthreads]" >&2
    exit 1
fi

if [ "$on_login" -eq 1 ]; then
    echo "$pythoncm"
    # Intentionally unquoted: the command string must be split into words.
    $pythoncm
else
    # The queue submission needs the thread count for `-n`.
    if [ -z "$nthreads" ]; then
        echo "nthreads (second argument) is required when submitting to the queue." >&2
        exit 1
    fi

    cm="addqueue -q $queue -n $nthreads -m $memory $pythoncm"
    echo "Submitting:"
    echo "$cm"
    echo
    eval "$cm"
fi