Overlapper improvements (#53)

* Store indices as f32

* Fix init sorting

* Organise imports

* Rename pathing

* Add particle loading

* Improve particle reading

* Add h5py reader

* edit particle path

* Update particles loading

* update particles loading

* Fix particle dumping

* Add init fitting

* Fix bug due to insufficient precision

* Add commnet

* Add comment

* Add clumps catalogue to halo cat

* Add comment

* Make sure PIDS never forced to float32

* fix pid reading

* fix pid reading

* Update matching to work with new arrays

* Stop using cubical sub boxes, turn off nshift if no smoothing

* Improve caching

* Move function definitions

* Simplify calculation

* Add import

* Small updates to the halo

* Simplify calculation

* Simplify looping calculation

* fix tonew

* Add initial data

* Add skip condition

* Add unit conversion

* Add loading background in batches

* Rename mmain index

* Switch overlaps to h5

* Add finite lagpatch check

* fix column name

* Add verbosity flags

* Save halo IDs instead.

* Switch back to npz

* Delte nbs

* Reduce size of the box

* Load correct bckg of halos being matched

* Remove verbosity

* verbosity edits

* Change lower thresholds
This commit is contained in:
Richard Stiskalek 2023-05-06 16:52:48 +01:00 committed by GitHub
parent 1c9dacfde5
commit 56e39a8b1d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 864 additions and 3816 deletions

View file

@ -18,9 +18,7 @@ realisation must have been split in advance by `runsplit_halos`.
"""
from argparse import ArgumentParser
from datetime import datetime
from os.path import join
import h5py
import numpy
from mpi4py import MPI
from tqdm import tqdm
@ -33,20 +31,26 @@ except ModuleNotFoundError:
sys.path.append("../")
import csiborgtools
parser = ArgumentParser()
parser.add_argument("--kind", type=str, choices=["halos", "clumps"])
args = parser.parse_args()
# Get MPI things
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nproc = comm.Get_size()
verbose = nproc == 1
parser = ArgumentParser()
parser.add_argument("--kind", type=str, choices=["halos", "clumps"])
parser.add_argument("--ics", type=int, nargs="+", default=None,
help="IC realisations. If `-1` processes all simulations.")
args = parser.parse_args()
paths = csiborgtools.read.CSiBORGPaths(**csiborgtools.paths_glamdring)
partreader = csiborgtools.read.ParticleReader(paths)
nfwpost = csiborgtools.fits.NFWPosterior()
ftemp = join(paths.temp_dumpdir, "fit_clump_{}_{}_{}.npy")
if args.ics is None or args.ics[0] == -1:
ics = paths.get_ics(tonew=False)
else:
ics = args.ics
cols_collect = [
("index", numpy.int32),
("npart", numpy.int32),
@ -63,7 +67,7 @@ cols_collect = [
("lambda200c", numpy.float32),
("r200m", numpy.float32),
("m200m", numpy.float32),
]
]
def fit_clump(particles, clump_info, box):
@ -95,46 +99,19 @@ def fit_clump(particles, clump_info, box):
return out
def load_clump_particles(clumpid, particles, clump_map):
"""
Load a clump's particles. If it is not there, i.e clump has no associated
particles, return `None`.
"""
try:
return particles[clump_map[clumpid], :]
except KeyError:
return None
def load_parent_particles(clumpid, particles, clump_map, clumps_cat):
"""
Load a parent halo's particles.
"""
indxs = clumps_cat["index"][clumps_cat["parent"] == clumpid]
# We first load the particles of each clump belonging to this parent
# and then concatenate them for further analysis.
clumps = []
for ind in indxs:
parts = load_clump_particles(ind, particles, clump_map)
if parts is not None:
clumps.append(parts)
if len(clumps) == 0:
return None
return numpy.concatenate(clumps)
# We now start looping over all simulations
for i, nsim in enumerate(paths.get_ics(tonew=False)):
if rank == 0:
print(f"{datetime.now()}: calculating {i}th simulation `{nsim}`.",
flush=True)
# We MPI loop over all simulations.
jobs = csiborgtools.fits.split_jobs(len(ics), nproc)[rank]
for nsim in [ics[i] for i in jobs]:
print(f"{datetime.now()}: rank {rank} calculating simulation `{nsim}`.",
flush=True)
nsnap = max(paths.get_snapshots(nsim))
box = csiborgtools.read.BoxUnits(nsnap, nsim, paths)
# Particle archive
particles = h5py.File(paths.particle_h5py_path(nsim), 'r')["particles"]
clump_map = h5py.File(paths.particle_h5py_path(nsim, "clumpmap"), 'r')
f = csiborgtools.read.read_h5(paths.particles_path(nsim))
particles = f["particles"]
clump_map = f["clumpmap"]
clid2map = {clid: i for i, clid in enumerate(clump_map[:, 0])}
clumps_cat = csiborgtools.read.ClumpsCatalogue(nsim, paths, rawdata=True,
load_fitted=False)
# We check whether we fit halos or clumps, will be indexing over different
@ -143,66 +120,39 @@ for i, nsim in enumerate(paths.get_ics(tonew=False)):
ismain = clumps_cat.ismain
else:
ismain = numpy.ones(len(clumps_cat), dtype=bool)
ntasks = len(clumps_cat)
# We split the clumps among the processes. Each CPU calculates a fraction
# of them and dumps the results in a structured array. Even if we are
# calculating parent halo this index runs over all clumps.
jobs = csiborgtools.fits.split_jobs(ntasks, nproc)[rank]
out = csiborgtools.read.cols_to_structured(len(jobs), cols_collect)
for i, j in enumerate(tqdm(jobs)) if nproc == 1 else enumerate(jobs):
clumpid = clumps_cat["index"][j]
out["index"][i] = clumpid
# Even if we are calculating parent halo this index runs over all clumps.
out = csiborgtools.read.cols_to_structured(len(clumps_cat), cols_collect)
indxs = clumps_cat["index"]
for i, clid in enumerate(tqdm(indxs)) if verbose else enumerate(indxs):
clid = clumps_cat["index"][i]
out["index"][i] = clid
# If we are fitting halos and this clump is not a main, then continue.
if args.kind == "halos" and not ismain[j]:
if args.kind == "halos" and not ismain[i]:
continue
if args.kind == "halos":
part = load_parent_particles(clumpid, particles, clump_map,
clumps_cat)
part = csiborgtools.read.load_parent_particles(
clid, particles, clump_map, clid2map, clumps_cat)
else:
part = load_clump_particles(clumpid, particles, clump_map)
part = csiborgtools.read.load_clump_particles(clid, particles,
clump_map, clid2map)
# We fit the particles if there are any. If not we assign the index,
# otherwise it would be NaN converted to integers (-2147483648) and
# yield an error further down.
if part is not None:
_out = fit_clump(part, clumps_cat[j], box)
for key in _out.keys():
out[key][i] = _out[key]
if part is None:
continue
fout = ftemp.format(str(nsim).zfill(5), str(nsnap).zfill(5), rank)
if nproc == 0:
print(f"{datetime.now()}: rank {rank} saving to `{fout}`.", flush=True)
_out = fit_clump(part, clumps_cat[i], box)
for key in _out.keys():
out[key][i] = _out[key]
# Finally, we save the results. If we were analysing main halos, then
# remove array indices that do not correspond to parent halos.
if args.kind == "halos":
out = out[ismain]
fout = paths.structfit_path(nsnap, nsim, args.kind)
print(f"Saving to `{fout}`.", flush=True)
numpy.save(fout, out)
# We saved this CPU's results in a temporary file. Wait now for the other
# CPUs and then collect results from the 0th rank and save them.
comm.Barrier()
if rank == 0:
print(f"{datetime.now()}: collecting results for simulation `{nsim}`.",
flush=True)
# We write to the output array. Load data from each CPU and append to
# the output array.
out = csiborgtools.read.cols_to_structured(ntasks, cols_collect)
clumpid2outpos = {indx: i
for i, indx in enumerate(clumps_cat["index"])}
for i in range(nproc):
inp = numpy.load(ftemp.format(str(nsim).zfill(5),
str(nsnap).zfill(5), i))
for j, clumpid in enumerate(inp["index"]):
k = clumpid2outpos[clumpid]
for key in inp.dtype.names:
out[key][k] = inp[key][j]
# If we were analysing main halos, then remove array indices that do
# not correspond to parent halos.
if args.kind == "halos":
out = out[ismain]
fout = paths.structfit_path(nsnap, nsim, args.kind)
print(f"Saving to `{fout}`.", flush=True)
numpy.save(fout, out)
# We now wait before moving on to another simulation.
comm.Barrier()