Add marginalization over boxes (#131)

* Parallelize over simulations

* Update docs

* Update dependency

* Update imports

* Add additional dependencies

* Update .gitignore

* Update README

* Simplify numpyro GOF

* Speed up GOF

* Deepcopy samples

* Update scripts

* Add GPU acceleration

* Select boxes

* Update script

* Optionally sample beta

* Fix old code

* Simplify code

* Start saving log posterior

* Start popping log_likelihood

* Add imports

* Add converting samples

* Fix script name

* Add evidence with harmonic

* Remove comment

* Update imports

* Update imports so that Pylians is not required

* Stop requiring Pylians to be installed

* Update submission scripts for loops

* Update nb

* Update nb

* Add Manticore boxes

* Add verbosity flag

* Add bulk flow

* Update script

* Update nb

* Update normalization

* Update submit

* Update nb
Richard Stiskalek 2024-06-26 10:43:26 +01:00 committed by GitHub
parent ffaf92cd4b
commit ce55a2b47e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 1436 additions and 1290 deletions

.gitignore

@@ -1,6 +1,7 @@
# Python virtual environments
venv/
venv_csiborg/
venv_gpu_csiborgtools/
# Compiled Python files
*.pyc


@@ -2,6 +2,11 @@
Tools for analysing the suite of Constrained Simulations in BORG (CSiBORG). The interface is designed to work with the following suites of simulations: *CSiBORG1* (dark matter-only RAMSES), *CSiBORG2* (dark matter-only Gadget4) and *Quijote* (dark matter-only Gadget2); however, with little effort it can support other simulations as well.
## TODO
- [ ] In flow models, test in a script that the parallelization is indeed working.
- [ ] Extend the parallelization to supernovae/simple distances.
## Ongoing projects
### Data to calculate


@@ -20,8 +20,9 @@ from .utils import (center_of_mass, delta2ncells, number_counts,
binned_statistic, cosine_similarity, fprint, # noqa
hms_to_degrees, dms_to_degrees, great_circle_distance, # noqa
radec_to_cartesian, cartesian_to_radec, # noqa
thin_samples_by_acl, numpyro_gof, radec_to_galactic, # noqa
heliocentric_to_cmb, calculate_acl) # noqa
thin_samples_by_acl, BIC_AIC, radec_to_galactic, # noqa
heliocentric_to_cmb, calculate_acl, harmonic_evidence, # noqa
dict_samples_to_array) # noqa
from .params import (paths_glamdring, simname2boxsize, simname2Omega_m, # noqa
snap2redshift) # noqa


@@ -12,16 +12,15 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from warnings import warn
from warnings import warn # noqa
from csiborgtools.clustering.knn import kNN_1DCDF # noqa
from csiborgtools.clustering.utils import (BaseRVS, RVSinbox, # noqa
RVSinsphere, RVSonsphere,
normalised_marks)
from csiborgtools.clustering.knn import kNN_1DCDF # noqa
from csiborgtools.clustering.utils import ( # noqa
BaseRVS, RVSinbox, RVSinsphere, RVSonsphere, normalised_marks) # noqa
try:
import Corrfunc # noqa
from .tpcf import Mock2PCF # noqa
import Corrfunc # noqa
from .tpcf import Mock2PCF # noqa
except ImportError:
warn("`Corrfunc` not installed. 2PCF modules will not be available .") # noqa
warn("`Corrfunc` not installed. 2PCF modules will not be available.",
UserWarning) # noqa


@@ -12,15 +12,25 @@
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from .density import (DensityField, PotentialField, TidalTensorField, # noqa
VelocityField, radial_velocity, power_spectrum, # noqa
overdensity_field) # noqa
try:
import MAS_library as MASL # noqa
import Pk_library as PKL # noqa
from .density import (DensityField, PotentialField, TidalTensorField, # noqa
VelocityField, radial_velocity, power_spectrum, # noqa
overdensity_field) # noqa
from .interp import (evaluate_cartesian_cic, evaluate_sky, evaluate_los, # noqa
field2rsp, fill_outside, make_sky, # noqa
observer_peculiar_velocity, smoothen_field, # noqa
field_at_distance) # noqa
except ImportError:
from warnings import warn
warn("`MAS_library` and `Pk_library` not installed. `density` and "
"`interp` related modules are not available. "
"Please install `Pylians`.", UserWarning)
from .enclosed_mass import (particles_enclosed_mass, # noqa
particles_enclosed_momentum, field_enclosed_mass, # noqa
bulkflow_peery2018) # noqa
from .interp import (evaluate_cartesian_cic, evaluate_sky, evaluate_los, # noqa
field2rsp, fill_outside, make_sky, # noqa
observer_peculiar_velocity, smoothen_field, # noqa
field_at_distance) # noqa
from .corr import bayesian_bootstrap_correlation # noqa
from .utils import nside2radec # noqa
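
Both this subpackage and the clustering subpackage now wrap their heavy optional dependencies (Pylians, Corrfunc) in import guards, so the rest of csiborgtools stays importable without them. A minimal sketch of the same pattern, using a hypothetical module name for illustration:

from warnings import warn

try:
    import heavy_dep  # hypothetical optional dependency
    HAVE_HEAVY_DEP = True
except ImportError:
    HAVE_HEAVY_DEP = False
    warn("`heavy_dep` not installed. Dependent modules will not be "
         "available.", UserWarning)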


@@ -102,7 +102,7 @@ def _field_enclosed_mass(field, rmax, boxsize):
return mass * cell_volume, volume * cell_volume
def field_enclosed_mass(field, distances, boxsize):
def field_enclosed_mass(field, distances, boxsize, verbose=True):
"""
Calculate the approximate enclosed mass within a given radius from a
density field, counts the mass in cells and volume of cells whose
@@ -116,6 +116,8 @@ def field_enclosed_mass(field, distances, boxsize):
Radii to calculate the enclosed mass at in `Mpc / h`.
boxsize : float
Box size in `Mpc / h`.
verbose : bool
Verbosity flag.
Returns
-------
@@ -127,7 +129,7 @@ def field_enclosed_mass(field, distances, boxsize):
enclosed_mass = np.zeros_like(distances)
enclosed_volume = np.zeros_like(distances)
for i, dist in enumerate(tqdm(distances)):
for i, dist in enumerate(tqdm(distances, disable=not verbose)):
enclosed_mass[i], enclosed_volume[i] = _field_enclosed_mass(
field, dist, boxsize)
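
A usage sketch of the new `verbose` flag, assuming a toy uniform density field (all values below are illustrative); `verbose=False` silences the internal tqdm progress bar:

import numpy as np
import csiborgtools

boxsize = 677.7  # box size in Mpc / h (toy value)
field = np.ones((128, 128, 128), dtype=np.float32)  # toy density field
distances = np.linspace(0, boxsize / 2, 101)[1:]

enclosed_mass, enclosed_volume = csiborgtools.field.field_enclosed_mass(
    field, distances, boxsize, verbose=False)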


@@ -14,9 +14,6 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from .flow_model import (DataLoader, radial_velocity_los, dist2redshift, # noqa
dist2distmodulus, predict_zobs, project_Vext, # noqa
SD_PV_validation_model, SN_PV_validation_model, # noqa
TF_PV_validation_model, radec_to_galactic, # noqa
sample_prior, make_loss, get_model, # noqa
optimize_model_with_jackknife, distmodulus2dist, # noqa
PV_validation_model, get_model, distmodulus2dist, # noqa
Observed2CosmologicalRedshift, # noqa
stack_pzosmo_over_realizations) # noqa

File diff suppressed because it is too large


@@ -26,8 +26,6 @@ import numpy as np
from astropy import units as u
from astropy.coordinates import SkyCoord
from numba import jit
from numpyro.infer import util
from scipy.stats import multivariate_normal
###############################################################################
# Positions #
@@ -429,55 +427,127 @@ def thin_samples_by_acl(samples):
return thinned_samples
def numpyro_gof(model, mcmc, model_kwargs={}):
###############################################################################
# Model comparison #
###############################################################################
def BIC_AIC(samples, log_likelihood, ndata):
"""
Get the goodness-of-fit statistics for a sampled Numpyro model. Calculates
the BIC and AIC using the maximum likelihood sampled point and the log
evidence using the Laplace approximation.
Get the BIC/AIC of HMC samples from a Numpyro model.
Parameters
----------
model : numpyro model
The model to evaluate.
mcmc : numpyro MCMC
The MCMC object containing the samples.
ndata : int
The number of data points.
model_kwargs : dict, optional
Additional keyword arguments to pass to the model.
samples: dict
Dictionary of samples from the Numpyro MCMC object.
log_likelihood: numpy array
Log likelihood values of the samples.
ndata: int
Number of data points.
Returns
-------
gof : dict
Dictionary containing the BIC, AIC and logZ.
BIC, AIC: floats
"""
samples = mcmc.get_samples(group_by_chain=False)
log_likelihood = util.log_likelihood(model, samples, **model_kwargs)["ll"]
# Calculate the BIC using the maximum likelihood sampled point.
kmax = np.argmax(log_likelihood)
nparam = len(samples)
try:
ndata = model.ndata
except AttributeError as e:
raise AttributeError("The model must have an attribute `ndata` "
"indicating the number of data points.") from e
BIC = -2 * log_likelihood[kmax] + nparam * np.log(ndata)
# Calculate AIC
# How many parameters?
nparam = 0
for val in samples.values():
if val.ndim == 1:
nparam += 1
elif val.ndim == 2:
nparam += val.shape[-1]
else:
raise ValueError("Invalid dimensionality of samples to count the number of parameters.") # noqa
BIC = nparam * np.log(ndata) - 2 * log_likelihood[kmax]
AIC = 2 * nparam - 2 * log_likelihood[kmax]
# Calculate log(Z) using Laplace approximation.
X = np.vstack([samples[key] for key in samples.keys()]).T
mu, cov = multivariate_normal.fit(X)
test_sample = {key: mu[i] for i, key in enumerate(samples.keys())}
return float(BIC), float(AIC)
ll_mu = util.log_likelihood(model, test_sample, **model_kwargs)["ll"]
cov_det = np.linalg.det(cov)
D = len(mu)
logZ = ll_mu + 0.5 * np.log(cov_det) + D / 2 * np.log(2 * np.pi)
# Convert to float
out = {"BIC": BIC, "AIC": AIC, "logZ": logZ}
out = {key: float(val) for key, val in out.items()}
return out
def dict_samples_to_array(samples):
"""Convert a dictionary of samples to a 2-dimensional array."""
data = []
names = []
for key, value in samples.items():
if value.ndim == 1:
data.append(value)
names.append(key)
elif value.ndim == 2:
for i in range(value.shape[-1]):
data.append(value[:, i])
names.append(f"{key}_{i}")
else:
raise ValueError("Invalid dimensionality of samples to stack.")
return np.vstack(data).T, names
def harmonic_evidence(samples, log_posterior, temperature=0.8, epochs_num=20,
return_flow_samples=True, verbose=True):
"""
Calculate the evidence using the `harmonic` package. The model has a few
more hyperparameters that are set to defaults now.
Parameters
----------
samples: 3-dimensional array
MCMC samples of shape `(nchains, nsamples, ndim)`.
log_posterior: 2-dimensional array
Log posterior values of shape `(nchains, nsamples)`.
temperature: float, optional
Temperature of the `harmonic` model.
epochs_num: int, optional
Number of epochs for training the model.
return_flow_samples: bool, optional
Whether to return the flow samples.
verbose: bool, optional
Whether to print progress.
Returns
-------
ln_inv_evidence, err_ln_inv_evidence: float and tuple of floats
The log inverse evidence and its error.
flow_samples: 2-dimensional array, optional
Flow samples of shape `(nsamples, ndim)`. To check their agreement
with the input samples.
"""
try:
import harmonic as hm
except ImportError:
raise ImportError("The `harmonic` package is required to calculate the evidence.") from None # noqa
# Do some standard checks of inputs.
if samples.ndim != 3:
raise ValueError("The samples must be a 3-dimensional array of shape `(nchains, nsamples, ndim)`.") # noqa
if log_posterior.ndim != 2 or log_posterior.shape != samples.shape[:2]: # noqa
raise ValueError("The log posterior must be a 2-dimensional array of shape `(nchains, nsamples)`.") # noqa
ndim = samples.shape[-1]
chains = hm.Chains(ndim)
chains.add_chains_3d(samples, log_posterior)
chains_train, chains_infer = hm.utils.split_data(
chains, training_proportion=0.5)
# This has a few more hyperparameters that are set to defaults now.
model = hm.model.RQSplineModel(
ndim, standardize=True, temperature=temperature)
model.fit(chains_train.samples, epochs=epochs_num, verbose=verbose)
ev = hm.Evidence(chains_infer.nchains, model)
ev.add_chains(chains_infer)
ln_inv_evidence = ev.ln_evidence_inv
err_ln_inv_evidence = ev.compute_ln_inv_evidence_errors()
if return_flow_samples:
samples = samples.reshape((-1, ndim))
samp_num = samples.shape[0]
flow_samples = model.sample(samp_num)
return ln_inv_evidence, err_ln_inv_evidence, flow_samples
return ln_inv_evidence, err_ln_inv_evidence
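
Together, `dict_samples_to_array`, `BIC_AIC` and `harmonic_evidence` form a small model-comparison pipeline: flatten the NumPyro sample dictionary into a 2D array, split it into chains, and hand it to `harmonic`. A sketch with toy samples, assuming the `harmonic` package is installed and that the number of samples divides evenly into the number of chains (parameter names and all values here are placeholders):

import numpy as np
import csiborgtools

nsteps = 5000
# Toy posterior samples in the dictionary format returned by
# `mcmc.get_samples()`: two scalars and one 2-vector parameter.
samples = {"alpha": np.random.randn(nsteps),
           "beta": np.random.randn(nsteps),
           "Vext": np.random.randn(nsteps, 2)}
log_posterior = -np.random.rand(nsteps)   # placeholder values
log_likelihood = -np.random.rand(nsteps)  # placeholder values

BIC, AIC = csiborgtools.BIC_AIC(samples, log_likelihood, ndata=100)

data, names = csiborgtools.dict_samples_to_array(samples)

nchains = 10  # `nsteps` must be divisible by `nchains`
data = data.reshape(nchains, -1, len(names))
log_posterior = log_posterior.reshape(nchains, -1)

ln_inv_Z, err_ln_inv_Z = csiborgtools.harmonic_evidence(
    data, log_posterior, return_flow_samples=False)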

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long


@@ -25,7 +25,7 @@ from os.path import join
from gc import collect
import csiborgtools
import numpy
import numpy as np
from tqdm import tqdm
from datetime import datetime
@@ -101,7 +101,7 @@ def get_particles(reader, boxsize, get_velocity=True, verbose=True):
pos = reader.coordinates()
dtype = pos.dtype
pos -= boxsize / 2
dist = numpy.linalg.norm(pos, axis=1).astype(dtype)
dist = np.linalg.norm(pos, axis=1).astype(dtype)
del pos
collect()
@@ -116,7 +116,7 @@
if verbose:
print(f"{t()}: sorting arrays.")
indxs = numpy.argsort(dist)
indxs = np.argsort(dist)
dist = dist[indxs]
mass = mass[indxs]
if get_velocity:
@@ -140,10 +140,10 @@ def main_borg(args, folder):
paths = csiborgtools.read.Paths(**csiborgtools.paths_glamdring)
boxsize = csiborgtools.simname2boxsize(args.simname)
nsims = paths.get_ics(args.simname)
distances = numpy.linspace(0, boxsize / 2, 101)[1:]
distances = np.linspace(0, boxsize / 2, 101)[1:]
cumulative_mass = numpy.zeros((len(nsims), len(distances)))
cumulative_volume = numpy.zeros((len(nsims), len(distances)))
cumulative_mass = np.zeros((len(nsims), len(distances)))
cumulative_volume = np.zeros((len(nsims), len(distances)))
for i, nsim in enumerate(tqdm(nsims, desc="Simulations")):
if args.simname == "borg1":
reader = csiborgtools.read.BORG1Field(nsim)
@@ -160,21 +160,21 @@
# Finally save the output
fname = f"enclosed_mass_{args.simname}.npz"
fname = join(folder, fname)
numpy.savez(fname, enclosed_mass=cumulative_mass, distances=distances,
enclosed_volume=cumulative_volume)
np.savez(fname, enclosed_mass=cumulative_mass, distances=distances,
enclosed_volume=cumulative_volume)
def main_csiborg(args, folder):
paths = csiborgtools.read.Paths(**csiborgtools.paths_glamdring)
boxsize = csiborgtools.simname2boxsize(args.simname)
nsims = paths.get_ics(args.simname)
distances = numpy.linspace(0, boxsize / 2, 501)[1:]
distances = np.linspace(0, boxsize / 2, 501)[1:]
# Initialize arrays to store the results
cumulative_mass = numpy.zeros((len(nsims), len(distances)))
mass135 = numpy.zeros(len(nsims))
masstot = numpy.zeros(len(nsims))
cumulative_velocity = numpy.zeros((len(nsims), len(distances), 3))
cumulative_mass = np.zeros((len(nsims), len(distances)))
mass135 = np.zeros(len(nsims))
masstot = np.zeros(len(nsims))
cumulative_velocity = np.zeros((len(nsims), len(distances), 3))
for i, nsim in enumerate(tqdm(nsims, desc="Simulations")):
reader = get_reader(args.simname, paths, nsim)
@@ -185,7 +185,7 @@ def main_csiborg(args, folder):
rdist, mass, distances)
mass135[i] = csiborgtools.field.particles_enclosed_mass(
rdist, mass, [135])[0]
masstot[i] = numpy.sum(mass)
masstot[i] = np.sum(mass)
# Calculate velocities
cumulative_velocity[i, ...] = csiborgtools.field.particles_enclosed_momentum( # noqa
@@ -196,19 +196,61 @@
# Finally save the output
fname = f"enclosed_mass_{args.simname}.npz"
fname = join(folder, fname)
numpy.savez(fname, enclosed_mass=cumulative_mass, mass135=mass135,
masstot=masstot, distances=distances,
cumulative_velocity=cumulative_velocity)
np.savez(fname, enclosed_mass=cumulative_mass, mass135=mass135,
masstot=masstot, distances=distances,
cumulative_velocity=cumulative_velocity)
def main_csiborg2X(args, folder):
"""Bulk flow in the Manticore boxes provided by Stuart."""
paths = csiborgtools.read.Paths(**csiborgtools.paths_glamdring)
boxsize = csiborgtools.simname2boxsize(args.simname)
nsims = paths.get_ics(args.simname)
distances = np.linspace(0, boxsize / 2, 101)[1:]
cumulative_mass = np.zeros((len(nsims), len(distances)))
cumulative_volume = np.zeros((len(nsims), len(distances)))
cumulative_vel_x = np.zeros((len(nsims), len(distances)))
cumulative_vel_y = np.zeros_like(cumulative_vel_x)
cumulative_vel_z = np.zeros_like(cumulative_vel_x)
for i, nsim in enumerate(tqdm(nsims, desc="Simulations")):
reader = csiborgtools.read.CSiBORG2XField(nsim, paths)
density_field = reader.density_field()
velocity_field = reader.velocity_field()
cumulative_mass[i, :], cumulative_volume[i, :] = csiborgtools.field.field_enclosed_mass( # noqa
density_field, distances, boxsize, verbose=False)
cumulative_vel_x[i, :], __ = csiborgtools.field.field_enclosed_mass(
velocity_field[0], distances, boxsize, verbose=False)
cumulative_vel_y[i, :], __ = csiborgtools.field.field_enclosed_mass(
velocity_field[1], distances, boxsize, verbose=False)
cumulative_vel_z[i, :], __ = csiborgtools.field.field_enclosed_mass(
velocity_field[2], distances, boxsize, verbose=False)
cumulative_vel = np.stack(
[cumulative_vel_x, cumulative_vel_y, cumulative_vel_z], axis=-1)
cumulative_vel /= cumulative_volume[..., None]
# Finally save the output
fname = f"enclosed_mass_{args.simname}.npz"
fname = join(folder, fname)
np.savez(fname, enclosed_mass=cumulative_mass, distances=distances,
cumulative_velocity=cumulative_vel,
enclosed_volume=cumulative_volume)
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--simname", type=str, help="Simulation name.",
choices=["csiborg1", "csiborg2_main", "csiborg2_varysmall", "csiborg2_random", "borg1", "borg2", "borg2_all"]) # noqa
choices=["csiborg1", "csiborg2_main", "csiborg2_varysmall", "csiborg2_random", "borg1", "borg2", "borg2_all", "csiborg2X"]) # noqa
args = parser.parse_args()
folder = "/mnt/extraspace/rstiskalek/csiborg_postprocessing/field_shells"
if "csiborg" in args.simname:
if args.simname == "csiborg2X":
main_csiborg2X(args, folder)
elif "csiborg" in args.simname:
main_csiborg(args, folder)
elif "borg" in args.simname:
main_borg(args, folder)
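
The new `main_csiborg2X` branch converts the per-component enclosed sums into a bulk-flow vector by stacking the three velocity components and normalising by the enclosed volume. The key broadcasting step, isolated as a sketch with toy shapes:

import numpy as np

nsims, ndist = 20, 100
vel_x = np.random.rand(nsims, ndist)  # toy per-component enclosed sums
vel_y = np.random.rand(nsims, ndist)
vel_z = np.random.rand(nsims, ndist)
volume = np.random.rand(nsims, ndist) + 1.0  # toy enclosed volumes

vel = np.stack([vel_x, vel_y, vel_z], axis=-1)  # shape (nsims, ndist, 3)
vel /= volume[..., None]  # volume-normalised bulk flow per shell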


@@ -1,6 +1,6 @@
nthreads=1
memory=12
on_login=0
on_login=1
queue="berg"
env="/mnt/zfsusers/rstiskalek/csiborgtools/venv_csiborg/bin/python"
file="field_bulk.py"


@@ -14,166 +14,185 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Script to run the PV validation model on various catalogues and simulations.
The script is MPI parallelized over the IC realizations.
The script is not MPI parallelised; instead, it is best run on a GPU.
"""
from argparse import ArgumentParser
from datetime import datetime
from os import makedirs, remove, rmdir
from os.path import exists, join
import csiborgtools
import jax
import numpy as np
from h5py import File
from mpi4py import MPI
from numpyro.infer import MCMC, NUTS, init_to_sample
from taskmaster import work_delegation # noqa
from argparse import ArgumentParser, ArgumentTypeError
def get_model(args, nsim_iterator, get_model_kwargs):
"""
Load the data and create the NumPyro model.
def none_or_int(value):
if value.lower() == "none":
return None
try:
return int(value)
except ValueError:
raise ArgumentTypeError(f"Invalid value: {value}. Must be an integer or 'none'.") # noqa
Parameters
----------
args : argparse.Namespace
Command line arguments.
nsim_iterator : int
Simulation index, not the IC index. Ranges from 0, ... .
get_model_kwargs : dict
Keyword arguments for reading in the data for the model
(`csiborgtools.flow.get_model`).
Returns
-------
numpyro model
"""
def parse_args():
parser = ArgumentParser()
parser.add_argument("--simname", type=str, required=True,
help="Simulation name.")
parser.add_argument("--catalogue", type=str, required=True,
help="PV catalogue.")
parser.add_argument("--ksmooth", type=int, default=1,
help="Smoothing index.")
parser.add_argument("--ksim", type=none_or_int, default=None,
help="IC iteration number. If 'None', all IC realizations are used.") # noqa
parser.add_argument("--ndevice", type=int, default=1,
help="Number of devices to request.")
parser.add_argument("--device", type=str, default="cpu",
help="Device to use.")
return parser.parse_args()
ARGS = parse_args()
# This must be done before we import JAX etc.
from numpyro import set_host_device_count, set_platform # noqa
set_platform(ARGS.device) # noqa
set_host_device_count(ARGS.ndevice) # noqa
import sys # noqa
from os.path import join # noqa
import jax # noqa
from h5py import File # noqa
from mpi4py import MPI # noqa
from numpyro.infer import MCMC, NUTS, init_to_median # noqa
import csiborgtools # noqa
def print_variables(names, variables):
for name, variable in zip(names, variables):
print(f"{name:<20} {variable}", flush=True)
print(flush=True)
def get_model(paths, get_model_kwargs, verbose=True):
"""Load the data and create the NumPyro model."""
paths = csiborgtools.read.Paths(**csiborgtools.paths_glamdring)
folder = "/mnt/extraspace/rstiskalek/catalogs/"
if args.catalogue == "A2":
nsims = paths.get_ics(ARGS.simname)
if ARGS.ksim is None:
nsim_iterator = [i for i in range(len(nsims))]
else:
nsim_iterator = [ARGS.ksim]
nsims = [nsims[ARGS.ksim]]
if verbose:
print(f"{'Simulation:':<20} {ARGS.simname}")
print(f"{'Catalogue:':<20} {ARGS.catalogue}")
print(f"{'Num. realisations:':<20} {len(nsims)}")
print(flush=True)
if ARGS.catalogue == "A2":
fpath = join(folder, "A2.h5")
elif args.catalogue in ["LOSS", "Foundation", "Pantheon+", "SFI_gals",
elif ARGS.catalogue in ["LOSS", "Foundation", "Pantheon+", "SFI_gals",
"2MTF", "SFI_groups", "SFI_gals_masked",
"Pantheon+_groups", "Pantheon+_groups_zSN",
"Pantheon+_zSN"]:
fpath = join(folder, "PV_compilation.hdf5")
elif "CB2_" in args.catalogue:
kind = args.catalogue.split("_")[-1]
fpath = join(folder, f"PV_mock_CB2_17417_{kind}.hdf5")
else:
raise ValueError(f"Unknown catalogue: `{args.catalogue}`.")
raise ValueError(f"Unsupported catalogue: `{ARGS.catalogue}`.")
loader = csiborgtools.flow.DataLoader(args.simname, nsim_iterator,
args.catalogue, fpath, paths,
ksmooth=args.ksmooth)
loader = csiborgtools.flow.DataLoader(ARGS.simname, nsim_iterator,
ARGS.catalogue, fpath, paths,
ksmooth=ARGS.ksmooth)
return csiborgtools.flow.get_model(loader, **get_model_kwargs)
def run_model(model, nsteps, nburn, nchains, nsim, dump_folder,
model_kwargs, show_progress=True):
"""
Run the NumPyro model and save the thinned samples to a temporary file.
def get_harmonic_evidence(samples, log_posterior, nchains_harmonic, epoch_num):
"""Compute evidence using the `harmonic` package."""
data, names = csiborgtools.dict_samples_to_array(samples)
data = data.reshape(nchains_harmonic, -1, len(names))
log_posterior = log_posterior.reshape(nchains_harmonic, -1)
Parameters
----------
model : jax.numpyro.Primitive
Model to be run.
nsteps : int
Number of steps.
nburn : int
Number of burn-in steps.
nchains : int
Number of chains.
nsim : int
Simulation index.
dump_folder : str
Folder where the temporary files are stored.
show_progress : bool
Whether to show the progress bar.
return csiborgtools.harmonic_evidence(
data, log_posterior, return_flow_samples=False, epochs_num=epoch_num)
Returns
-------
None
"""
nuts_kernel = NUTS(model, init_strategy=init_to_sample)
mcmc = MCMC(nuts_kernel, num_warmup=nburn, num_samples=nsteps,
chain_method="sequential", num_chains=nchains,
progress_bar=show_progress)
rng_key = jax.random.PRNGKey(42)
mcmc.run(rng_key, **model_kwargs)
if show_progress:
print(f"Summary of the MCMC run of simulation indexed {nsim}:")
mcmc.print_summary()
samples = mcmc.get_samples()
thinned_samples = csiborgtools.thin_samples_by_acl(samples)
# Calculate the chi2
keys = list(thinned_samples.keys())
nsamples = len(thinned_samples[keys[0]])
def run_model(model, nsteps, nburn, model_kwargs, out_folder, sample_beta,
calculate_evidence, nchains_harmonic, epoch_num, kwargs_print):
"""Run the NumPyro model and save output to a file."""
try:
zobs_mean, zobs_std = model.predict_zobs(thinned_samples)
nu = model.ndata - len(keys)
chi2 = [np.sum((zobs_mean[:, i] - model._z_obs)**2 / zobs_std[:, i]**2) / nu # noqa
for i in range(nsamples)]
except NotImplementedError:
chi2 = [0. for _ in range(nsamples)]
ndata = model.ndata
except AttributeError as e:
raise AttributeError("The model must have an attribute `ndata` "
"indicating the number of data points.") from e
gof = csiborgtools.numpyro_gof(model, mcmc, model_kwargs)
nuts_kernel = NUTS(model, init_strategy=init_to_median(num_samples=1000))
mcmc = MCMC(nuts_kernel, num_warmup=nburn, num_samples=nsteps)
rng_key = jax.random.PRNGKey(42)
# Save the samples to the temporary folder.
fname = join(dump_folder, f"samples_{nsim}.npz")
np.savez(fname, **thinned_samples, **gof, chi2=chi2)
mcmc.run(rng_key, extra_fields=("potential_energy",), **model_kwargs)
samples = mcmc.get_samples()
log_posterior = -mcmc.get_extra_fields()["potential_energy"]
log_likelihood = samples.pop("ll_values", None)
if log_likelihood is None:
raise ValueError("The samples must contain the log likelihood values under the key `ll_values`.") # noqa
def combine_from_simulations(catalogue_name, simname, nsims, outfolder,
dumpfolder, ksmooth):
"""
Combine the results from individual simulations into a single file.
BIC, AIC = csiborgtools.BIC_AIC(samples, log_likelihood, ndata)
print(f"{'BIC':<20} {BIC}")
print(f"{'AIC':<20} {AIC}")
mcmc.print_summary()
Parameters
----------
catalogue_name : str
Catalogue name.
simname : str
Simulation name.
nsims : list
List of IC realisations.
outfolder : str
Output folder.
dumpfolder : str
Dumping folder where the temporary files are stored.
ksmooth : int
Smoothing index.
if calculate_evidence:
print("Calculating the evidence using `harmonic`.", flush=True)
ln_evidence, ln_evidence_err = get_harmonic_evidence(
samples, log_posterior, nchains_harmonic, epoch_num)
print(f"{'ln(Z)':<20} {ln_evidence}")
print(f"{'ln(Z) error':<20} {ln_evidence_err}")
else:
ln_evidence = jax.numpy.nan
ln_evidence_err = (jax.numpy.nan, jax.numpy.nan)
Returns
-------
None
"""
fname_out = join(
outfolder,
f"flow_samples_{catalogue_name}_{simname}_smooth_{ksmooth}.hdf5")
print(f"Combining results from invidivual simulations to `{fname_out}`.")
fname = f"samples_{ARGS.simname}_{ARGS.catalogue}_ksmooth{ARGS.ksmooth}.hdf5" # noqa
if ARGS.ksim is not None:
fname = fname.replace(".hdf5", f"_nsim{ARGS.ksim}.hdf5")
if exists(fname_out):
remove(fname_out)
if sample_beta:
fname = fname.replace(".hdf5", "_sample_beta.hdf5")
for nsim in nsims:
fname = join(dumpfolder, f"samples_{nsim}.npz")
data = np.load(fname)
fname = join(out_folder, fname)
print(f"Saving results to `{fname}`.")
with File(fname, "w") as f:
# Write samples
grp = f.create_group("samples")
for key, value in samples.items():
grp.create_dataset(key, data=value)
with File(fname_out, 'a') as f:
grp = f.create_group(f"sim_{nsim}")
for key in data.files:
grp.create_dataset(key, data=data[key])
# Write log likelihood and posterior
f.create_dataset("log_likelihood", data=log_likelihood)
f.create_dataset("log_posterior", data=log_posterior)
# Remove the temporary file.
remove(fname)
# Write goodness of fit
grp = f.create_group("gof")
grp.create_dataset("BIC", data=BIC)
grp.create_dataset("AIC", data=AIC)
grp.create_dataset("lnZ", data=ln_evidence)
grp.create_dataset("lnZ_err", data=ln_evidence_err)
fname_summary = fname.replace(".hdf5", ".txt")
print(f"Saving summary to `{fname_summary}`.")
with open(fname_summary, 'w') as f:
original_stdout = sys.stdout
sys.stdout = f
print("User parameters:")
for kwargs in kwargs_print:
print_variables(kwargs.keys(), kwargs.values())
print("HMC summary:")
print(f"{'BIC':<20} {BIC}")
print(f"{'AIC':<20} {AIC}")
print(f"{'ln(Z)':<20} {ln_evidence}")
print(f"{'ln(Z) error':<20} {ln_evidence_err}")
mcmc.print_summary(exclude_deterministic=False)
sys.stdout = original_stdout
# Remove the dumping folder.
rmdir(dumpfolder)
print("Finished combining results.")
###############################################################################
# Command line interface #
@@ -181,52 +200,68 @@ def combine_from_simulations(catalogue_name, simname, nsims, outfolder,
if __name__ == "__main__":
parser = ArgumentParser()
parser.add_argument("--simname", type=str, required=True,
help="Simulation name.")
parser.add_argument("--catalogue", type=str, required=True,
help="PV catalogue.")
parser.add_argument("--ksmooth", type=int, required=True,
help="Smoothing index.")
parser.add_argument("--nchains", type=int, default=4,
help="Number of chains.")
parser.add_argument("--nsteps", type=int, default=2500,
help="Number of post burn-n steps.")
parser.add_argument("--nburn", type=int, default=500,
help="Number of burn-in steps.")
args = parser.parse_args()
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
out_folder = "/mnt/extraspace/rstiskalek/csiborg_postprocessing/peculiar_velocity" # noqa
paths = csiborgtools.read.Paths(**csiborgtools.paths_glamdring)
nsims = paths.get_ics(args.simname)
out_folder = "/mnt/extraspace/rstiskalek/csiborg_postprocessing/peculiar_velocity" # noqa
print(f"{'Num. devices:':<20} {jax.device_count()}")
print(f"{'Devices:':<20} {jax.devices()}")
get_model_kwargs = {"zcmb_max": 0.06}
model_kwargs = {"sample_alpha": True, "sample_beta": True}
if "CB2_" in args.catalogue:
model_kwargs["sample_h"] = False
###########################################################################
# Fixed user parameters #
###########################################################################
# Create the dumping folder.
if comm.Get_rank() == 0:
dump_folder = join(out_folder,
f"temp_{str(datetime.now())}".replace(" ", "_"))
print(f"Creating folder `{dump_folder}`.")
makedirs(dump_folder)
nsteps = 5000
nburn = 500
zcmb_max = 0.06
sample_alpha = True
sample_beta = True
calculate_evidence = False
nchains_harmonic = 10
num_epochs = 30
if nsteps % nchains_harmonic != 0:
raise ValueError("The number of steps must be divisible by the number of chains.") # noqa
main_params = {"nsteps": nsteps, "nburn": nburn, "zcmb_max": zcmb_max,
"sample_alpha": sample_alpha, "sample_beta": sample_beta,
"calculate_evidence": calculate_evidence,
"nchains_harmonic": nchains_harmonic,
"num_epochs": num_epochs}
print_variables(main_params.keys(), main_params.values())
calibration_hyperparams = {"Vext_std": 250,
"alpha_mean": 1.0, "alpha_std": 0.5,
"beta_mean": 1.0, "beta_std": 0.5,
"sigma_v_mean": 200., "sigma_v_std": 100.,
"sample_alpha": sample_alpha,
"sample_beta": sample_beta,
}
print_variables(
calibration_hyperparams.keys(), calibration_hyperparams.values())
if ARGS.catalogue in ["LOSS", "Foundation", "Pantheon+", "Pantheon+_groups"]: # noqa
distmod_hyperparams = {"e_mu_mean": 0.1, "e_mu_std": 0.05,
"mag_cal_mean": -18.25, "mag_cal_std": 0.5,
"alpha_cal_mean": 0.148, "alpha_cal_std": 0.05,
"beta_cal_mean": 3.112, "beta_cal_std": 1.0,
}
elif ARGS.catalogue in ["SFI_gals", "2MTF"]:
distmod_hyperparams = {"e_mu_mean": 0.3, "e_mu_std": 0.15,
"a_mean": -21., "a_std": 0.5,
"b_mean": -5.95, "b_std": 0.25,
}
else:
dump_folder = None
dump_folder = comm.bcast(dump_folder, root=0)
raise ValueError(f"Unsupported catalogue: `{ARGS.catalogue}`.")
def main(i):
model = get_model(args, i, get_model_kwargs)
run_model(model, args.nsteps, args.nburn, args.nchains, nsims[i],
dump_folder, model_kwargs, show_progress=size == 1)
print_variables(
distmod_hyperparams.keys(), distmod_hyperparams.values())
work_delegation(main, [i for i in range(len(nsims))], comm,
master_verbose=True)
comm.Barrier()
kwargs_print = (main_params, calibration_hyperparams, distmod_hyperparams)
###########################################################################
if rank == 0:
combine_from_simulations(args.catalogue, args.simname, nsims,
out_folder, dump_folder, args.ksmooth)
model_kwargs = {"calibration_hyperparams": calibration_hyperparams,
"distmod_hyperparams": distmod_hyperparams}
get_model_kwargs = {"zcmb_max": zcmb_max}
model = get_model(paths, get_model_kwargs)
run_model(model, nsteps, nburn, model_kwargs, out_folder, sample_beta,
calculate_evidence, nchains_harmonic, num_epochs, kwargs_print)
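
The script recovers the per-sample log posterior from NumPyro's potential energy, which is the negative unnormalised log joint density (evaluated in NumPyro's unconstrained space). A minimal self-contained example of the same trick on a toy model (the model and data here are illustrative only):

import jax
import numpyro
import numpyro.distributions as dist
from jax import numpy as jnp
from numpyro.infer import MCMC, NUTS, init_to_median

def toy_model(x):
    mu = numpyro.sample("mu", dist.Normal(0.0, 5.0))
    numpyro.sample("obs", dist.Normal(mu, 1.0), obs=x)

x = jnp.array([0.2, -0.5, 1.1])
kernel = NUTS(toy_model, init_strategy=init_to_median(num_samples=100))
mcmc = MCMC(kernel, num_warmup=500, num_samples=1000)
mcmc.run(jax.random.PRNGKey(42), x, extra_fields=("potential_energy",))

samples = mcmc.get_samples()
log_posterior = -mcmc.get_extra_fields()["potential_energy"]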


@@ -1,25 +1,40 @@
memory=4
on_login=0
nthreads=${1}
ksmooth=${2}
#!/bin/bash
memory=8
on_login=${1}
ndevice=1
queue="berg"
env="/mnt/users/rstiskalek/csiborgtools/venv_csiborg/bin/python"
device="gpu"
queue="gpulong"
gputype="rtx2080with12gb"
env="/mnt/users/rstiskalek/csiborgtools/venv_gpu_csiborgtools/bin/python"
file="flow_validation.py"
#"Pantheon+_zSN"
catalogue="Pantheon+_groups"
simname="Carrick2015"
ksmooth=0
pythoncm="$env $file --catalogue $catalogue --simname $simname --ksmooth $ksmooth"
if [ $on_login -eq 1 ]; then
echo $pythoncm
$pythoncm
else
cm="addqueue -q $queue -n $nthreads -m $memory $pythoncm"
echo "Submitting:"
echo $cm
echo
eval $cm
if [ "$on_login" != "1" ] && [ "$on_login" != "0" ]; then
echo "Invalid input: 'on_login' (1). Please provide 1 or 0."
exit 1
fi
# Submit a job for each combination of simname, catalogue, ksim
for simname in "csiborg2_main"; do
for catalogue in "2MTF"; do
# for ksim in 0 1 2; do
for ksim in 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 "none"; do
# for ksim in 0; do
pythoncm="$env $file --catalogue $catalogue --simname $simname --ksim $ksim --ksmooth $ksmooth --ndevice $ndevice --device $device"
if [ $on_login -eq 1 ]; then
echo $pythoncm
$pythoncm
else
cm="addqueue -q $queue -s -m $memory --gpus 1 --gputype $gputype $pythoncm"
echo "Submitting:"
echo $cm
eval $cm
fi
echo
sleep 0.05
done
done
done


@@ -3,15 +3,18 @@ from setuptools import find_packages, setup
BUILD_REQ = ["numpy", "scipy"]
INSTALL_REQ = BUILD_REQ
INSTALL_REQ += [
"numba",
"tqdm",
"healpy",
"astropy",
"scikit-learn",
"h5py",
"pynbody",
"joblib",
]
"astropy",
"colossus",
"h5py",
"healpy",
"joblib",
"mpi4py",
"numba",
"numpyro",
"quadax",
"scikit-learn",
"tqdm",
]
setup(
name="csiborgtools",